You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2018/10/05 22:06:48 UTC
[19/50] [abbrv] hadoop git commit: HDDS-567. Rename Mapping to
ContainerManager in SCM. Contributed by Nanda kumar.
HDDS-567. Rename Mapping to ContainerManager in SCM. Contributed by Nanda kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/095c2696
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/095c2696
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/095c2696
Branch: refs/heads/HDFS-12943
Commit: 095c269620e01ce46832ea25e696c0ab71613ea3
Parents: 7b37448
Author: Nanda kumar <na...@apache.org>
Authored: Wed Oct 3 16:00:39 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Wed Oct 3 16:00:39 2018 +0530
----------------------------------------------------------------------
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 6 +-
.../block/DatanodeDeletedBlockTransactions.java | 10 +-
.../hdds/scm/block/DeletedBlockLogImpl.java | 8 +-
.../hdds/scm/block/SCMBlockDeletingService.java | 15 +-
.../container/CloseContainerEventHandler.java | 4 +-
.../scm/container/CloseContainerWatcher.java | 4 +-
.../hdds/scm/container/ContainerManager.java | 142 ++++
.../hdds/scm/container/ContainerMapping.java | 699 -------------------
.../scm/container/ContainerReportHandler.java | 12 +-
.../scm/container/ContainerStateManager.java | 8 +-
.../hadoop/hdds/scm/container/Mapping.java | 141 ----
.../hdds/scm/container/SCMContainerManager.java | 699 +++++++++++++++++++
.../scm/server/SCMClientProtocolServer.java | 22 +-
.../scm/server/SCMDatanodeProtocolServer.java | 2 +-
.../scm/server/StorageContainerManager.java | 36 +-
.../hadoop/hdds/scm/block/TestBlockManager.java | 6 +-
.../hdds/scm/block/TestDeletedBlockLog.java | 8 +-
.../TestCloseContainerEventHandler.java | 4 +-
.../scm/container/TestContainerMapping.java | 380 ----------
.../container/TestContainerReportHandler.java | 10 +-
.../container/TestContainerStateManager.java | 2 +-
.../scm/container/TestSCMContainerManager.java | 378 ++++++++++
.../hdds/scm/node/TestContainerPlacement.java | 9 +-
.../hdds/scm/node/TestDeadNodeHandler.java | 4 +-
.../container/TestCloseContainerWatcher.java | 12 +-
.../TestContainerStateManagerIntegration.java | 52 +-
.../hdds/scm/pipeline/TestNode2PipelineMap.java | 21 +-
.../hdds/scm/pipeline/TestNodeFailure.java | 18 +-
.../hdds/scm/pipeline/TestPipelineClose.java | 28 +-
.../hdds/scm/pipeline/TestSCMRestart.java | 33 +-
.../org/apache/hadoop/ozone/OzoneTestUtils.java | 6 +-
.../ozone/client/rest/TestOzoneRestClient.java | 2 +-
.../rpc/TestCloseContainerHandlingByClient.java | 6 +-
.../ozone/client/rpc/TestOzoneRpcClient.java | 2 +-
.../commandhandler/TestBlockDeletion.java | 3 +-
.../TestCloseContainerByPipeline.java | 6 +-
.../TestCloseContainerHandler.java | 2 +-
.../hadoop/ozone/om/TestScmChillMode.java | 12 +-
.../hadoop/ozone/scm/TestContainerSQLCli.java | 12 +-
39 files changed, 1415 insertions(+), 1409 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/095c2696/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 4777016..30740c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmUtils;
-import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
@@ -70,7 +70,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
// by itself and does not rely on the Block service offered by SCM.
private final NodeManager nodeManager;
- private final Mapping containerManager;
+ private final ContainerManager containerManager;
private final long containerSize;
@@ -92,7 +92,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
* @throws IOException
*/
public BlockManagerImpl(final Configuration conf,
- final NodeManager nodeManager, final Mapping containerManager,
+ final NodeManager nodeManager, final ContainerManager containerManager,
EventPublisher eventPublisher)
throws IOException {
this.nodeManager = nodeManager;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/095c2696/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index 8702a42..c86f9cd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -17,7 +17,7 @@
package org.apache.hadoop.hdds.scm.block;
import com.google.common.collect.ArrayListMultimap;
-import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -42,15 +42,15 @@ public class DatanodeDeletedBlockTransactions {
private int maximumAllowedTXNum;
// Current counter of inserted TX.
private int currentTXNum;
- private Mapping mappingService;
+ private ContainerManager containerManager;
// A list of TXs mapped to a certain datanode ID.
private final ArrayListMultimap<UUID, DeletedBlocksTransaction>
transactions;
- DatanodeDeletedBlockTransactions(Mapping mappingService,
+ DatanodeDeletedBlockTransactions(ContainerManager containerManager,
int maximumAllowedTXNum, int nodeNum) {
this.transactions = ArrayListMultimap.create();
- this.mappingService = mappingService;
+ this.containerManager = containerManager;
this.maximumAllowedTXNum = maximumAllowedTXNum;
this.nodeNum = nodeNum;
}
@@ -60,7 +60,7 @@ public class DatanodeDeletedBlockTransactions {
Pipeline pipeline = null;
try {
ContainerWithPipeline containerWithPipeline =
- mappingService.getContainerWithPipeline(tx.getContainerID());
+ containerManager.getContainerWithPipeline(tx.getContainerID());
if (containerWithPipeline.getContainerInfo().isContainerOpen()
|| containerWithPipeline.getPipeline().isEmpty()) {
return false;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/095c2696/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 68435d1..5d3afd5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto
.DeleteBlockTransactionResult;
import org.apache.hadoop.hdds.scm.command
.CommandStatusReportHandler.DeleteBlockStatus;
-import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -92,15 +92,15 @@ public class DeletedBlockLogImpl
private final int maxRetry;
private final MetadataStore deletedStore;
- private final Mapping containerManager;
+ private final ContainerManager containerManager;
private final Lock lock;
// The latest id of deleted blocks in the db.
private long lastTxID;
// Maps txId to set of DNs which are successful in committing the transaction
private Map<Long, Set<UUID>> transactionToDNsCommitMap;
- public DeletedBlockLogImpl(Configuration conf, Mapping containerManager)
- throws IOException {
+ public DeletedBlockLogImpl(Configuration conf,
+ ContainerManager containerManager) throws IOException {
maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/095c2696/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index b85d77f..8e07fa2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm.block;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -63,7 +63,7 @@ public class SCMBlockDeletingService extends BackgroundService {
// ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 2;
private final DeletedBlockLog deletedBlockLog;
- private final Mapping mappingService;
+ private final ContainerManager containerManager;
private final NodeManager nodeManager;
private final EventPublisher eventPublisher;
@@ -81,12 +81,13 @@ public class SCMBlockDeletingService extends BackgroundService {
private int blockDeleteLimitSize;
public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
- Mapping mapper, NodeManager nodeManager, EventPublisher eventPublisher,
- long interval, long serviceTimeout, Configuration conf) {
+ ContainerManager containerManager, NodeManager nodeManager,
+ EventPublisher eventPublisher, long interval, long serviceTimeout,
+ Configuration conf) {
super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
this.deletedBlockLog = deletedBlockLog;
- this.mappingService = mapper;
+ this.containerManager = containerManager;
this.nodeManager = nodeManager;
this.eventPublisher = eventPublisher;
@@ -139,7 +140,7 @@ public class SCMBlockDeletingService extends BackgroundService {
List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
Map<Long, Long> transactionMap = null;
if (datanodes != null) {
- transactions = new DatanodeDeletedBlockTransactions(mappingService,
+ transactions = new DatanodeDeletedBlockTransactions(containerManager,
blockDeleteLimitSize, datanodes.size());
try {
transactionMap = deletedBlockLog.getTransactions(transactions);
@@ -174,7 +175,7 @@ public class SCMBlockDeletingService extends BackgroundService {
transactions.getTransactionIDList(dnId)));
}
}
- mappingService.updateDeleteTransactionId(transactionMap);
+ containerManager.updateDeleteTransactionId(transactionMap);
}
if (dnTxCount > 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/095c2696/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 7baecc4..8be7803 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -46,9 +46,9 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
LoggerFactory.getLogger(CloseContainerEventHandler.class);
- private final Mapping containerManager;
+ private final ContainerManager containerManager;
- public CloseContainerEventHandler(Mapping containerManager) {
+ public CloseContainerEventHandler(ContainerManager containerManager) {
this.containerManager = containerManager;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/095c2696/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
index 8e277b9..7b94bd2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
@@ -44,11 +44,11 @@ public class CloseContainerWatcher extends
public static final Logger LOG =
LoggerFactory.getLogger(CloseContainerWatcher.class);
- private final Mapping containerManager;
+ private final ContainerManager containerManager;
public CloseContainerWatcher(Event<CloseContainerRetryableReq> startEvent,
Event<CloseContainerStatus> completionEvent,
- LeaseManager<Long> leaseManager, Mapping containerManager) {
+ LeaseManager<Long> leaseManager, ContainerManager containerManager) {
super(startEvent, completionEvent, leaseManager);
this.containerManager = containerManager;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/095c2696/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
new file mode 100644
index 0000000..e586f3e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ContainerManager class contains the mapping from a name to a pipeline
+ * mapping. This is used by SCM when allocating new locations and when
+ * looking up a key.
+ */
+public interface ContainerManager extends Closeable {
+ /**
+ * Returns the ContainerInfo from the container ID.
+ *
+ * @param containerID - ID of container.
+ * @return - ContainerInfo such as creation state and the pipeline.
+ * @throws IOException
+ */
+ ContainerInfo getContainer(long containerID) throws IOException;
+
+ /**
+ * Returns the ContainerWithPipeline from the container ID.
+ *
+ * @param containerID - ID of container.
+ * @return - ContainerWithPipeline such as creation state and the pipeline.
+ * @throws IOException
+ */
+ ContainerWithPipeline getContainerWithPipeline(long containerID)
+ throws IOException;
+
+ /**
+ * Returns containers under certain conditions.
+ * Search container IDs from start ID(exclusive),
+ * The max size of the searching range cannot exceed the
+ * value of count.
+ *
+ * @param startContainerID start containerID, >=0,
+ * start searching at the head if 0.
+ * @param count count must be >= 0
+ * Usually the count will be replaced with a very big
+ * value instead of being unlimited in case the db is very big.
+ *
+ * @return a list of container.
+ * @throws IOException
+ */
+ List<ContainerInfo> listContainer(long startContainerID, int count)
+ throws IOException;
+
+ /**
+ * Allocates a new container for a given replication type and factor.
+ * @param type - replication type of the container.
+ * @param replicationFactor - replication factor of the container.
+ * @param owner - name of the service that owns this container.
+ * @return - ContainerWithPipeline.
+ * @throws IOException
+ */
+ ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
+ HddsProtos.ReplicationFactor replicationFactor, String owner)
+ throws IOException;
+
+ /**
+ * Deletes a container from SCM.
+ *
+ * @param containerID - Container ID
+ * @throws IOException
+ */
+ void deleteContainer(long containerID) throws IOException;
+
+ /**
+ * Update container state.
+ * @param containerID - Container ID
+ * @param event - container life cycle event
+ * @return - new container state
+ * @throws IOException
+ */
+ HddsProtos.LifeCycleState updateContainerState(long containerID,
+ HddsProtos.LifeCycleEvent event) throws IOException;
+
+ /**
+ * Returns the container State Manager.
+ * @return ContainerStateManager
+ */
+ ContainerStateManager getStateManager();
+
+ /**
+ * Process container report from Datanode.
+ *
+ * @param reports Container report
+ */
+ void processContainerReports(DatanodeDetails datanodeDetails,
+ ContainerReportsProto reports, boolean isRegisterCall)
+ throws IOException;
+
+ /**
+ * Update deleteTransactionId according to deleteTransactionMap.
+ *
+ * @param deleteTransactionMap Maps the containerId to latest delete
+ * transaction id for the container.
+ * @throws IOException
+ */
+ void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
+ throws IOException;
+
+ /**
+ * Returns the ContainerWithPipeline.
+ * @return ContainerWithPipeline
+ */
+ ContainerWithPipeline getMatchingContainerWithPipeline(long size,
+ String owner, ReplicationType type, ReplicationFactor factor,
+ LifeCycleState state) throws IOException;
+
+ PipelineSelector getPipelineSelector();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/095c2696/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
deleted file mode 100644
index 71e17e9..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ /dev/null
@@ -1,699 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * <p>http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * <p>Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
-import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.lease.Lease;
-import org.apache.hadoop.ozone.lease.LeaseException;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_CONTAINER_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_CONTAINER_SIZE;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
- .FAILED_TO_CHANGE_CONTAINER_STATE;
-import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
-
-/**
- * Mapping class contains the mapping from a name to a pipeline mapping. This
- * is used by SCM when
- * allocating new locations and when looking up a key.
- */
-public class ContainerMapping implements Mapping {
- private static final Logger LOG = LoggerFactory.getLogger(ContainerMapping
- .class);
-
- private final NodeManager nodeManager;
- private final long cacheSize;
- private final Lock lock;
- private final Charset encoding = Charset.forName("UTF-8");
- private final MetadataStore containerStore;
- private final PipelineSelector pipelineSelector;
- private final ContainerStateManager containerStateManager;
- private final LeaseManager<ContainerInfo> containerLeaseManager;
- private final EventPublisher eventPublisher;
- private final long size;
-
- /**
- * Constructs a mapping class that creates mapping between container names
- * and pipelines.
- *
- * @param nodeManager - NodeManager so that we can get the nodes that are
- * healthy to place new
- * containers.
- * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache
- * its nodes. This is
- * passed to LevelDB and this memory is allocated in Native code space.
- * CacheSize is specified
- * in MB.
- * @throws IOException on Failure.
- */
- @SuppressWarnings("unchecked")
- public ContainerMapping(
- final Configuration conf, final NodeManager nodeManager, final int
- cacheSizeMB, EventPublisher eventPublisher) throws IOException {
- this.nodeManager = nodeManager;
- this.cacheSize = cacheSizeMB;
-
- File metaDir = getOzoneMetaDirPath(conf);
-
- // Write the container name to pipeline mapping.
- File containerDBPath = new File(metaDir, SCM_CONTAINER_DB);
- containerStore =
- MetadataStoreBuilder.newBuilder()
- .setConf(conf)
- .setDbFile(containerDBPath)
- .setCacheSize(this.cacheSize * OzoneConsts.MB)
- .build();
-
- this.lock = new ReentrantLock();
-
- size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
- OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
-
- this.pipelineSelector = new PipelineSelector(nodeManager,
- conf, eventPublisher, cacheSizeMB);
-
- this.containerStateManager =
- new ContainerStateManager(conf, this, pipelineSelector);
- LOG.trace("Container State Manager created.");
-
- this.eventPublisher = eventPublisher;
-
- long containerCreationLeaseTimeout = conf.getTimeDuration(
- ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
- ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT,
- TimeUnit.MILLISECONDS);
- containerLeaseManager = new LeaseManager<>("ContainerCreation",
- containerCreationLeaseTimeout);
- containerLeaseManager.start();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public ContainerInfo getContainer(final long containerID) throws
- IOException {
- ContainerInfo containerInfo;
- lock.lock();
- try {
- byte[] containerBytes = containerStore.get(
- Longs.toByteArray(containerID));
- if (containerBytes == null) {
- throw new SCMException(
- "Specified key does not exist. key : " + containerID,
- SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
- }
-
- HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
- .parseFrom(containerBytes);
- containerInfo = ContainerInfo.fromProtobuf(temp);
- return containerInfo;
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Returns the ContainerInfo and pipeline from the containerID. If container
- * has no available replicas in datanodes it returns pipeline with no
- * datanodes and empty leaderID . Pipeline#isEmpty can be used to check for
- * an empty pipeline.
- *
- * @param containerID - ID of container.
- * @return - ContainerWithPipeline such as creation state and the pipeline.
- * @throws IOException
- */
- @Override
- public ContainerWithPipeline getContainerWithPipeline(long containerID)
- throws IOException {
- ContainerInfo contInfo;
- lock.lock();
- try {
- byte[] containerBytes = containerStore.get(
- Longs.toByteArray(containerID));
- if (containerBytes == null) {
- throw new SCMException(
- "Specified key does not exist. key : " + containerID,
- SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
- }
- HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
- .parseFrom(containerBytes);
- contInfo = ContainerInfo.fromProtobuf(temp);
-
- Pipeline pipeline;
- String leaderId = "";
- if (contInfo.isContainerOpen()) {
- // If pipeline with given pipeline Id already exist return it
- pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
- } else {
- // For close containers create pipeline from datanodes with replicas
- Set<DatanodeDetails> dnWithReplicas = containerStateManager
- .getContainerReplicas(contInfo.containerID());
- if (!dnWithReplicas.isEmpty()) {
- leaderId = dnWithReplicas.iterator().next().getUuidString();
- }
- pipeline = new Pipeline(leaderId, contInfo.getState(),
- ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(),
- PipelineID.randomId());
- dnWithReplicas.forEach(pipeline::addMember);
- }
- return new ContainerWithPipeline(contInfo, pipeline);
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<ContainerInfo> listContainer(long startContainerID,
- int count) throws IOException {
- List<ContainerInfo> containerList = new ArrayList<>();
- lock.lock();
- try {
- if (containerStore.isEmpty()) {
- throw new IOException("No container exists in current db");
- }
- byte[] startKey = startContainerID <= 0 ? null :
- Longs.toByteArray(startContainerID);
- List<Map.Entry<byte[], byte[]>> range =
- containerStore.getSequentialRangeKVs(startKey, count, null);
-
- // Transform the values into the pipelines.
- // TODO: filter by container state
- for (Map.Entry<byte[], byte[]> entry : range) {
- ContainerInfo containerInfo =
- ContainerInfo.fromProtobuf(
- HddsProtos.SCMContainerInfo.PARSER.parseFrom(
- entry.getValue()));
- Preconditions.checkNotNull(containerInfo);
- containerList.add(containerInfo);
- }
- } finally {
- lock.unlock();
- }
- return containerList;
- }
-
- /**
- * Allocates a new container.
- *
- * @param replicationFactor - replication factor of the container.
- * @param owner - The string name of the Service that owns this container.
- * @return - Pipeline that makes up this container.
- * @throws IOException - Exception
- */
- @Override
- public ContainerWithPipeline allocateContainer(
- ReplicationType type,
- ReplicationFactor replicationFactor,
- String owner)
- throws IOException {
-
- ContainerInfo containerInfo;
- ContainerWithPipeline containerWithPipeline;
-
- lock.lock();
- try {
- containerWithPipeline = containerStateManager.allocateContainer(
- pipelineSelector, type, replicationFactor, owner);
- containerInfo = containerWithPipeline.getContainerInfo();
-
- byte[] containerIDBytes = Longs.toByteArray(
- containerInfo.getContainerID());
- containerStore.put(containerIDBytes, containerInfo.getProtobuf()
- .toByteArray());
- } finally {
- lock.unlock();
- }
- return containerWithPipeline;
- }
-
- /**
- * Deletes a container from SCM.
- *
- * @param containerID - Container ID
- * @throws IOException if container doesn't exist or container store failed
- * to delete the
- * specified key.
- */
- @Override
- public void deleteContainer(long containerID) throws IOException {
- lock.lock();
- try {
- byte[] dbKey = Longs.toByteArray(containerID);
- byte[] containerBytes = containerStore.get(dbKey);
- if (containerBytes == null) {
- throw new SCMException(
- "Failed to delete container " + containerID + ", reason : " +
- "container doesn't exist.",
- SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
- }
- containerStore.delete(dbKey);
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * {@inheritDoc} Used by client to update container state on SCM.
- */
- @Override
- public HddsProtos.LifeCycleState updateContainerState(
- long containerID, HddsProtos.LifeCycleEvent event) throws
- IOException {
- ContainerInfo containerInfo;
- lock.lock();
- try {
- byte[] dbKey = Longs.toByteArray(containerID);
- byte[] containerBytes = containerStore.get(dbKey);
- if (containerBytes == null) {
- throw new SCMException(
- "Failed to update container state"
- + containerID
- + ", reason : container doesn't exist.",
- SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
- }
- containerInfo =
- ContainerInfo.fromProtobuf(HddsProtos.SCMContainerInfo.PARSER
- .parseFrom(containerBytes));
-
- Preconditions.checkNotNull(containerInfo);
- switch (event) {
- case CREATE:
- // Acquire lease on container
- Lease<ContainerInfo> containerLease =
- containerLeaseManager.acquire(containerInfo);
- // Register callback to be executed in case of timeout
- containerLease.registerCallBack(() -> {
- updateContainerState(containerID,
- HddsProtos.LifeCycleEvent.TIMEOUT);
- return null;
- });
- break;
- case CREATED:
- // Release the lease on container
- containerLeaseManager.release(containerInfo);
- break;
- case FINALIZE:
- // TODO: we don't need a lease manager here for closing as the
- // container report will include the container state after HDFS-13008
- // If a client failed to update the container close state, DN container
- // report from 3 DNs will be used to close the container eventually.
- break;
- case CLOSE:
- break;
- case UPDATE:
- break;
- case DELETE:
- break;
- case TIMEOUT:
- break;
- case CLEANUP:
- break;
- default:
- throw new SCMException("Unsupported container LifeCycleEvent.",
- FAILED_TO_CHANGE_CONTAINER_STATE);
- }
- // If the below updateContainerState call fails, we should revert the
- // changes made in switch case.
- // Like releasing the lease in case of BEGIN_CREATE.
- ContainerInfo updatedContainer = containerStateManager
- .updateContainerState(containerInfo, event);
- if (!updatedContainer.isContainerOpen()) {
- pipelineSelector.removeContainerFromPipeline(
- containerInfo.getPipelineID(), containerID);
- }
- containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
- return updatedContainer.getState();
- } catch (LeaseException e) {
- throw new IOException("Lease Exception.", e);
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Update deleteTransactionId according to deleteTransactionMap.
- *
- * @param deleteTransactionMap Maps the containerId to latest delete
- * transaction id for the container.
- * @throws IOException
- */
- public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
- throws IOException {
- if (deleteTransactionMap == null) {
- return;
- }
-
- lock.lock();
- try {
- BatchOperation batch = new BatchOperation();
- for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
- long containerID = entry.getKey();
- byte[] dbKey = Longs.toByteArray(containerID);
- byte[] containerBytes = containerStore.get(dbKey);
- if (containerBytes == null) {
- throw new SCMException(
- "Failed to increment number of deleted blocks for container "
- + containerID + ", reason : " + "container doesn't exist.",
- SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
- }
- ContainerInfo containerInfo = ContainerInfo.fromProtobuf(
- HddsProtos.SCMContainerInfo.parseFrom(containerBytes));
- containerInfo.updateDeleteTransactionId(entry.getValue());
- batch.put(dbKey, containerInfo.getProtobuf().toByteArray());
- }
- containerStore.writeBatch(batch);
- containerStateManager
- .updateDeleteTransactionId(deleteTransactionMap);
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Returns the container State Manager.
- *
- * @return ContainerStateManager
- */
- @Override
- public ContainerStateManager getStateManager() {
- return containerStateManager;
- }
-
- /**
- * Return a container matching the attributes specified.
- *
- * @param sizeRequired - Space needed in the Container.
- * @param owner - Owner of the container - A specific nameservice.
- * @param type - Replication Type {StandAlone, Ratis}
- * @param factor - Replication Factor {ONE, THREE}
- * @param state - State of the Container-- {Open, Allocated etc.}
- * @return ContainerInfo, null if there is no match found.
- */
- public ContainerWithPipeline getMatchingContainerWithPipeline(
- final long sizeRequired, String owner, ReplicationType type,
- ReplicationFactor factor, LifeCycleState state) throws IOException {
- ContainerInfo containerInfo = getStateManager()
- .getMatchingContainer(sizeRequired, owner, type, factor, state);
- if (containerInfo == null) {
- return null;
- }
- Pipeline pipeline = pipelineSelector
- .getPipeline(containerInfo.getPipelineID());
- return new ContainerWithPipeline(containerInfo, pipeline);
- }
-
- /**
- * Process container report from Datanode.
- * <p>
- * Processing follows a very simple logic for time being.
- * <p>
- * 1. Datanodes report the current State -- denoted by the datanodeState
- * <p>
- * 2. We are the older SCM state from the Database -- denoted by
- * the knownState.
- * <p>
- * 3. We copy the usage etc. from currentState to newState and log that
- * newState to the DB. This allows us SCM to bootup again and read the
- * state of the world from the DB, and then reconcile the state from
- * container reports, when they arrive.
- *
- * @param reports Container report
- */
- @Override
- public void processContainerReports(DatanodeDetails datanodeDetails,
- ContainerReportsProto reports, boolean isRegisterCall)
- throws IOException {
- List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
- containerInfos = reports.getReportsList();
- PendingDeleteStatusList pendingDeleteStatusList =
- new PendingDeleteStatusList(datanodeDetails);
- for (StorageContainerDatanodeProtocolProtos.ContainerInfo contInfo :
- containerInfos) {
- // Update replica info during registration process.
- if (isRegisterCall) {
- try {
- getStateManager().addContainerReplica(ContainerID.
- valueof(contInfo.getContainerID()), datanodeDetails);
- } catch (Exception ex) {
- // Continue to next one after logging the error.
- LOG.error("Error while adding replica for containerId {}.",
- contInfo.getContainerID(), ex);
- }
- }
- byte[] dbKey = Longs.toByteArray(contInfo.getContainerID());
- lock.lock();
- try {
- byte[] containerBytes = containerStore.get(dbKey);
- if (containerBytes != null) {
- HddsProtos.SCMContainerInfo knownState =
- HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
-
- if (knownState.getState() == LifeCycleState.CLOSING
- && contInfo.getState() == LifeCycleState.CLOSED) {
-
- updateContainerState(contInfo.getContainerID(),
- LifeCycleEvent.CLOSE);
-
- //reread the container
- knownState =
- HddsProtos.SCMContainerInfo.PARSER
- .parseFrom(containerStore.get(dbKey));
- }
-
- HddsProtos.SCMContainerInfo newState =
- reconcileState(contInfo, knownState, datanodeDetails);
-
- if (knownState.getDeleteTransactionId() > contInfo
- .getDeleteTransactionId()) {
- pendingDeleteStatusList
- .addPendingDeleteStatus(contInfo.getDeleteTransactionId(),
- knownState.getDeleteTransactionId(),
- knownState.getContainerID());
- }
-
- // FIX ME: This can be optimized, we write twice to memory, where a
- // single write would work well.
- //
- // We need to write this to DB again since the closed only write
- // the updated State.
- containerStore.put(dbKey, newState.toByteArray());
-
- } else {
- // Container not found in our container db.
- LOG.error("Error while processing container report from datanode :" +
- " {}, for container: {}, reason: container doesn't exist in" +
- "container database.", datanodeDetails,
- contInfo.getContainerID());
- }
- } finally {
- lock.unlock();
- }
- }
- if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
- eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
- pendingDeleteStatusList);
- }
-
- }
-
- /**
- * Reconciles the state from Datanode with the state in SCM.
- *
- * @param datanodeState - State from the Datanode.
- * @param knownState - State inside SCM.
- * @param dnDetails
- * @return new SCM State for this container.
- */
- private HddsProtos.SCMContainerInfo reconcileState(
- StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
- SCMContainerInfo knownState, DatanodeDetails dnDetails) {
- HddsProtos.SCMContainerInfo.Builder builder =
- HddsProtos.SCMContainerInfo.newBuilder();
- builder.setContainerID(knownState.getContainerID())
- .setPipelineID(knownState.getPipelineID())
- .setReplicationType(knownState.getReplicationType())
- .setReplicationFactor(knownState.getReplicationFactor());
-
- // TODO: If current state doesn't have this DN in list of DataNodes with
- // replica then add it in list of replicas.
-
- // If used size is greater than allocated size, we will be updating
- // allocated size with used size. This update is done as a fallback
- // mechanism in case SCM crashes without properly updating allocated
- // size. Correct allocated value will be updated by
- // ContainerStateManager during SCM shutdown.
- long usedSize = datanodeState.getUsed();
- long allocated = knownState.getAllocatedBytes() > usedSize ?
- knownState.getAllocatedBytes() : usedSize;
- builder.setAllocatedBytes(allocated)
- .setUsedBytes(usedSize)
- .setNumberOfKeys(datanodeState.getKeyCount())
- .setState(knownState.getState())
- .setStateEnterTime(knownState.getStateEnterTime())
- .setContainerID(knownState.getContainerID())
- .setDeleteTransactionId(knownState.getDeleteTransactionId());
- if (knownState.getOwner() != null) {
- builder.setOwner(knownState.getOwner());
- }
- return builder.build();
- }
-
-
- /**
- * In Container is in closed state, if it is in closed, Deleting or Deleted
- * State.
- *
- * @param info - ContainerInfo.
- * @return true if is in open state, false otherwise
- */
- private boolean shouldClose(ContainerInfo info) {
- return info.getState() == HddsProtos.LifeCycleState.OPEN;
- }
-
- private boolean isClosed(ContainerInfo info) {
- return info.getState() == HddsProtos.LifeCycleState.CLOSED;
- }
-
- /**
- * Closes this stream and releases any system resources associated with it.
- * If the stream is
- * already closed then invoking this method has no effect.
- * <p>
- * <p>As noted in {@link AutoCloseable#close()}, cases where the close may
- * fail require careful
- * attention. It is strongly advised to relinquish the underlying resources
- * and to internally
- * <em>mark</em> the {@code Closeable} as closed, prior to throwing the
- * {@code IOException}.
- *
- * @throws IOException if an I/O error occurs
- */
- @Override
- public void close() throws IOException {
- if (containerLeaseManager != null) {
- containerLeaseManager.shutdown();
- }
- if (containerStateManager != null) {
- flushContainerInfo();
- containerStateManager.close();
- }
- if (containerStore != null) {
- containerStore.close();
- }
-
- if (pipelineSelector != null) {
- pipelineSelector.shutdown();
- }
- }
-
- /**
- * Since allocatedBytes of a container is only in memory, stored in
- * containerStateManager, when closing ContainerMapping, we need to update
- * this in the container store.
- *
- * @throws IOException on failure.
- */
- @VisibleForTesting
- public void flushContainerInfo() throws IOException {
- List<ContainerInfo> containers = containerStateManager.getAllContainers();
- List<Long> failedContainers = new ArrayList<>();
- for (ContainerInfo info : containers) {
- // even if some container updated failed, others can still proceed
- try {
- byte[] dbKey = Longs.toByteArray(info.getContainerID());
- byte[] containerBytes = containerStore.get(dbKey);
- // TODO : looks like when a container is deleted, the container is
- // removed from containerStore but not containerStateManager, so it can
- // return info of a deleted container. may revisit this in the future,
- // for now, just skip a not-found container
- if (containerBytes != null) {
- containerStore.put(dbKey, info.getProtobuf().toByteArray());
- } else {
- LOG.debug("Container state manager has container {} but not found " +
- "in container store, a deleted container?",
- info.getContainerID());
- }
- } catch (IOException ioe) {
- failedContainers.add(info.getContainerID());
- }
- }
- if (!failedContainers.isEmpty()) {
- throw new IOException("Error in flushing container info from container " +
- "state manager: " + failedContainers);
- }
- }
-
- @VisibleForTesting
- public MetadataStore getContainerStore() {
- return containerStore;
- }
-
- public PipelineSelector getPipelineSelector() {
- return pipelineSelector;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/095c2696/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 71935f0..0f824a0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -50,21 +50,21 @@ public class ContainerReportHandler implements
private final NodeManager nodeManager;
- private final Mapping containerMapping;
+ private final ContainerManager containerManager;
private ContainerStateManager containerStateManager;
private ReplicationActivityStatus replicationStatus;
- public ContainerReportHandler(Mapping containerMapping,
+ public ContainerReportHandler(ContainerManager containerManager,
NodeManager nodeManager,
ReplicationActivityStatus replicationActivityStatus) {
- Preconditions.checkNotNull(containerMapping);
+ Preconditions.checkNotNull(containerManager);
Preconditions.checkNotNull(nodeManager);
Preconditions.checkNotNull(replicationActivityStatus);
- this.containerStateManager = containerMapping.getStateManager();
+ this.containerStateManager = containerManager.getStateManager();
this.nodeManager = nodeManager;
- this.containerMapping = containerMapping;
+ this.containerManager = containerManager;
this.replicationStatus = replicationActivityStatus;
}
@@ -80,7 +80,7 @@ public class ContainerReportHandler implements
try {
//update state in container db and trigger close container events
- containerMapping
+ containerManager
.processContainerReports(datanodeOrigin, containerReport, false);
Set<ContainerID> containerIds = containerReport.getReportsList().stream()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/095c2696/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 930c098..b8e4e89 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -136,7 +136,7 @@ public class ContainerStateManager implements Closeable {
*/
@SuppressWarnings("unchecked")
public ContainerStateManager(Configuration configuration,
- Mapping containerMapping, PipelineSelector pipelineSelector) {
+ ContainerManager containerManager, PipelineSelector pipelineSelector) {
// Initialize the container state machine.
Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
@@ -158,15 +158,15 @@ public class ContainerStateManager implements Closeable {
lastUsedMap = new ConcurrentHashMap<>();
containerCount = new AtomicLong(0);
containers = new ContainerStateMap();
- loadExistingContainers(containerMapping, pipelineSelector);
+ loadExistingContainers(containerManager, pipelineSelector);
}
- private void loadExistingContainers(Mapping containerMapping,
+ private void loadExistingContainers(ContainerManager containerManager,
PipelineSelector pipelineSelector) {
List<ContainerInfo> containerList;
try {
- containerList = containerMapping.listContainer(0, Integer.MAX_VALUE);
+ containerList = containerManager.listContainer(0, Integer.MAX_VALUE);
// if there are no container to load, let us return.
if (containerList == null || containerList.size() == 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/095c2696/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
deleted file mode 100644
index 5ed80cb..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Mapping class contains the mapping from a name to a pipeline mapping. This is
- * used by SCM when allocating new locations and when looking up a key.
- */
-public interface Mapping extends Closeable {
- /**
- * Returns the ContainerInfo from the container ID.
- *
- * @param containerID - ID of container.
- * @return - ContainerInfo such as creation state and the pipeline.
- * @throws IOException
- */
- ContainerInfo getContainer(long containerID) throws IOException;
-
- /**
- * Returns the ContainerInfo from the container ID.
- *
- * @param containerID - ID of container.
- * @return - ContainerWithPipeline such as creation state and the pipeline.
- * @throws IOException
- */
- ContainerWithPipeline getContainerWithPipeline(long containerID)
- throws IOException;
-
- /**
- * Returns containers under certain conditions.
- * Search container IDs from start ID(exclusive),
- * The max size of the searching range cannot exceed the
- * value of count.
- *
- * @param startContainerID start containerID, >=0,
- * start searching at the head if 0.
- * @param count count must be >= 0
- * Usually the count will be replace with a very big
- * value instead of being unlimited in case the db is very big.
- *
- * @return a list of container.
- * @throws IOException
- */
- List<ContainerInfo> listContainer(long startContainerID, int count)
- throws IOException;
-
- /**
- * Allocates a new container for a given keyName and replication factor.
- *
- * @param replicationFactor - replication factor of the container.
- * @param owner
- * @return - ContainerWithPipeline.
- * @throws IOException
- */
- ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
- HddsProtos.ReplicationFactor replicationFactor, String owner)
- throws IOException;
-
- /**
- * Deletes a container from SCM.
- *
- * @param containerID - Container ID
- * @throws IOException
- */
- void deleteContainer(long containerID) throws IOException;
-
- /**
- * Update container state.
- * @param containerID - Container ID
- * @param event - container life cycle event
- * @return - new container state
- * @throws IOException
- */
- HddsProtos.LifeCycleState updateContainerState(long containerID,
- HddsProtos.LifeCycleEvent event) throws IOException;
-
- /**
- * Returns the container State Manager.
- * @return ContainerStateManager
- */
- ContainerStateManager getStateManager();
-
- /**
- * Process container report from Datanode.
- *
- * @param reports Container report
- */
- void processContainerReports(DatanodeDetails datanodeDetails,
- ContainerReportsProto reports, boolean isRegisterCall)
- throws IOException;
-
- /**
- * Update deleteTransactionId according to deleteTransactionMap.
- *
- * @param deleteTransactionMap Maps the containerId to latest delete
- * transaction id for the container.
- * @throws IOException
- */
- void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
- throws IOException;
-
- /**
- * Returns the ContainerWithPipeline.
- * @return NodeManager
- */
- ContainerWithPipeline getMatchingContainerWithPipeline(long size,
- String owner, ReplicationType type, ReplicationFactor factor,
- LifeCycleState state) throws IOException;
-
- PipelineSelector getPipelineSelector();
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/095c2696/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
new file mode 100644
index 0000000..df26e36
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
+import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.lease.Lease;
+import org.apache.hadoop.ozone.lease.LeaseException;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_CONTAINER_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_CONTAINER_SIZE;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+ .FAILED_TO_CHANGE_CONTAINER_STATE;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
+
+/**
+ * SCMContainerManager is the SCM implementation of ContainerManager. It
+ * keeps container ID to ContainerInfo records in a metadata store and is
+ * used by SCM when allocating new locations and when looking up a key.
+ */
+public class SCMContainerManager implements ContainerManager {
+ private static final Logger LOG = LoggerFactory.getLogger(SCMContainerManager
+ .class);
+
+ private final NodeManager nodeManager;
+ private final long cacheSize;
+ private final Lock lock;
+ private final Charset encoding = Charset.forName("UTF-8");
+ private final MetadataStore containerStore;
+ private final PipelineSelector pipelineSelector;
+ private final ContainerStateManager containerStateManager;
+ private final LeaseManager<ContainerInfo> containerLeaseManager;
+ private final EventPublisher eventPublisher;
+ private final long size;
+
+ /**
+ * Constructs the SCM container manager: opens the container DB under the
+ * Ozone metadata dir and starts the container-creation lease manager.
+ *
+ * @param conf - Configuration source for the DB location, container size
+ * and container creation lease timeout.
+ * @param nodeManager - NodeManager so that we can get the nodes that are
+ * healthy to place new containers.
+ * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache
+ * its nodes. This is passed to LevelDB and this memory is allocated in
+ * Native code space. CacheSize is specified in MB.
+ * @param eventPublisher - Publisher passed to the PipelineSelector and
+ * stored in this manager.
+ * @throws IOException on Failure.
+ */
+ @SuppressWarnings("unchecked")
+ public SCMContainerManager(
+ final Configuration conf, final NodeManager nodeManager, final int
+ cacheSizeMB, EventPublisher eventPublisher) throws IOException {
+ this.nodeManager = nodeManager;
+ this.cacheSize = cacheSizeMB;
+
+ File metaDir = getOzoneMetaDirPath(conf);
+
+ // Open the store that maps container ID to serialized ContainerInfo.
+ File containerDBPath = new File(metaDir, SCM_CONTAINER_DB);
+ containerStore =
+ MetadataStoreBuilder.newBuilder()
+ .setConf(conf)
+ .setDbFile(containerDBPath)
+ .setCacheSize(this.cacheSize * OzoneConsts.MB)
+ .build();
+
+ this.lock = new ReentrantLock();
+
+ size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
+ OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
+ this.pipelineSelector = new PipelineSelector(nodeManager,
+ conf, eventPublisher, cacheSizeMB);
+ // State manager calls listContainer() on 'this'; store/lock are set.
+ this.containerStateManager =
+ new ContainerStateManager(conf, this, pipelineSelector);
+ LOG.trace("Container State Manager created.");
+
+ this.eventPublisher = eventPublisher;
+
+ long containerCreationLeaseTimeout = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ containerLeaseManager = new LeaseManager<>("ContainerCreation",
+ containerCreationLeaseTimeout);
+ containerLeaseManager.start();
+ }
+
+ /**
+ * {@inheritDoc} Fails with an SCMException if the ID is not in the store.
+ */
+ @Override
+ public ContainerInfo getContainer(final long containerID) throws
+ IOException {
+ ContainerInfo containerInfo;
+ lock.lock();
+ try {
+ byte[] containerBytes = containerStore.get(
+ Longs.toByteArray(containerID));
+ if (containerBytes == null) {
+ throw new SCMException(
+ "Specified key does not exist. key : " + containerID,
+ SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+ }
+
+ HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
+ .parseFrom(containerBytes);
+ containerInfo = ContainerInfo.fromProtobuf(temp);
+ return containerInfo;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns the ContainerInfo and pipeline for the containerID. If a
+ * non-open container has no known replicas, the returned pipeline has no
+ * datanodes and an empty leaderID. Pipeline#isEmpty can be used to check
+ * for an empty pipeline.
+ *
+ * @param containerID - ID of container.
+ * @return - ContainerWithPipeline holding the container and its pipeline.
+ * @throws IOException if the container does not exist in the store.
+ */
+ @Override
+ public ContainerWithPipeline getContainerWithPipeline(long containerID)
+ throws IOException {
+ ContainerInfo contInfo;
+ lock.lock();
+ try {
+ byte[] containerBytes = containerStore.get(
+ Longs.toByteArray(containerID));
+ if (containerBytes == null) {
+ throw new SCMException(
+ "Specified key does not exist. key : " + containerID,
+ SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+ }
+ HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
+ .parseFrom(containerBytes);
+ contInfo = ContainerInfo.fromProtobuf(temp);
+
+ Pipeline pipeline;
+ String leaderId = "";
+ if (contInfo.isContainerOpen()) {
+ // Open container: look the pipeline up by its pipeline ID.
+ pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
+ } else {
+ // Otherwise build a stand-alone pipeline from the replica datanodes.
+ Set<DatanodeDetails> dnWithReplicas = containerStateManager
+ .getContainerReplicas(contInfo.containerID());
+ if (!dnWithReplicas.isEmpty()) {
+ leaderId = dnWithReplicas.iterator().next().getUuidString();
+ }
+ pipeline = new Pipeline(leaderId, contInfo.getState(),
+ ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(),
+ PipelineID.randomId());
+ dnWithReplicas.forEach(pipeline::addMember);
+ }
+ return new ContainerWithPipeline(contInfo, pipeline);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * {@inheritDoc} Throws an IOException when the container store is empty.
+ */
+ @Override
+ public List<ContainerInfo> listContainer(long startContainerID,
+ int count) throws IOException {
+ List<ContainerInfo> containerList = new ArrayList<>();
+ lock.lock();
+ try {
+ if (containerStore.isEmpty()) {
+ throw new IOException("No container exists in current db");
+ }
+ byte[] startKey = startContainerID <= 0 ? null :
+ Longs.toByteArray(startContainerID);
+ List<Map.Entry<byte[], byte[]>> range =
+ containerStore.getSequentialRangeKVs(startKey, count, null);
+
+ // Transform the stored values into ContainerInfo objects.
+ // TODO: filter by container state
+ for (Map.Entry<byte[], byte[]> entry : range) {
+ ContainerInfo containerInfo =
+ ContainerInfo.fromProtobuf(
+ HddsProtos.SCMContainerInfo.PARSER.parseFrom(
+ entry.getValue()));
+ Preconditions.checkNotNull(containerInfo);
+ containerList.add(containerInfo);
+ }
+ } finally {
+ lock.unlock();
+ }
+ return containerList;
+ }
+
+ /**
+  * Allocates a new container and records it in the container store.
+  *
+  * <p>The allocation itself is delegated to the container state manager;
+  * the resulting container info is then written to the container store
+  * keyed by its container ID. Both steps run under {@code lock}.
+  *
+  * @param type - replication type of the container.
+  * @param replicationFactor - replication factor of the container.
+  * @param owner - The string name of the Service that owns this container.
+  * @return - the allocated container together with its pipeline.
+  * @throws IOException - Exception
+  */
+ @Override
+ public ContainerWithPipeline allocateContainer(
+     ReplicationType type,
+     ReplicationFactor replicationFactor,
+     String owner)
+     throws IOException {
+   lock.lock();
+   try {
+     final ContainerWithPipeline allocated =
+         containerStateManager.allocateContainer(
+             pipelineSelector, type, replicationFactor, owner);
+     final ContainerInfo info = allocated.getContainerInfo();
+     containerStore.put(
+         Longs.toByteArray(info.getContainerID()),
+         info.getProtobuf().toByteArray());
+     return allocated;
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Deletes a container from SCM.
+  *
+  * <p>This method removes the container's entry from the container store
+  * only; it does not touch any other component.
+  *
+  * @param containerID - Container ID
+  * @throws IOException if container doesn't exist or container store failed
+  * to delete the specified key.
+  */
+ @Override
+ public void deleteContainer(long containerID) throws IOException {
+   lock.lock();
+   try {
+     final byte[] key = Longs.toByteArray(containerID);
+     final boolean missing = containerStore.get(key) == null;
+     if (missing) {
+       throw new SCMException(
+           "Failed to delete container " + containerID + ", reason : " +
+               "container doesn't exist.",
+           SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+     }
+     containerStore.delete(key);
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * {@inheritDoc} Used by client to update container state on SCM.
+  *
+  * <p>Under {@code lock} this method loads the container from the container
+  * store, performs the lease side effect for the event (acquire a lease on
+  * CREATE, release it on CREATED), delegates the state transition to the
+  * container state manager, removes the container from its pipeline when
+  * the updated container is no longer open, and finally writes the updated
+  * container back to the store.
+  *
+  * @param containerID - ID of the container to update.
+  * @param event - life cycle event to apply.
+  * @return the state of the container after the update.
+  * @throws IOException if the update fails; a missing container and an
+  * unsupported event are reported as {@code SCMException}, and a
+  * {@code LeaseException} is wrapped in an {@code IOException}.
+  */
+ @Override
+ public HddsProtos.LifeCycleState updateContainerState(
+     long containerID, HddsProtos.LifeCycleEvent event) throws
+     IOException {
+   ContainerInfo containerInfo;
+   lock.lock();
+   try {
+     byte[] dbKey = Longs.toByteArray(containerID);
+     byte[] containerBytes = containerStore.get(dbKey);
+     if (containerBytes == null) {
+       // Note the trailing space so the ID does not run into the text.
+       throw new SCMException(
+           "Failed to update container state "
+               + containerID
+               + ", reason : container doesn't exist.",
+           SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+     }
+     containerInfo =
+         ContainerInfo.fromProtobuf(HddsProtos.SCMContainerInfo.PARSER
+             .parseFrom(containerBytes));
+
+     Preconditions.checkNotNull(containerInfo);
+     switch (event) {
+     case CREATE:
+       // Acquire lease on container
+       Lease<ContainerInfo> containerLease =
+           containerLeaseManager.acquire(containerInfo);
+       // Register callback to be executed in case of timeout
+       containerLease.registerCallBack(() -> {
+         updateContainerState(containerID,
+             HddsProtos.LifeCycleEvent.TIMEOUT);
+         return null;
+       });
+       break;
+     case CREATED:
+       // Release the lease on container
+       containerLeaseManager.release(containerInfo);
+       break;
+     case FINALIZE:
+       // TODO: we don't need a lease manager here for closing as the
+       // container report will include the container state after HDFS-13008
+       // If a client failed to update the container close state, DN container
+       // report from 3 DNs will be used to close the container eventually.
+     case CLOSE:
+     case UPDATE:
+     case DELETE:
+     case TIMEOUT:
+     case CLEANUP:
+       // No lease handling is needed for these events.
+       break;
+     default:
+       throw new SCMException("Unsupported container LifeCycleEvent.",
+           FAILED_TO_CHANGE_CONTAINER_STATE);
+     }
+     // If the below updateContainerState call fails, we should revert the
+     // changes made in switch case.
+     // Like releasing the lease acquired in case of CREATE.
+     ContainerInfo updatedContainer = containerStateManager
+         .updateContainerState(containerInfo, event);
+     if (!updatedContainer.isContainerOpen()) {
+       pipelineSelector.removeContainerFromPipeline(
+           containerInfo.getPipelineID(), containerID);
+     }
+     containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
+     return updatedContainer.getState();
+   } catch (LeaseException e) {
+     throw new IOException("Lease Exception.", e);
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Update deleteTransactionId according to deleteTransactionMap.
+  *
+  * <p>A null map is a no-op. Otherwise every referenced container is read
+  * from the container store, updated with the transaction ID from the map,
+  * and queued in a single batch. The batch is written once all entries have
+  * been processed, after which the same map is passed on to the container
+  * state manager. A container missing from the store aborts the whole call
+  * before anything is written.
+  *
+  * @param deleteTransactionMap Maps the containerId to latest delete
+  *                             transaction id for the container.
+  * @throws IOException if a container does not exist in the store or the
+  * store access fails.
+  */
+ public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
+     throws IOException {
+   if (deleteTransactionMap == null) {
+     return;
+   }
+
+   lock.lock();
+   try {
+     final BatchOperation pendingWrites = new BatchOperation();
+     for (Map.Entry<Long, Long> txEntry : deleteTransactionMap.entrySet()) {
+       final long containerID = txEntry.getKey();
+       final byte[] key = Longs.toByteArray(containerID);
+       final byte[] stored = containerStore.get(key);
+       if (stored == null) {
+         throw new SCMException(
+             "Failed to increment number of deleted blocks for container "
+                 + containerID + ", reason : " + "container doesn't exist.",
+             SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+       }
+       final ContainerInfo info = ContainerInfo.fromProtobuf(
+           HddsProtos.SCMContainerInfo.parseFrom(stored));
+       info.updateDeleteTransactionId(txEntry.getValue());
+       pendingWrites.put(key, info.getProtobuf().toByteArray());
+     }
+     containerStore.writeBatch(pendingWrites);
+     containerStateManager.updateDeleteTransactionId(deleteTransactionMap);
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Gives access to the container state manager used by this instance.
+  *
+  * @return the ContainerStateManager held by this container manager.
+  */
+ @Override
+ public ContainerStateManager getStateManager() {
+   return this.containerStateManager;
+ }
+
+ /**
+ * Return a container matching the attributes specified.
+ *
+ * @param sizeRequired - Space needed in the Container.
+ * @param owner - Owner of the container - A specific nameservice.
+ * @param type - Replication Type {StandAlone, Ratis}
+ * @param factor - Replication Factor {ONE, THREE}
+ * @param state - State of the Container-- {Open, Allocated etc.}
+ * @return ContainerInfo, null if there is no match found.
+ */
+ public ContainerWithPipeline getMatchingContainerWithPipeline(
+ final long sizeRequired, String owner, ReplicationType type,
+ ReplicationFactor factor, LifeCycleState state) throws IOException {
+ ContainerInfo containerInfo = getStateManager()
+ .getMatchingContainer(sizeRequired, owner, type, factor, state);
+ if (containerInfo == null) {
+ return null;
+ }
+ Pipeline pipeline = pipelineSelector
+ .getPipeline(containerInfo.getPipelineID());
+ return new ContainerWithPipeline(containerInfo, pipeline);
+ }
+
+ /**
+  * Process container report from Datanode.
+  * <p>
+  * Processing follows a very simple logic for time being.
+  * <p>
+  * 1. Datanodes report the current State -- denoted by the datanodeState
+  * <p>
+  * 2. We read the older SCM state from the Database -- denoted by
+  * the knownState.
+  * <p>
+  * 3. We copy the usage etc. from the datanodeState to newState and log that
+  * newState to the DB. This allows SCM to boot up again and read the
+  * state of the world from the DB, and then reconcile the state from
+  * container reports, when they arrive.
+  * <p>
+  * Containers reported by the datanode but absent from the container store
+  * are logged and skipped. If any reported container lags behind the known
+  * delete transaction ID, a PENDING_DELETE_STATUS event is fired once all
+  * reports have been processed.
+  *
+  * @param datanodeDetails Datanode that sent the report.
+  * @param reports Container report
+  * @param isRegisterCall true if the report arrives as part of datanode
+  *                       registration; replica info is then recorded for
+  *                       every reported container.
+  * @throws IOException if reading from or writing to the container store
+  * fails.
+  */
+ @Override
+ public void processContainerReports(DatanodeDetails datanodeDetails,
+     ContainerReportsProto reports, boolean isRegisterCall)
+     throws IOException {
+   List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
+       containerInfos = reports.getReportsList();
+   PendingDeleteStatusList pendingDeleteStatusList =
+       new PendingDeleteStatusList(datanodeDetails);
+   for (StorageContainerDatanodeProtocolProtos.ContainerInfo contInfo :
+       containerInfos) {
+     // Update replica info during registration process.
+     if (isRegisterCall) {
+       try {
+         getStateManager().addContainerReplica(ContainerID.
+             valueof(contInfo.getContainerID()), datanodeDetails);
+       } catch (Exception ex) {
+         // Continue to next one after logging the error.
+         LOG.error("Error while adding replica for containerId {}.",
+             contInfo.getContainerID(), ex);
+       }
+     }
+     byte[] dbKey = Longs.toByteArray(contInfo.getContainerID());
+     lock.lock();
+     try {
+       byte[] containerBytes = containerStore.get(dbKey);
+       if (containerBytes != null) {
+         HddsProtos.SCMContainerInfo knownState =
+             HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
+
+         // A container SCM considers CLOSING that the datanode reports as
+         // CLOSED is moved to CLOSED on the SCM side as well.
+         if (knownState.getState() == LifeCycleState.CLOSING
+             && contInfo.getState() == LifeCycleState.CLOSED) {
+
+           updateContainerState(contInfo.getContainerID(),
+               LifeCycleEvent.CLOSE);
+
+           //reread the container
+           knownState =
+               HddsProtos.SCMContainerInfo.PARSER
+                   .parseFrom(containerStore.get(dbKey));
+         }
+
+         HddsProtos.SCMContainerInfo newState =
+             reconcileState(contInfo, knownState, datanodeDetails);
+
+         // The datanode has not yet caught up with the delete transactions
+         // known to SCM; remember it so it can be reported below.
+         if (knownState.getDeleteTransactionId() > contInfo
+             .getDeleteTransactionId()) {
+           pendingDeleteStatusList
+               .addPendingDeleteStatus(contInfo.getDeleteTransactionId(),
+                   knownState.getDeleteTransactionId(),
+                   knownState.getContainerID());
+         }
+
+         // FIX ME: This can be optimized, we write twice to memory, where a
+         // single write would work well.
+         //
+         // We need to write this to DB again since the closed only write
+         // the updated State.
+         containerStore.put(dbKey, newState.toByteArray());
+
+       } else {
+         // Container not found in our container db.
+         LOG.error("Error while processing container report from datanode :" +
+             " {}, for container: {}, reason: container doesn't exist in " +
+             "container database.", datanodeDetails,
+             contInfo.getContainerID());
+       }
+     } finally {
+       lock.unlock();
+     }
+   }
+   if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
+     eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
+         pendingDeleteStatusList);
+   }
+
+ }
+
+ /**
+ * Reconciles the state from Datanode with the state in SCM.
+ *
+ * @param datanodeState - State from the Datanode.
+ * @param knownState - State inside SCM.
+ * @param dnDetails
+ * @return new SCM State for this container.
+ */
+ private HddsProtos.SCMContainerInfo reconcileState(
+ StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
+ SCMContainerInfo knownState, DatanodeDetails dnDetails) {
+ HddsProtos.SCMContainerInfo.Builder builder =
+ HddsProtos.SCMContainerInfo.newBuilder();
+ builder.setContainerID(knownState.getContainerID())
+ .setPipelineID(knownState.getPipelineID())
+ .setReplicationType(knownState.getReplicationType())
+ .setReplicationFactor(knownState.getReplicationFactor());
+
+ // TODO: If current state doesn't have this DN in list of DataNodes with
+ // replica then add it in list of replicas.
+
+ // If used size is greater than allocated size, we will be updating
+ // allocated size with used size. This update is done as a fallback
+ // mechanism in case SCM crashes without properly updating allocated
+ // size. Correct allocated value will be updated by
+ // ContainerStateManager during SCM shutdown.
+ long usedSize = datanodeState.getUsed();
+ long allocated = knownState.getAllocatedBytes() > usedSize ?
+ knownState.getAllocatedBytes() : usedSize;
+ builder.setAllocatedBytes(allocated)
+ .setUsedBytes(usedSize)
+ .setNumberOfKeys(datanodeState.getKeyCount())
+ .setState(knownState.getState())
+ .setStateEnterTime(knownState.getStateEnterTime())
+ .setContainerID(knownState.getContainerID())
+ .setDeleteTransactionId(knownState.getDeleteTransactionId());
+ if (knownState.getOwner() != null) {
+ builder.setOwner(knownState.getOwner());
+ }
+ return builder.build();
+ }
+
+
+ /**
+ * In Container is in closed state, if it is in closed, Deleting or Deleted
+ * State.
+ *
+ * @param info - ContainerInfo.
+ * @return true if is in open state, false otherwise
+ */
+ private boolean shouldClose(ContainerInfo info) {
+ return info.getState() == HddsProtos.LifeCycleState.OPEN;
+ }
+
+ private boolean isClosed(ContainerInfo info) {
+ return info.getState() == HddsProtos.LifeCycleState.CLOSED;
+ }
+
+ /**
+  * Shuts down the container manager and releases the resources it holds.
+  *
+  * <p>The steps run in this order: shut down the container lease manager,
+  * flush the in-memory container info to the container store, close the
+  * container state manager, close the container store, and shut down the
+  * pipeline selector. Components that are null are skipped.
+  *
+  * <p>As noted in {@link AutoCloseable#close()}, cases where the close may
+  * fail require careful attention: the underlying resources should be
+  * relinquished prior to throwing the {@code IOException}. The later steps
+  * are therefore chained through {@code finally} blocks so that a failure
+  * in one step (for example while flushing) does not prevent the remaining
+  * resources from being released. If several steps fail, the exception of
+  * the last failing step is the one that propagates.
+  *
+  * @throws IOException if an I/O error occurs
+  */
+ @Override
+ public void close() throws IOException {
+   if (containerLeaseManager != null) {
+     containerLeaseManager.shutdown();
+   }
+   try {
+     if (containerStateManager != null) {
+       try {
+         flushContainerInfo();
+       } finally {
+         // Close the state manager even when the flush failed.
+         containerStateManager.close();
+       }
+     }
+   } finally {
+     try {
+       if (containerStore != null) {
+         containerStore.close();
+       }
+     } finally {
+       if (pipelineSelector != null) {
+         pipelineSelector.shutdown();
+       }
+     }
+   }
+ }
+
+ /**
+ * Since allocatedBytes of a container is only in memory, stored in
+ * containerStateManager, when closing SCMContainerManager, we need to update
+ * this in the container store.
+ *
+ * @throws IOException on failure.
+ */
+ @VisibleForTesting
+ public void flushContainerInfo() throws IOException {
+ List<ContainerInfo> containers = containerStateManager.getAllContainers();
+ List<Long> failedContainers = new ArrayList<>();
+ for (ContainerInfo info : containers) {
+ // even if some container updated failed, others can still proceed
+ try {
+ byte[] dbKey = Longs.toByteArray(info.getContainerID());
+ byte[] containerBytes = containerStore.get(dbKey);
+ // TODO : looks like when a container is deleted, the container is
+ // removed from containerStore but not containerStateManager, so it can
+ // return info of a deleted container. may revisit this in the future,
+ // for now, just skip a not-found container
+ if (containerBytes != null) {
+ containerStore.put(dbKey, info.getProtobuf().toByteArray());
+ } else {
+ LOG.debug("Container state manager has container {} but not found " +
+ "in container store, a deleted container?",
+ info.getContainerID());
+ }
+ } catch (IOException ioe) {
+ failedContainers.add(info.getContainerID());
+ }
+ }
+ if (!failedContainers.isEmpty()) {
+ throw new IOException("Error in flushing container info from container " +
+ "state manager: " + failedContainers);
+ }
+ }
+
+ /**
+  * Exposes the underlying container store, intended for tests.
+  *
+  * @return the MetadataStore used as the container store.
+  */
+ @VisibleForTesting
+ public MetadataStore getContainerStore() {
+   return this.containerStore;
+ }
+
+ /**
+  * Gives access to the pipeline selector used by this instance.
+  *
+  * @return the PipelineSelector held by this container manager.
+  */
+ public PipelineSelector getPipelineSelector() {
+   return this.pipelineSelector;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org