You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2018/07/09 20:33:38 UTC
[29/37] hadoop git commit: HDDS-182:CleanUp Reimplemented classes.
Contributed by Hansiha Koneru
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
deleted file mode 100644
index 02572a8..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ /dev/null
@@ -1,1115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers
- .StorageContainerException;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerLifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.StorageReportProto;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
-import org.apache.hadoop.ozone.container.common.interfaces
- .ContainerDeletionChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.interfaces
- .ContainerLocationManager;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.security.DigestInputStream;
-import java.security.DigestOutputStream;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.CONTAINER_EXISTS;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.CONTAINER_INTERNAL_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.CONTAINER_NOT_FOUND;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.ERROR_IN_COMPACT_DB;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.INVALID_CONFIG;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.IO_EXCEPTION;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.NO_SUCH_ALGORITHM;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.UNABLE_TO_READ_METADATA_DB;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.UNCLOSED_CONTAINER_IO;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.UNSUPPORTED_REQUEST;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
- Result.INVALID_CONTAINER_STATE;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
-
-/**
- * A Generic ContainerManagerImpl that will be called from Ozone
- * ContainerManagerImpl. This allows us to support delta changes to ozone
- * version without having to rewrite the containerManager.
- */
-public class ContainerManagerImpl implements ContainerManager {
- static final Logger LOG =
- LoggerFactory.getLogger(ContainerManagerImpl.class);
-
- // TODO: consider primitive collection like eclipse-collections
- // to avoid autoboxing overhead
- private final ConcurrentSkipListMap<Long, ContainerData>
- containerMap = new ConcurrentSkipListMap<>();
-
- // Use a non-fair RW lock for better throughput, we may revisit this decision
- // if this causes fairness issues.
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private ContainerLocationManager locationManager;
- private ChunkManager chunkManager;
- private KeyManager keyManager;
- private Configuration conf;
-
- private ContainerDeletionChoosingPolicy containerDeletionChooser;
-
- /**
- * Init call that sets up a container Manager.
- *
- * @param config - Configuration.
- * @param containerDirs - List of Metadata Container locations.
- * @param dnDetails - DatanodeDetails.
- * @throws IOException
- */
- @Override
- public void init(
- Configuration config, List<StorageLocation> containerDirs,
- DatanodeDetails dnDetails) throws IOException {
- Preconditions.checkNotNull(config, "Config must not be null");
- Preconditions.checkNotNull(containerDirs, "Container directories cannot " +
- "be null");
- Preconditions.checkNotNull(dnDetails, "Datanode Details cannot " +
- "be null");
-
- Preconditions.checkState(containerDirs.size() > 0, "Number of container" +
- " directories must be greater than zero.");
-
- this.conf = config;
-
- readLock();
- try {
- containerDeletionChooser = ReflectionUtils.newInstance(conf.getClass(
- ScmConfigKeys.OZONE_SCM_CONTAINER_DELETION_CHOOSING_POLICY,
- TopNOrderedContainerDeletionChoosingPolicy.class,
- ContainerDeletionChoosingPolicy.class), conf);
-
- for (StorageLocation path : containerDirs) {
- File directory = Paths.get(path.getNormalizedUri()).toFile();
- if (!directory.exists() && !directory.mkdirs()) {
- LOG.error("Container metadata directory doesn't exist "
- + "and cannot be created. Path: {}", path.toString());
- throw new StorageContainerException("Container metadata "
- + "directory doesn't exist and cannot be created " + path
- .toString(), INVALID_CONFIG);
- }
-
- // TODO: This will fail if any directory is invalid.
- // We should fix this to handle invalid directories and continue.
- // Leaving it this way to fail fast for time being.
- if (!directory.isDirectory()) {
- LOG.error("Invalid path to container metadata directory. path: {}",
- path.toString());
- throw new StorageContainerException("Invalid path to container " +
- "metadata directory." + path, INVALID_CONFIG);
- }
- LOG.info("Loading containers under {}", path);
- File[] files = directory.listFiles(new ContainerFilter());
- if (files != null) {
- for (File containerFile : files) {
- LOG.debug("Loading container {}", containerFile);
- String containerPath =
- ContainerUtils.getContainerNameFromFile(containerFile);
- Preconditions.checkNotNull(containerPath, "Container path cannot" +
- " be null");
- readContainerInfo(containerPath);
- }
- }
- }
-
- List<StorageLocation> dataDirs = new LinkedList<>();
- for (String dir : config.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
- StorageLocation location = StorageLocation.parse(dir);
- dataDirs.add(location);
- }
- this.locationManager =
- new ContainerLocationManagerImpl(containerDirs, dataDirs, config);
- } finally {
- readUnlock();
- }
- }
-
- /**
- * Reads the Container Info from a file and verifies that checksum match. If
- * the checksums match, then that file is added to containerMap.
- *
- * @param containerName - Name which points to the persisted container.
- * @throws StorageContainerException
- */
- private void readContainerInfo(String containerName)
- throws StorageContainerException {
- Preconditions.checkState(containerName.length() > 0,
- "Container name length cannot be zero.");
- FileInputStream containerStream = null;
- DigestInputStream dis = null;
- FileInputStream metaStream = null;
- Path cPath = Paths.get(containerName).getFileName();
- String keyName = null;
- if (cPath != null) {
- keyName = cPath.toString();
- }
- Preconditions.checkNotNull(keyName,
- "Container Name to container key mapping is null");
-
- long containerID = Long.parseLong(keyName);
- try {
- String containerFileName = containerName.concat(CONTAINER_EXTENSION);
-
- containerStream = new FileInputStream(containerFileName);
-
- ContainerProtos.ContainerData containerDataProto =
- ContainerProtos.ContainerData.parseDelimitedFrom(containerStream);
- ContainerData containerData;
- if (containerDataProto == null) {
- // Sometimes container metadata might have been created but empty,
- // when loading the info we get a null, this often means last time
- // SCM was ending up at some middle phase causing that the metadata
- // was not populated. Such containers are marked as inactive.
- ContainerData cData = new ContainerData(containerID, conf,
- ContainerLifeCycleState.INVALID);
- containerMap.put(containerID, cData);
- return;
- }
- containerData = ContainerData.getFromProtBuf(containerDataProto, conf);
-
- // Initialize pending deletion blocks and deleted blocks count in
- // in-memory containerData.
- MetadataStore metadata = KeyUtils.getDB(containerData, conf);
- List<Map.Entry<byte[], byte[]>> underDeletionBlocks = metadata
- .getSequentialRangeKVs(null, Integer.MAX_VALUE,
- MetadataKeyFilters.getDeletingKeyFilter());
- byte[] transactionID = metadata.get(DFSUtil.string2Bytes(
- OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + containerID));
- if (transactionID != null) {
- containerData
- .updateDeleteTransactionId(Longs.fromByteArray(transactionID));
- }
- containerData.incrPendingDeletionBlocks(underDeletionBlocks.size());
-
- List<Map.Entry<byte[], byte[]>> liveKeys = metadata
- .getRangeKVs(null, Integer.MAX_VALUE,
- MetadataKeyFilters.getNormalKeyFilter());
-
- // Get container bytesUsed upon loading container
- // The in-memory state is updated upon key write or delete
- // TODO: update containerDataProto and persist it into container MetaFile
- long bytesUsed = 0;
- bytesUsed = liveKeys.parallelStream().mapToLong(e-> {
- KeyData keyData;
- try {
- keyData = KeyUtils.getKeyData(e.getValue());
- return keyData.getSize();
- } catch (IOException ex) {
- return 0L;
- }
- }).sum();
- containerData.setBytesUsed(bytesUsed);
-
- containerMap.put(containerID, containerData);
- } catch (IOException ex) {
- LOG.error("read failed for file: {} ex: {}", containerName,
- ex.getMessage());
-
- // TODO : Add this file to a recovery Queue.
-
- // Remember that this container is busted and we cannot use it.
- ContainerData cData = new ContainerData(containerID, conf,
- ContainerLifeCycleState.INVALID);
- containerMap.put(containerID, cData);
- throw new StorageContainerException("Unable to read container info",
- UNABLE_TO_READ_METADATA_DB);
- } finally {
- IOUtils.closeStream(dis);
- IOUtils.closeStream(containerStream);
- IOUtils.closeStream(metaStream);
- }
- }
-
- /**
- * Creates a container with the given name.
- *
- * @param containerData - Container Name and metadata.
- * @throws StorageContainerException - Exception
- */
- @Override
- public void createContainer(ContainerData containerData)
- throws StorageContainerException {
- Preconditions.checkNotNull(containerData, "Container data cannot be null");
- writeLock();
- try {
- if (containerMap.containsKey(containerData.getContainerID())) {
- LOG.debug("container already exists. {}",
- containerData.getContainerID());
- throw new StorageContainerException("container already exists.",
- CONTAINER_EXISTS);
- }
-
- // This is by design. We first write and close the
- // container Info and metadata to a directory.
- // Then read back and put that info into the containerMap.
- // This allows us to make sure that our write is consistent.
-
- writeContainerInfo(containerData, false);
- File cFile = new File(containerData.getContainerPath());
- readContainerInfo(ContainerUtils.getContainerNameFromFile(cFile));
- } catch (NoSuchAlgorithmException ex) {
- LOG.error("Internal error: We seem to be running a JVM without a " +
- "needed hash algorithm.");
- throw new StorageContainerException("failed to create container",
- NO_SUCH_ALGORITHM);
- } finally {
- writeUnlock();
- }
-
- }
-
- /**
- * Writes a container to a chosen location and updates the container Map.
- *
- * The file formats of ContainerData and Container Meta is the following.
- *
- * message ContainerData {
- * required string name = 1;
- * repeated KeyValue metadata = 2;
- * optional string dbPath = 3;
- * optional string containerPath = 4;
- * optional int64 bytesUsed = 5;
- * optional int64 size = 6;
- * }
- *
- * message ContainerMeta {
- * required string fileName = 1;
- * required string hash = 2;
- * }
- *
- * @param containerData - container Data
- * @param overwrite - Whether we are overwriting.
- * @throws StorageContainerException, NoSuchAlgorithmException
- */
- private void writeContainerInfo(ContainerData containerData,
- boolean overwrite)
- throws StorageContainerException, NoSuchAlgorithmException {
-
- Preconditions.checkNotNull(this.locationManager,
- "Internal error: location manager cannot be null");
-
- FileOutputStream containerStream = null;
- DigestOutputStream dos = null;
- FileOutputStream metaStream = null;
-
- try {
- Path metadataPath = null;
- Path location = (!overwrite) ? locationManager.getContainerPath():
- Paths.get(containerData.getContainerPath()).getParent();
- if (location == null) {
- throw new StorageContainerException(
- "Failed to get container file path.",
- CONTAINER_INTERNAL_ERROR);
- }
-
- File containerFile = ContainerUtils.getContainerFile(containerData,
- location);
- String containerName = Long.toString(containerData.getContainerID());
-
- if(!overwrite) {
- ContainerUtils.verifyIsNewContainer(containerFile);
- metadataPath = this.locationManager.getDataPath(containerName);
- metadataPath = ContainerUtils.createMetadata(metadataPath,
- containerName, conf);
- } else {
- metadataPath = ContainerUtils.getMetadataDirectory(containerData);
- }
-
- containerStream = new FileOutputStream(containerFile);
-
- MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-
- dos = new DigestOutputStream(containerStream, sha);
- containerData.setDBPath(metadataPath.resolve(
- ContainerUtils.getContainerDbFileName(containerName))
- .toString());
- containerData.setContainerPath(containerFile.toString());
-
- if(containerData.getContainerDBType() == null) {
- String impl = conf.getTrimmed(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
- OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_DEFAULT);
- containerData.setContainerDBType(impl);
- }
-
- ContainerProtos.ContainerData protoData = containerData
- .getProtoBufMessage();
- protoData.writeDelimitedTo(dos);
-
- } catch (IOException ex) {
- // TODO : we need to clean up partially constructed files
- // The proper way to do would be for a thread
- // to read all these 3 artifacts and make sure they are
- // sane. That info needs to come from the replication
- // pipeline, and if not consistent delete these file.
-
- // In case of ozone this is *not* a deal breaker since
- // SCM is guaranteed to generate unique container names.
- // The saving grace is that we check if we have residue files
- // lying around when creating a new container. We need to queue
- // this information to a cleaner thread.
-
- LOG.error("Creation of container failed. Name: {}, we might need to " +
- "cleanup partially created artifacts. ",
- containerData.getContainerID(), ex);
- throw new StorageContainerException("Container creation failed. ",
- ex, CONTAINER_INTERNAL_ERROR);
- } finally {
- IOUtils.closeStream(dos);
- IOUtils.closeStream(containerStream);
- IOUtils.closeStream(metaStream);
- }
- }
-
- /**
- * Deletes an existing container.
- *
- * @param containerID - ID of the container.
- * @param forceDelete - whether this container should be deleted forcibly.
- * @throws StorageContainerException
- */
- @Override
- public void deleteContainer(long containerID,
- boolean forceDelete) throws StorageContainerException {
- Preconditions.checkState(containerID >= 0,
- "Container ID cannot be negative.");
- writeLock();
- try {
- if (isOpen(containerID)) {
- throw new StorageContainerException(
- "Deleting an open container is not allowed.",
- UNCLOSED_CONTAINER_IO);
- }
-
- ContainerData containerData = containerMap.get(containerID);
- if (containerData == null) {
- LOG.debug("No such container. ID: {}", containerID);
- throw new StorageContainerException("No such container. ID : " +
- containerID, CONTAINER_NOT_FOUND);
- }
-
- if(!containerData.isValid()) {
- LOG.debug("Invalid container data. ID: {}", containerID);
- throw new StorageContainerException("Invalid container data. Name : " +
- containerID, CONTAINER_NOT_FOUND);
- }
- ContainerUtils.removeContainer(containerData, conf, forceDelete);
- containerMap.remove(containerID);
- } catch (StorageContainerException e) {
- throw e;
- } catch (IOException e) {
- // TODO : An I/O error during delete can leave partial artifacts on the
- // disk. We will need the cleaner thread to cleanup this information.
- String errMsg = String.format("Failed to cleanup container. ID: %d",
- containerID);
- LOG.error(errMsg, e);
- throw new StorageContainerException(errMsg, e, IO_EXCEPTION);
- } finally {
- writeUnlock();
- }
- }
-
- /**
- * A simple interface for container Iterations.
- * <p/>
- * This call make no guarantees about consistency of the data between
- * different list calls. It just returns the best known data at that point of
- * time. It is possible that using this iteration you can miss certain
- * container from the listing.
- *
- * @param startContainerID - Return containers with ID >= startContainerID.
- * @param count - how many to return
- * @param data - Actual containerData
- * @throws StorageContainerException
- */
- @Override
- public void listContainer(long startContainerID, long count,
- List<ContainerData> data) throws StorageContainerException {
- Preconditions.checkNotNull(data,
- "Internal assertion: data cannot be null");
- Preconditions.checkState(startContainerID >= 0,
- "Start container ID cannot be negative");
- Preconditions.checkState(count > 0,
- "max number of containers returned " +
- "must be positive");
-
- readLock();
- try {
- ConcurrentNavigableMap<Long, ContainerData> map;
- if (startContainerID == 0) {
- map = containerMap.tailMap(containerMap.firstKey(), true);
- } else {
- map = containerMap.tailMap(startContainerID, false);
- }
-
- int currentCount = 0;
- for (ContainerData entry : map.values()) {
- if (currentCount < count) {
- data.add(entry);
- currentCount++;
- } else {
- return;
- }
- }
- } finally {
- readUnlock();
- }
- }
-
- /**
- * Get metadata about a specific container.
- *
- * @param containerID - ID of the container
- * @return ContainerData - Container Data.
- * @throws StorageContainerException
- */
- @Override
- public ContainerData readContainer(long containerID)
- throws StorageContainerException {
- Preconditions.checkState(containerID >= 0,
- "Container ID cannot be negative.");
- if (!containerMap.containsKey(containerID)) {
- throw new StorageContainerException("Unable to find the container. ID: "
- + containerID, CONTAINER_NOT_FOUND);
- }
- ContainerData cData = containerMap.get(containerID);
- if (cData == null) {
- throw new StorageContainerException("Invalid container data. ID: "
- + containerID, CONTAINER_INTERNAL_ERROR);
- }
- return cData;
- }
-
- /**
- * Closes a open container, if it is already closed or does not exist a
- * StorageContainerException is thrown.
- *
- * @param containerID - ID of the container.
- * @throws StorageContainerException
- */
- @Override
- public void closeContainer(long containerID)
- throws StorageContainerException, NoSuchAlgorithmException {
- ContainerData containerData = readContainer(containerID);
- containerData.closeContainer();
- writeContainerInfo(containerData, true);
- MetadataStore db = KeyUtils.getDB(containerData, conf);
-
- // It is ok if this operation takes a bit of time.
- // Close container is not expected to be instantaneous.
- try {
- db.compactDB();
- } catch (IOException e) {
- LOG.error("Error in DB compaction while closing container", e);
- throw new StorageContainerException(e, ERROR_IN_COMPACT_DB);
- }
-
- // Active is different from closed. Closed means it is immutable, active
- // false means we have some internal error that is happening to this
- // container. This is a way to track damaged containers if we have an
- // I/O failure, this allows us to take quick action in case of container
- // issues.
-
- containerMap.put(containerID, containerData);
- }
-
- @Override
- public void updateContainer(long containerID, ContainerData data,
- boolean forceUpdate) throws StorageContainerException {
- Preconditions.checkState(containerID >= 0,
- "Container ID cannot be negative.");
- Preconditions.checkNotNull(data, "Container data cannot be null");
- FileOutputStream containerStream = null;
- DigestOutputStream dos = null;
- MessageDigest sha = null;
- File containerFileBK = null, containerFile = null;
- boolean deleted = false;
-
- if(!containerMap.containsKey(containerID)) {
- throw new StorageContainerException("Container doesn't exist. Name :"
- + containerID, CONTAINER_NOT_FOUND);
- }
-
- try {
- sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
- } catch (NoSuchAlgorithmException e) {
- throw new StorageContainerException("Unable to create Message Digest,"
- + " usually this is a java configuration issue.",
- NO_SUCH_ALGORITHM);
- }
-
- try {
- Path location = locationManager.getContainerPath();
- ContainerData orgData = containerMap.get(containerID);
- if (orgData == null) {
- // updating a invalid container
- throw new StorageContainerException("Update a container with invalid" +
- "container meta data", CONTAINER_INTERNAL_ERROR);
- }
-
- if (!forceUpdate && !orgData.isOpen()) {
- throw new StorageContainerException(
- "Update a closed container is not allowed. ID: " + containerID,
- UNSUPPORTED_REQUEST);
- }
-
- containerFile = ContainerUtils.getContainerFile(orgData, location);
- // If forceUpdate is true, there is no need to check
- // whether the container file exists.
- if (!forceUpdate) {
- if (!containerFile.exists() || !containerFile.canWrite()) {
- throw new StorageContainerException(
- "Container file not exists or corrupted. ID: " + containerID,
- CONTAINER_INTERNAL_ERROR);
- }
-
- // Backup the container file
- containerFileBK = File.createTempFile(
- "tmp_" + System.currentTimeMillis() + "_",
- containerFile.getName(), containerFile.getParentFile());
- FileUtils.copyFile(containerFile, containerFileBK);
-
- deleted = containerFile.delete();
- containerStream = new FileOutputStream(containerFile);
- dos = new DigestOutputStream(containerStream, sha);
-
- ContainerProtos.ContainerData protoData = data.getProtoBufMessage();
- protoData.writeDelimitedTo(dos);
- }
-
- // Update the in-memory map
- containerMap.replace(containerID, data);
- } catch (IOException e) {
- // Restore the container file from backup
- if(containerFileBK != null && containerFileBK.exists() && deleted) {
- if(containerFile.delete()
- && containerFileBK.renameTo(containerFile)) {
- throw new StorageContainerException("Container update failed,"
- + " container data restored from the backup.",
- CONTAINER_INTERNAL_ERROR);
- } else {
- throw new StorageContainerException(
- "Failed to restore container data from the backup. ID: "
- + containerID, CONTAINER_INTERNAL_ERROR);
- }
- } else {
- throw new StorageContainerException(
- e.getMessage(), CONTAINER_INTERNAL_ERROR);
- }
- } finally {
- if (containerFileBK != null && containerFileBK.exists()) {
- if(!containerFileBK.delete()) {
- LOG.warn("Unable to delete container file backup : {}.",
- containerFileBK.getAbsolutePath());
- }
- }
- IOUtils.closeStream(dos);
- IOUtils.closeStream(containerStream);
- }
- }
-
- @VisibleForTesting
- protected File getContainerFile(ContainerData data) throws IOException {
- return ContainerUtils.getContainerFile(data,
- this.locationManager.getContainerPath());
- }
-
- /**
- * Checks if a container exists.
- *
- * @param containerID - ID of the container.
- * @return true if the container is open false otherwise.
- * @throws StorageContainerException - Throws Exception if we are not able to
- * find the container.
- */
- @Override
- public boolean isOpen(long containerID) throws StorageContainerException {
- final ContainerData containerData = containerMap.get(containerID);
- if (containerData == null) {
- throw new StorageContainerException(
- "Container not found: " + containerID, CONTAINER_NOT_FOUND);
- }
- return containerData.isOpen();
- }
-
- /**
- * Returns LifeCycle State of the container.
- * @param containerID - Id of the container
- * @return LifeCycle State of the container
- * @throws StorageContainerException
- */
- private HddsProtos.LifeCycleState getState(long containerID)
- throws StorageContainerException {
- LifeCycleState state;
- final ContainerData data = containerMap.get(containerID);
- if (data == null) {
- throw new StorageContainerException(
- "Container status not found: " + containerID, CONTAINER_NOT_FOUND);
- }
- switch (data.getState()) {
- case OPEN:
- state = LifeCycleState.OPEN;
- break;
- case CLOSING:
- state = LifeCycleState.CLOSING;
- break;
- case CLOSED:
- state = LifeCycleState.CLOSED;
- break;
- default:
- throw new StorageContainerException(
- "Invalid Container state found: " + containerID,
- INVALID_CONTAINER_STATE);
- }
-
- return state;
- }
-
- /**
- * Supports clean shutdown of container.
- *
- * @throws IOException
- */
- @Override
- public void shutdown() throws IOException {
- Preconditions.checkState(this.hasWriteLock(),
- "Assumption that we are holding the lock violated.");
- this.containerMap.clear();
- this.locationManager.shutdown();
- }
-
-
- @VisibleForTesting
- public ConcurrentSkipListMap<Long, ContainerData> getContainerMap() {
- return containerMap;
- }
-
- /**
- * Acquire read lock.
- */
- @Override
- public void readLock() {
- this.lock.readLock().lock();
-
- }
-
- @Override
- public void readLockInterruptibly() throws InterruptedException {
- this.lock.readLock().lockInterruptibly();
- }
-
- /**
- * Release read lock.
- */
- @Override
- public void readUnlock() {
- this.lock.readLock().unlock();
- }
-
- /**
- * Check if the current thread holds read lock.
- */
- @Override
- public boolean hasReadLock() {
- return this.lock.readLock().tryLock();
- }
-
- /**
- * Acquire write lock.
- */
- @Override
- public void writeLock() {
- this.lock.writeLock().lock();
- }
-
- /**
- * Acquire write lock, unless interrupted while waiting.
- */
- @Override
- public void writeLockInterruptibly() throws InterruptedException {
- this.lock.writeLock().lockInterruptibly();
-
- }
-
- /**
- * Release write lock.
- */
- @Override
- public void writeUnlock() {
- this.lock.writeLock().unlock();
-
- }
-
- /**
- * Check if the current thread holds write lock.
- */
- @Override
- public boolean hasWriteLock() {
- return this.lock.writeLock().isHeldByCurrentThread();
- }
-
- public ChunkManager getChunkManager() {
- return this.chunkManager;
- }
-
- /**
- * Sets the chunk Manager.
- *
- * @param chunkManager - Chunk Manager
- */
- public void setChunkManager(ChunkManager chunkManager) {
- this.chunkManager = chunkManager;
- }
-
- /**
- * Gets the Key Manager.
- *
- * @return KeyManager.
- */
- @Override
- public KeyManager getKeyManager() {
- return this.keyManager;
- }
-
- /**
- * Get the node report.
- * @return node report.
- */
- @Override
- public NodeReportProto getNodeReport() throws IOException {
- StorageLocationReport[] reports = locationManager.getLocationReport();
- NodeReportProto.Builder nrb = NodeReportProto.newBuilder();
- for (int i = 0; i < reports.length; i++) {
- StorageReportProto.Builder srb = StorageReportProto.newBuilder();
- nrb.addStorageReport(reports[i].getProtoBufMessage());
- }
- return nrb.build();
- }
-
-
- /**
- * Gets container reports.
- *
- * @return List of all closed containers.
- * @throws IOException
- */
- @Override
- public List<ContainerData> getClosedContainerReports() throws IOException {
- LOG.debug("Starting container report iteration.");
- // No need for locking since containerMap is a ConcurrentSkipListMap
- // And we can never get the exact state since close might happen
- // after we iterate a point.
- return containerMap.entrySet().stream()
- .filter(containerData ->
- containerData.getValue().isClosed())
- .map(containerData -> containerData.getValue())
- .collect(Collectors.toList());
- }
-
- /**
- * Get container report.
- *
- * @return The container report.
- * @throws IOException
- */
- @Override
- public ContainerReportsProto getContainerReport() throws IOException {
- LOG.debug("Starting container report iteration.");
- // No need for locking since containerMap is a ConcurrentSkipListMap
- // And we can never get the exact state since close might happen
- // after we iterate a point.
- List<ContainerData> containers = containerMap.values().stream()
- .collect(Collectors.toList());
-
- ContainerReportsProto.Builder crBuilder =
- ContainerReportsProto.newBuilder();
-
- for (ContainerData container: containers) {
- long containerId = container.getContainerID();
- StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
- StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
- ciBuilder.setContainerID(container.getContainerID())
- .setSize(container.getMaxSize())
- .setUsed(container.getBytesUsed())
- .setKeyCount(container.getKeyCount())
- .setReadCount(container.getReadCount())
- .setWriteCount(container.getWriteCount())
- .setReadBytes(container.getReadBytes())
- .setWriteBytes(container.getWriteBytes())
- .setState(getState(containerId))
- .setDeleteTransactionId(container.getDeleteTransactionId());
-
- crBuilder.addReports(ciBuilder.build());
- }
-
- return crBuilder.build();
- }
-
- /**
- * Sets the Key Manager.
- *
- * @param keyManager - Key Manager.
- */
- @Override
- public void setKeyManager(KeyManager keyManager) {
- this.keyManager = keyManager;
- }
-
- /**
- * Filter out only container files from the container metadata dir.
- */
- private static class ContainerFilter implements FilenameFilter {
- /**
- * Tests if a specified file should be included in a file list.
- *
- * @param dir the directory in which the file was found.
- * @param name the name of the file.
- * @return <code>true</code> if and only if the name should be included in
- * the file list; <code>false</code> otherwise.
- */
- @Override
- public boolean accept(File dir, String name) {
- return name.endsWith(CONTAINER_EXTENSION);
- }
- }
-
- @Override
- public List<ContainerData> chooseContainerForBlockDeletion(
- int count) throws StorageContainerException {
- readLock();
- try {
- return containerDeletionChooser.chooseContainerForBlockDeletion(
- count, containerMap);
- } finally {
- readUnlock();
- }
- }
-
- @VisibleForTesting
- public ContainerDeletionChoosingPolicy getContainerDeletionChooser() {
- return containerDeletionChooser;
- }
-
- @Override
- public void incrPendingDeletionBlocks(int numBlocks, long containerId) {
- writeLock();
- try {
- ContainerData cData = containerMap.get(containerId);
- cData.incrPendingDeletionBlocks(numBlocks);
- } finally {
- writeUnlock();
- }
- }
-
- @Override
- public void decrPendingDeletionBlocks(int numBlocks, long containerId) {
- writeLock();
- try {
- ContainerData cData = containerMap.get(containerId);
- cData.decrPendingDeletionBlocks(numBlocks);
- } finally {
- writeUnlock();
- }
- }
-
- /**
- * Increase the read count of the container.
- *
- * @param containerId - ID of the container.
- */
- @Override
- public void incrReadCount(long containerId) {
- ContainerData cData = containerMap.get(containerId);
- cData.incrReadCount();
- }
-
- public long getReadCount(long containerId) {
- ContainerData cData = containerMap.get(containerId);
- return cData.getReadCount();
- }
-
- /**
- * Increase the read counter for bytes read from the container.
- *
- * @param containerId - ID of the container.
- * @param readBytes - bytes read from the container.
- */
- @Override
- public void incrReadBytes(long containerId, long readBytes) {
- ContainerData cData = containerMap.get(containerId);
- cData.incrReadBytes(readBytes);
- }
-
- /**
- * Returns number of bytes read from the container.
- * @param containerId
- * @return
- */
- public long getReadBytes(long containerId) {
- readLock();
- try {
- ContainerData cData = containerMap.get(containerId);
- return cData.getReadBytes();
- } finally {
- readUnlock();
- }
- }
-
- /**
- * Increase the write count of the container.
- *
- * @param containerId - Name of the container.
- */
- @Override
- public void incrWriteCount(long containerId) {
- ContainerData cData = containerMap.get(containerId);
- cData.incrWriteCount();
- }
-
- public long getWriteCount(long containerId) {
- ContainerData cData = containerMap.get(containerId);
- return cData.getWriteCount();
- }
-
- /**
- * Increse the write counter for bytes write into the container.
- *
- * @param containerId - ID of the container.
- * @param writeBytes - bytes write into the container.
- */
- @Override
- public void incrWriteBytes(long containerId, long writeBytes) {
- ContainerData cData = containerMap.get(containerId);
- cData.incrWriteBytes(writeBytes);
- }
-
- public long getWriteBytes(long containerId) {
- ContainerData cData = containerMap.get(containerId);
- return cData.getWriteBytes();
- }
-
- /**
- * Increase the bytes used by the container.
- *
- * @param containerId - ID of the container.
- * @param used - additional bytes used by the container.
- * @return the current bytes used.
- */
- @Override
- public long incrBytesUsed(long containerId, long used) {
- ContainerData cData = containerMap.get(containerId);
- return cData.incrBytesUsed(used);
- }
-
- /**
- * Decrease the bytes used by the container.
- *
- * @param containerId - ID of the container.
- * @param used - additional bytes reclaimed by the container.
- * @return the current bytes used.
- */
- @Override
- public long decrBytesUsed(long containerId, long used) {
- ContainerData cData = containerMap.get(containerId);
- return cData.decrBytesUsed(used);
- }
-
- public long getBytesUsed(long containerId) {
- ContainerData cData = containerMap.get(containerId);
- return cData.getBytesUsed();
- }
-
- /**
- * Get the number of keys in the container.
- *
- * @param containerId - ID of the container.
- * @return the current key count.
- */
- @Override
- public long getNumKeys(long containerId) {
- ContainerData cData = containerMap.get(containerId);
- return cData.getKeyCount();
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 18a7839..bcba8c8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.common.impl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
@@ -63,7 +64,7 @@ public class ContainerSet {
StorageContainerException {
Preconditions.checkNotNull(container, "container cannot be null");
- long containerId = container.getContainerData().getContainerId();
+ long containerId = container.getContainerData().getContainerID();
if(containerMap.putIfAbsent(containerId, container) == null) {
LOG.debug("Container with container Id {} is added to containerMap",
containerId);
@@ -133,6 +134,13 @@ public class ContainerSet {
return containerMap.entrySet().iterator();
}
+ /**
+ * Return a copy of the containerMap
+ * @return containerMap
+ */
+ public Map<Long, Container> getContainerMap() {
+ return ImmutableMap.copyOf(containerMap);
+ }
/**
* A simple interface for container Iterations.
@@ -196,7 +204,7 @@ public class ContainerSet {
for (Container container: containers) {
- long containerId = container.getContainerData().getContainerId();
+ long containerId = container.getContainerData().getContainerID();
ContainerInfo.Builder ciBuilder = ContainerInfo.newBuilder();
ContainerData containerData = container.getContainerData();
ciBuilder.setContainerID(containerId)
@@ -234,9 +242,14 @@ public class ContainerSet {
break;
default:
throw new StorageContainerException("Invalid Container state found: " +
- containerData.getContainerId(), INVALID_CONTAINER_STATE);
+ containerData.getContainerID(), INVALID_CONTAINER_STATE);
}
return state;
}
+ // TODO: Implement BlockDeletingService
+ public List<ContainerData> chooseContainerForBlockDeletion(
+ int count) throws StorageContainerException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
deleted file mode 100644
index 7431baa..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CachingGetSpaceUsed;
-import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.GetSpaceUsed;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
-import java.util.Scanner;
-
-import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY;
-
-/**
- * Class that wraps the space usage of the Datanode Container Storage Location
- * by SCM containers.
- */
-public class ContainerStorageLocation {
- private static final Logger LOG =
- LoggerFactory.getLogger(ContainerStorageLocation.class);
-
- private static final String DU_CACHE_FILE = "scmUsed";
- private volatile boolean scmUsedSaved = false;
-
- private final StorageLocation dataLocation;
- private final String storageUuId;
- private final DF usage;
- private final GetSpaceUsed scmUsage;
- private final File scmUsedFile;
-
- public ContainerStorageLocation(StorageLocation dataLoc, Configuration conf)
- throws IOException {
- this.dataLocation = dataLoc;
- this.storageUuId = DatanodeStorage.generateUuid();
- File dataDir = Paths.get(dataLoc.getNormalizedUri()).resolve(
- OzoneConsts.CONTAINER_PREFIX).toFile();
- // Initialize container data root if it does not exist as required by DF/DU
- if (!dataDir.exists()) {
- if (!dataDir.mkdirs()) {
- LOG.error("Unable to create the container storage location at : {}",
- dataDir);
- throw new IllegalArgumentException("Unable to create the container" +
- " storage location at : " + dataDir);
- }
- }
- scmUsedFile = new File(dataDir, DU_CACHE_FILE);
- // get overall disk usage
- this.usage = new DF(dataDir, conf);
- // get SCM specific usage
- this.scmUsage = new CachingGetSpaceUsed.Builder().setPath(dataDir)
- .setConf(conf)
- .setInitialUsed(loadScmUsed())
- .build();
-
- // Ensure scm usage is saved during shutdown.
- ShutdownHookManager.get().addShutdownHook(
- new Runnable() {
- @Override
- public void run() {
- if (!scmUsedSaved) {
- saveScmUsed();
- }
- }
- }, SHUTDOWN_HOOK_PRIORITY);
- }
-
- public URI getNormalizedUri() {
- return dataLocation.getNormalizedUri();
- }
-
- public String getStorageUuId() {
- return storageUuId;
- }
- public long getCapacity() {
- long capacity = usage.getCapacity();
- return (capacity > 0) ? capacity : 0;
- }
-
- public long getAvailable() throws IOException {
- long remaining = getCapacity() - getScmUsed();
- long available = usage.getAvailable();
- if (remaining > available) {
- remaining = available;
- }
- return (remaining > 0) ? remaining : 0;
- }
-
- public long getScmUsed() throws IOException{
- return scmUsage.getUsed();
- }
-
- public String getStorageLocation() {
- return getNormalizedUri().getRawPath();
- }
-
- public StorageType getStorageType() {
- return dataLocation.getStorageType();
- }
-
- public void shutdown() {
- saveScmUsed();
- scmUsedSaved = true;
-
- if (scmUsage instanceof CachingGetSpaceUsed) {
- IOUtils.cleanupWithLogger(null, ((CachingGetSpaceUsed) scmUsage));
- }
- }
-
- /**
- * Read in the cached DU value and return it if it is less than 600 seconds
- * old (DU update interval). Slight imprecision of scmUsed is not critical
- * and skipping DU can significantly shorten the startup time.
- * If the cached value is not available or too old, -1 is returned.
- */
- long loadScmUsed() {
- long cachedScmUsed;
- long mtime;
- Scanner sc;
-
- try {
- sc = new Scanner(scmUsedFile, "UTF-8");
- } catch (FileNotFoundException fnfe) {
- return -1;
- }
-
- try {
- // Get the recorded scmUsed from the file.
- if (sc.hasNextLong()) {
- cachedScmUsed = sc.nextLong();
- } else {
- return -1;
- }
- // Get the recorded mtime from the file.
- if (sc.hasNextLong()) {
- mtime = sc.nextLong();
- } else {
- return -1;
- }
-
- // Return the cached value if mtime is okay.
- if (mtime > 0 && (Time.now() - mtime < 600000L)) {
- LOG.info("Cached ScmUsed found for {} : {} ", dataLocation,
- cachedScmUsed);
- return cachedScmUsed;
- }
- return -1;
- } finally {
- sc.close();
- }
- }
-
- /**
- * Write the current scmUsed to the cache file.
- */
- void saveScmUsed() {
- if (scmUsedFile.exists() && !scmUsedFile.delete()) {
- LOG.warn("Failed to delete old scmUsed file in {}.", dataLocation);
- }
- OutputStreamWriter out = null;
- try {
- long used = getScmUsed();
- if (used > 0) {
- out = new OutputStreamWriter(new FileOutputStream(scmUsedFile),
- StandardCharsets.UTF_8);
- // mtime is written last, so that truncated writes won't be valid.
- out.write(Long.toString(used) + " " + Long.toString(Time.now()));
- out.flush();
- out.close();
- out = null;
- }
- } catch (IOException ioe) {
- // If write failed, the volume might be bad. Since the cache file is
- // not critical, log the error and continue.
- LOG.warn("Failed to write scmUsed to " + scmUsedFile, ioe);
- } finally {
- IOUtils.cleanupWithLogger(null, out);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
deleted file mode 100644
index 3ffe6e4..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
+++ /dev/null
@@ -1,695 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.container.common.helpers
- .StorageContainerException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
-import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.CLOSED_CONTAINER_IO;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.GET_SMALL_FILE_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.NO_SUCH_ALGORITHM;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.PUT_SMALL_FILE_ERROR;
-
-/**
- * Ozone Container dispatcher takes a call from the netty server and routes it
- * to the right handler function.
- */
-public class Dispatcher implements ContainerDispatcher {
- static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
-
- private final ContainerManager containerManager;
- private ContainerMetrics metrics;
- private Configuration conf;
-
- /**
- * Constructs an OzoneContainer that receives calls from
- * XceiverServerHandler.
- *
- * @param containerManager - A class that manages containers.
- */
- public Dispatcher(ContainerManager containerManager, Configuration config) {
- Preconditions.checkNotNull(containerManager);
- this.containerManager = containerManager;
- this.metrics = null;
- this.conf = config;
- }
-
- @Override
- public void init() {
- this.metrics = ContainerMetrics.create(conf);
- }
-
- @Override
- public void shutdown() {
- }
-
- @Override
- public Handler getHandler(ContainerProtos.ContainerType containerType) {
- return null;
- }
-
- @Override
- public void setScmId(String scmId) {
- // DO nothing, this will be removed when cleanup.
- }
-
- @Override
- public ContainerCommandResponseProto dispatch(
- ContainerCommandRequestProto msg) {
- LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(),
- msg.getTraceID());
- long startNanos = System.nanoTime();
- ContainerCommandResponseProto resp = null;
- try {
- Preconditions.checkNotNull(msg);
- Type cmdType = msg.getCmdType();
- metrics.incContainerOpsMetrics(cmdType);
- if ((cmdType == Type.CreateContainer) ||
- (cmdType == Type.DeleteContainer) ||
- (cmdType == Type.ReadContainer) ||
- (cmdType == Type.ListContainer) ||
- (cmdType == Type.UpdateContainer) ||
- (cmdType == Type.CloseContainer)) {
- resp = containerProcessHandler(msg);
- }
-
- if ((cmdType == Type.PutKey) ||
- (cmdType == Type.GetKey) ||
- (cmdType == Type.DeleteKey) ||
- (cmdType == Type.ListKey)) {
- resp = keyProcessHandler(msg);
- }
-
- if ((cmdType == Type.WriteChunk) ||
- (cmdType == Type.ReadChunk) ||
- (cmdType == Type.DeleteChunk)) {
- resp = chunkProcessHandler(msg);
- }
-
- if ((cmdType == Type.PutSmallFile) ||
- (cmdType == Type.GetSmallFile)) {
- resp = smallFileHandler(msg);
- }
-
- if (resp != null) {
- metrics.incContainerOpsLatencies(cmdType,
- System.nanoTime() - startNanos);
- return resp;
- }
-
- return ContainerUtils.unsupportedRequest(msg);
- } catch (StorageContainerException e) {
- // This useful since the trace ID will allow us to correlate failures.
- return ContainerUtils.logAndReturnError(LOG, e, msg);
- }
- }
-
- public ContainerMetrics getContainerMetrics() {
- return metrics;
- }
-
- /**
- * Handles the all Container related functionality.
- *
- * @param msg - command
- * @return - response
- * @throws StorageContainerException
- */
- private ContainerCommandResponseProto containerProcessHandler(
- ContainerCommandRequestProto msg) throws StorageContainerException {
- try {
-
- switch (msg.getCmdType()) {
- case CreateContainer:
- return handleCreateContainer(msg);
-
- case DeleteContainer:
- return handleDeleteContainer(msg);
-
- case ListContainer:
- // TODO : Support List Container.
- return ContainerUtils.unsupportedRequest(msg);
-
- case UpdateContainer:
- return handleUpdateContainer(msg);
-
- case ReadContainer:
- return handleReadContainer(msg);
-
- case CloseContainer:
- return handleCloseContainer(msg);
-
- default:
- return ContainerUtils.unsupportedRequest(msg);
- }
- } catch (StorageContainerException e) {
- return ContainerUtils.logAndReturnError(LOG, e, msg);
- } catch (IOException ex) {
- LOG.warn("Container operation failed. " +
- "Container: {} Operation: {} trace ID: {} Error: {}",
- msg.getCreateContainer().getContainerID(),
- msg.getCmdType().name(),
- msg.getTraceID(),
- ex.toString(), ex);
-
- // TODO : Replace with finer error codes.
- return ContainerUtils.getContainerCommandResponse(msg,
- ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
- ex.toString()).build();
- }
- }
-
- /**
- * Handles the all key related functionality.
- *
- * @param msg - command
- * @return - response
- * @throws StorageContainerException
- */
- private ContainerCommandResponseProto keyProcessHandler(
- ContainerCommandRequestProto msg) throws StorageContainerException {
- try {
- switch (msg.getCmdType()) {
- case PutKey:
- return handlePutKey(msg);
-
- case GetKey:
- return handleGetKey(msg);
-
- case DeleteKey:
- return handleDeleteKey(msg);
-
- case ListKey:
- return ContainerUtils.unsupportedRequest(msg);
-
- default:
- return ContainerUtils.unsupportedRequest(msg);
-
- }
- } catch (StorageContainerException e) {
- return ContainerUtils.logAndReturnError(LOG, e, msg);
- } catch (IOException ex) {
- LOG.warn("Container operation failed. " +
- "Container: {} Operation: {} trace ID: {} Error: {}",
- msg.getCreateContainer().getContainerID(),
- msg.getCmdType().name(),
- msg.getTraceID(),
- ex.toString(), ex);
-
- // TODO : Replace with finer error codes.
- return ContainerUtils.getContainerCommandResponse(msg,
- ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
- ex.toString()).build();
- }
- }
-
- /**
- * Handles the all chunk related functionality.
- *
- * @param msg - command
- * @return - response
- * @throws StorageContainerException
- */
- private ContainerCommandResponseProto chunkProcessHandler(
- ContainerCommandRequestProto msg) throws StorageContainerException {
- try {
- switch (msg.getCmdType()) {
- case WriteChunk:
- return handleWriteChunk(msg);
-
- case ReadChunk:
- return handleReadChunk(msg);
-
- case DeleteChunk:
- return handleDeleteChunk(msg);
-
- case ListChunk:
- return ContainerUtils.unsupportedRequest(msg);
-
- default:
- return ContainerUtils.unsupportedRequest(msg);
- }
- } catch (StorageContainerException e) {
- return ContainerUtils.logAndReturnError(LOG, e, msg);
- } catch (IOException ex) {
- LOG.warn("Container operation failed. " +
- "Container: {} Operation: {} trace ID: {} Error: {}",
- msg.getCreateContainer().getContainerID(),
- msg.getCmdType().name(),
- msg.getTraceID(),
- ex.toString(), ex);
-
- // TODO : Replace with finer error codes.
- return ContainerUtils.getContainerCommandResponse(msg,
- ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
- ex.toString()).build();
- }
- }
-
- /**
- * Dispatch calls to small file hanlder.
- * @param msg - request
- * @return response
- * @throws StorageContainerException
- */
- private ContainerCommandResponseProto smallFileHandler(
- ContainerCommandRequestProto msg) throws StorageContainerException {
- switch (msg.getCmdType()) {
- case PutSmallFile:
- return handlePutSmallFile(msg);
- case GetSmallFile:
- return handleGetSmallFile(msg);
- default:
- return ContainerUtils.unsupportedRequest(msg);
- }
- }
-
- /**
- * Update an existing container with the new container data.
- *
- * @param msg Request
- * @return ContainerCommandResponseProto
- * @throws IOException
- */
- private ContainerCommandResponseProto handleUpdateContainer(
- ContainerCommandRequestProto msg)
- throws IOException {
- if (!msg.hasUpdateContainer()) {
- LOG.debug("Malformed read container request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
- long containerID = msg.getUpdateContainer().getContainerID();
-
- ContainerData data = new ContainerData(msg.getUpdateContainer()
- .getContainerID(), conf);
- boolean forceUpdate = msg.getUpdateContainer().getForceUpdate();
- this.containerManager.updateContainer(containerID,
- data, forceUpdate);
- return ContainerUtils.getSuccessResponse(msg);
- }
-
- /**
- * Calls into container logic and returns appropriate response.
- *
- * @param msg - Request
- * @return ContainerCommandResponseProto
- * @throws IOException
- */
- private ContainerCommandResponseProto handleReadContainer(
- ContainerCommandRequestProto msg)
- throws IOException {
-
- if (!msg.hasReadContainer()) {
- LOG.debug("Malformed read container request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
-
- long containerID = msg.getReadContainer().getContainerID();
- ContainerData container = this.containerManager.
- readContainer(containerID);
- return ContainerUtils.getReadContainerResponse(msg, container);
- }
-
- /**
- * Calls into container logic and returns appropriate response.
- *
- * @param msg - Request
- * @return Response.
- * @throws IOException
- */
- private ContainerCommandResponseProto handleDeleteContainer(
- ContainerCommandRequestProto msg) throws IOException {
-
- if (!msg.hasDeleteContainer()) {
- LOG.debug("Malformed delete container request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
-
- long containerID = msg.getDeleteContainer().getContainerID();
- boolean forceDelete = msg.getDeleteContainer().getForceDelete();
- this.containerManager.deleteContainer(containerID, forceDelete);
- return ContainerUtils.getSuccessResponse(msg);
- }
-
- /**
- * Calls into container logic and returns appropriate response.
- *
- * @param msg - Request
- * @return Response.
- * @throws IOException
- */
- private ContainerCommandResponseProto handleCreateContainer(
- ContainerCommandRequestProto msg) throws IOException {
- if (!msg.hasCreateContainer()) {
- LOG.debug("Malformed create container request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
- ContainerData cData = new ContainerData(
- msg.getCreateContainer().getContainerID(), conf);
-
- this.containerManager.createContainer(cData);
- return ContainerUtils.getSuccessResponse(msg);
- }
-
- /**
- * closes an open container.
- *
- * @param msg -
- * @return
- * @throws IOException
- */
- private ContainerCommandResponseProto handleCloseContainer(
- ContainerCommandRequestProto msg) throws IOException {
- try {
- if (!msg.hasCloseContainer()) {
- LOG.debug("Malformed close Container request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
- long containerID = msg.getCloseContainer().getContainerID();
- if (!this.containerManager.isOpen(containerID)) {
- throw new StorageContainerException("Attempting to close a closed " +
- "container.", CLOSED_CONTAINER_IO);
- }
- this.containerManager.closeContainer(containerID);
- return ContainerUtils.getSuccessResponse(msg);
- } catch (NoSuchAlgorithmException e) {
- throw new StorageContainerException("No such Algorithm", e,
- NO_SUCH_ALGORITHM);
- }
- }
-
- /**
- * Calls into chunk manager to write a chunk.
- *
- * @param msg - Request.
- * @return Response.
- * @throws IOException
- */
- private ContainerCommandResponseProto handleWriteChunk(
- ContainerCommandRequestProto msg) throws IOException {
- if (!msg.hasWriteChunk()) {
- LOG.debug("Malformed write chunk request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
- BlockID blockID = BlockID.getFromProtobuf(
- msg.getWriteChunk().getBlockID());
- if (!this.containerManager.isOpen(blockID.getContainerID())) {
- throw new StorageContainerException("Write to closed container.",
- CLOSED_CONTAINER_IO);
- }
-
- ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getWriteChunk()
- .getChunkData());
- Preconditions.checkNotNull(chunkInfo);
- byte[] data = null;
- if (msg.getWriteChunk().getStage() == ContainerProtos.Stage.WRITE_DATA
- || msg.getWriteChunk().getStage() == ContainerProtos.Stage.COMBINED) {
- data = msg.getWriteChunk().getData().toByteArray();
- metrics.incContainerBytesStats(Type.WriteChunk, data.length);
-
- }
- this.containerManager.getChunkManager()
- .writeChunk(blockID, chunkInfo,
- data, msg.getWriteChunk().getStage());
-
- return ChunkUtils.getChunkResponse(msg);
- }
-
- /**
- * Calls into chunk manager to read a chunk.
- *
- * @param msg - Request.
- * @return - Response.
- * @throws IOException
- */
- private ContainerCommandResponseProto handleReadChunk(
- ContainerCommandRequestProto msg) throws IOException {
- if (!msg.hasReadChunk()) {
- LOG.debug("Malformed read chunk request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
- BlockID blockID = BlockID.getFromProtobuf(
- msg.getReadChunk().getBlockID());
- ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getReadChunk()
- .getChunkData());
- Preconditions.checkNotNull(chunkInfo);
- byte[] data = this.containerManager.getChunkManager().
- readChunk(blockID, chunkInfo);
- metrics.incContainerBytesStats(Type.ReadChunk, data.length);
- return ChunkUtils.getReadChunkResponse(msg, data, chunkInfo);
- }
-
- /**
- * Calls into chunk manager to write a chunk.
- *
- * @param msg - Request.
- * @return Response.
- * @throws IOException
- */
- private ContainerCommandResponseProto handleDeleteChunk(
- ContainerCommandRequestProto msg) throws IOException {
- if (!msg.hasDeleteChunk()) {
- LOG.debug("Malformed delete chunk request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
-
- BlockID blockID = BlockID.getFromProtobuf(msg.getDeleteChunk()
- .getBlockID());
- long containerID = blockID.getContainerID();
- if (!this.containerManager.isOpen(containerID)) {
- throw new StorageContainerException("Write to closed container.",
- CLOSED_CONTAINER_IO);
- }
- ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getDeleteChunk()
- .getChunkData());
- Preconditions.checkNotNull(chunkInfo);
-
- this.containerManager.getChunkManager().deleteChunk(blockID,
- chunkInfo);
- return ChunkUtils.getChunkResponse(msg);
- }
-
- /**
- * Put Key handler.
- *
- * @param msg - Request.
- * @return - Response.
- * @throws IOException
- */
- private ContainerCommandResponseProto handlePutKey(
- ContainerCommandRequestProto msg) throws IOException {
- if (!msg.hasPutKey()) {
- LOG.debug("Malformed put key request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
- BlockID blockID = BlockID.getFromProtobuf(
- msg.getPutKey().getKeyData().getBlockID());
- long containerID = blockID.getContainerID();
- if (!this.containerManager.isOpen(containerID)) {
- throw new StorageContainerException("Write to closed container.",
- CLOSED_CONTAINER_IO);
- }
- KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData());
- Preconditions.checkNotNull(keyData);
- this.containerManager.getKeyManager().putKey(keyData);
- long numBytes = keyData.getProtoBufMessage().toByteArray().length;
- metrics.incContainerBytesStats(Type.PutKey, numBytes);
- return KeyUtils.getKeyResponse(msg);
- }
-
- /**
- * Handle Get Key.
- *
- * @param msg - Request.
- * @return - Response.
- * @throws IOException
- */
- private ContainerCommandResponseProto handleGetKey(
- ContainerCommandRequestProto msg) throws IOException {
- if (!msg.hasGetKey()) {
- LOG.debug("Malformed get key request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
- KeyData keyData = new KeyData(
- BlockID.getFromProtobuf(msg.getGetKey().getBlockID()));
- Preconditions.checkNotNull(keyData);
- KeyData responseData =
- this.containerManager.getKeyManager().getKey(keyData);
- long numBytes = responseData.getProtoBufMessage().toByteArray().length;
- metrics.incContainerBytesStats(Type.GetKey, numBytes);
- return KeyUtils.getKeyDataResponse(msg, responseData);
- }
-
- /**
- * Handle Delete Key.
- *
- * @param msg - Request.
- * @return - Response.
- * @throws IOException
- */
- private ContainerCommandResponseProto handleDeleteKey(
- ContainerCommandRequestProto msg) throws IOException {
- if (!msg.hasDeleteKey()) {
- LOG.debug("Malformed delete key request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
- BlockID blockID = BlockID.getFromProtobuf(msg.getDeleteKey()
- .getBlockID());
- Preconditions.checkNotNull(blockID);
- long containerID = blockID.getContainerID();
- if (!this.containerManager.isOpen(containerID)) {
- throw new StorageContainerException("Write to closed container.",
- CLOSED_CONTAINER_IO);
- }
- this.containerManager.getKeyManager().deleteKey(blockID);
- return KeyUtils.getKeyResponse(msg);
- }
-
- /**
- * Handles writing a chunk and associated key using single RPC.
- *
- * @param msg - Message.
- * @return ContainerCommandResponseProto
- * @throws StorageContainerException
- */
- private ContainerCommandResponseProto handlePutSmallFile(
- ContainerCommandRequestProto msg) throws StorageContainerException {
-
- if (!msg.hasPutSmallFile()) {
- LOG.debug("Malformed put small file request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
- try {
-
- BlockID blockID = BlockID.getFromProtobuf(msg.
- getPutSmallFile().getKey().getKeyData().getBlockID());
- long containerID = blockID.getContainerID();
-
- if (!this.containerManager.isOpen(containerID)) {
- throw new StorageContainerException("Write to closed container.",
- CLOSED_CONTAINER_IO);
- }
- KeyData keyData = KeyData.getFromProtoBuf(msg.getPutSmallFile().getKey()
- .getKeyData());
- ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getPutSmallFile()
- .getChunkInfo());
- byte[] data = msg.getPutSmallFile().getData().toByteArray();
-
- metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
- this.containerManager.getChunkManager().writeChunk(blockID,
- chunkInfo, data, ContainerProtos.Stage.COMBINED);
- List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
- chunks.add(chunkInfo.getProtoBufMessage());
- keyData.setChunks(chunks);
- this.containerManager.getKeyManager().putKey(keyData);
- return SmallFileUtils.getPutFileResponseSuccess(msg);
- } catch (StorageContainerException e) {
- return ContainerUtils.logAndReturnError(LOG, e, msg);
- } catch (IOException e) {
- throw new StorageContainerException("Put Small File Failed.", e,
- PUT_SMALL_FILE_ERROR);
- }
- }
-
- /**
- * Handles getting a data stream using a key. This helps in reducing the RPC
- * overhead for small files.
- *
- * @param msg - ContainerCommandRequestProto
- * @return ContainerCommandResponseProto
- * @throws StorageContainerException
- */
- private ContainerCommandResponseProto handleGetSmallFile(
- ContainerCommandRequestProto msg) throws StorageContainerException {
- ByteString dataBuf = ByteString.EMPTY;
- if (!msg.hasGetSmallFile()) {
- LOG.debug("Malformed get small file request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
- try {
- long bytes = 0;
- KeyData keyData = new KeyData(BlockID.getFromProtobuf(
- msg.getGetSmallFile().getKey().getBlockID()));
- KeyData data = this.containerManager.getKeyManager().getKey(keyData);
- ContainerProtos.ChunkInfo c = null;
- for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
- bytes += chunk.getSerializedSize();
- ByteString current =
- ByteString.copyFrom(this.containerManager.getChunkManager()
- .readChunk(keyData.getBlockID(),
- ChunkInfo.getFromProtoBuf(chunk)));
- dataBuf = dataBuf.concat(current);
- c = chunk;
- }
- metrics.incContainerBytesStats(Type.GetSmallFile, bytes);
- return SmallFileUtils.getGetSmallFileResponseSuccess(
- msg, dataBuf.toByteArray(), ChunkInfo.getFromProtoBuf(c));
- } catch (StorageContainerException e) {
- return ContainerUtils.logAndReturnError(LOG, e, msg);
- } catch (IOException e) {
- throw new StorageContainerException("Get Small File Failed", e,
- GET_SMALL_FILE_ERROR);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
deleted file mode 100644
index 40ae1c7..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.container.common.helpers
- .StorageContainerException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
-import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
-import org.apache.hadoop.utils.MetadataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.NO_SUCH_KEY;
-
-/**
- * Key Manager impl.
- */
-public class KeyManagerImpl implements KeyManager {
- static final Logger LOG =
- LoggerFactory.getLogger(KeyManagerImpl.class);
-
- private static final float LOAD_FACTOR = 0.75f;
- private final ContainerManager containerManager;
- private final Configuration conf;
-
- /**
- * Constructs a key Manager.
- *
- * @param containerManager - Container Manager.
- */
- public KeyManagerImpl(ContainerManager containerManager, Configuration conf) {
- Preconditions.checkNotNull(containerManager, "Container manager cannot be" +
- " null");
- Preconditions.checkNotNull(conf, "Config cannot be null");
- this.containerManager = containerManager;
- this.conf = conf;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void putKey(KeyData data) throws IOException {
- Preconditions.checkNotNull(data,
- "KeyData cannot be null for put operation.");
- Preconditions.checkState(data.getContainerID() >= 0,
- "Container ID cannot be negative");
- containerManager.readLock();
- try {
- // We are not locking the key manager since LevelDb serializes all actions
- // against a single DB. We rely on DB level locking to avoid conflicts.
- ContainerData cData = containerManager.readContainer(
- data.getContainerID());
- MetadataStore db = KeyUtils.getDB(cData, conf);
-
- // This is a post condition that acts as a hint to the user.
- // Should never fail.
- Preconditions.checkNotNull(db, "DB cannot be null here");
- db.put(Longs.toByteArray(data.getLocalID()), data
- .getProtoBufMessage().toByteArray());
- } finally {
- containerManager.readUnlock();
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public KeyData getKey(KeyData data) throws IOException {
- containerManager.readLock();
- try {
- Preconditions.checkNotNull(data, "Key data cannot be null");
- Preconditions.checkNotNull(data.getContainerID(),
- "Container name cannot be null");
- ContainerData cData = containerManager.readContainer(data
- .getContainerID());
- MetadataStore db = KeyUtils.getDB(cData, conf);
-
- // This is a post condition that acts as a hint to the user.
- // Should never fail.
- Preconditions.checkNotNull(db, "DB cannot be null here");
-
- byte[] kData = db.get(Longs.toByteArray(data.getLocalID()));
- if (kData == null) {
- throw new StorageContainerException("Unable to find the key.",
- NO_SUCH_KEY);
- }
- ContainerProtos.KeyData keyData =
- ContainerProtos.KeyData.parseFrom(kData);
- return KeyData.getFromProtoBuf(keyData);
- } finally {
- containerManager.readUnlock();
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void deleteKey(BlockID blockID)
- throws IOException {
- Preconditions.checkNotNull(blockID, "block ID cannot be null.");
- Preconditions.checkState(blockID.getContainerID() >= 0,
- "Container ID cannot be negative.");
- Preconditions.checkState(blockID.getLocalID() >= 0,
- "Local ID cannot be negative.");
-
- containerManager.readLock();
- try {
-
- ContainerData cData = containerManager
- .readContainer(blockID.getContainerID());
- MetadataStore db = KeyUtils.getDB(cData, conf);
-
- // This is a post condition that acts as a hint to the user.
- // Should never fail.
- Preconditions.checkNotNull(db, "DB cannot be null here");
- // Note : There is a race condition here, since get and delete
- // are not atomic. Leaving it here since the impact is refusing
- // to delete a key which might have just gotten inserted after
- // the get check.
-
- byte[] kKey = Longs.toByteArray(blockID.getLocalID());
- byte[] kData = db.get(kKey);
- if (kData == null) {
- throw new StorageContainerException("Unable to find the key.",
- NO_SUCH_KEY);
- }
- db.delete(kKey);
- } finally {
- containerManager.readUnlock();
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<KeyData> listKey(
- long containerID, long startLocalID, int count)
- throws IOException {
- Preconditions.checkState(containerID >= 0,
- "Container ID cannot be negative");
- Preconditions.checkState(startLocalID >= 0,
- "startLocal ID cannot be negative");
- Preconditions.checkArgument(count > 0,
- "Count must be a positive number.");
- ContainerData cData = containerManager.readContainer(containerID);
- MetadataStore db = KeyUtils.getDB(cData, conf);
-
- List<KeyData> result = new ArrayList<>();
- byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
- List<Map.Entry<byte[], byte[]>> range =
- db.getSequentialRangeKVs(startKeyInBytes, count, null);
- for (Map.Entry<byte[], byte[]> entry : range) {
- KeyData value = KeyUtils.getKeyData(entry.getValue());
- KeyData data = new KeyData(value.getBlockID());
- result.add(data);
- }
- return result;
- }
-
- /**
- * Shutdown keyManager.
- */
- @Override
- public void shutdown() {
- Preconditions.checkState(this.containerManager.hasWriteLock(), "asserts " +
- "that we are holding the container manager lock when shutting down.");
- KeyUtils.shutdownCache(ContainerCache.getInstance(conf));
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
index 97fdb9e..83d746b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces
.ContainerDeletionChoosingPolicy;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
index 9a109e8..68074fc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.common.impl;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces
.ContainerDeletionChoosingPolicy;
import org.slf4j.Logger;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org