You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/09/07 18:20:38 UTC

hadoop git commit: HDDS-400. Check global replication state for containers of dead node. Contributed by Elek, Marton.

Repository: hadoop
Updated Branches:
  refs/heads/trunk ff64d3571 -> ab90248b3


HDDS-400. Check global replication state for containers of dead node. Contributed by Elek, Marton.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab90248b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab90248b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab90248b

Branch: refs/heads/trunk
Commit: ab90248b30c2355cd8ae6660ea8af9758f95356a
Parents: ff64d35
Author: Hanisha Koneru <ha...@apache.org>
Authored: Fri Sep 7 11:20:25 2018 -0700
Committer: Hanisha Koneru <ha...@apache.org>
Committed: Fri Sep 7 11:20:25 2018 -0700

----------------------------------------------------------------------
 .../scm/container/ContainerReportHandler.java   |  49 +--
 .../scm/container/ContainerStateManager.java    |  38 +-
 .../hadoop/hdds/scm/node/DeadNodeHandler.java   |  12 +
 .../org/apache/hadoop/hdds/scm/TestUtils.java   |  45 ++
 .../container/TestContainerReportHandler.java   |  20 +-
 .../container/TestContainerStateManager.java    |  96 +++++
 .../hdds/scm/node/TestDeadNodeHandler.java      |  95 +++--
 .../container/TestContainerStateManager.java    | 415 -------------------
 .../TestContainerStateManagerIntegration.java   | 415 +++++++++++++++++++
 9 files changed, 694 insertions(+), 491 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 5ca2bcb..dcbd49c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hdds.scm.container;
 
 import java.io.IOException;
@@ -23,18 +22,16 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.replication
-    .ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
 import org.apache.hadoop.hdds.scm.node.states.ReportResult;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
-    .ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
@@ -59,22 +56,21 @@ public class ContainerReportHandler implements
 
   private ReplicationActivityStatus replicationStatus;
 
-
   public ContainerReportHandler(Mapping containerMapping,
       Node2ContainerMap node2ContainerMap,
       ReplicationActivityStatus replicationActivityStatus) {
     Preconditions.checkNotNull(containerMapping);
     Preconditions.checkNotNull(node2ContainerMap);
     Preconditions.checkNotNull(replicationActivityStatus);
+    this.containerStateManager = containerMapping.getStateManager();
     this.containerMapping = containerMapping;
     this.node2ContainerMap = node2ContainerMap;
-    this.containerStateManager = containerMapping.getStateManager();
     this.replicationStatus = replicationActivityStatus;
   }
 
   @Override
   public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
-                        EventPublisher publisher) {
+      EventPublisher publisher) {
 
     DatanodeDetails datanodeOrigin =
         containerReportFromDatanode.getDatanodeDetails();
@@ -88,7 +84,8 @@ public class ContainerReportHandler implements
           .processContainerReports(datanodeOrigin, containerReport, false);
 
       Set<ContainerID> containerIds = containerReport.getReportsList().stream()
-          .map(containerProto -> containerProto.getContainerID())
+          .map(StorageContainerDatanodeProtocolProtos
+              .ContainerInfo::getContainerID)
           .map(ContainerID::new)
           .collect(Collectors.toSet());
 
@@ -102,13 +99,12 @@ public class ContainerReportHandler implements
       for (ContainerID containerID : reportResult.getMissingContainers()) {
         containerStateManager
             .removeContainerReplica(containerID, datanodeOrigin);
-        emitReplicationRequestEvent(containerID, publisher);
+        checkReplicationState(containerID, publisher);
       }
 
       for (ContainerID containerID : reportResult.getNewContainers()) {
         containerStateManager.addContainerReplica(containerID, datanodeOrigin);
-
-        emitReplicationRequestEvent(containerID, publisher);
+        checkReplicationState(containerID, publisher);
       }
 
     } catch (IOException e) {
@@ -119,8 +115,9 @@ public class ContainerReportHandler implements
 
   }
 
-  private void emitReplicationRequestEvent(ContainerID containerID,
-      EventPublisher publisher) throws SCMException {
+  private void checkReplicationState(ContainerID containerID,
+      EventPublisher publisher)
+      throws SCMException {
     ContainerInfo container = containerStateManager.getContainer(containerID);
 
     if (container == null) {
@@ -134,18 +131,18 @@ public class ContainerReportHandler implements
     if (container.isContainerOpen()) {
       return;
     }
-    if (replicationStatus.isReplicationEnabled()) {
-
-      int existingReplicas =
-          containerStateManager.getContainerReplicas(containerID).size();
-
-      int expectedReplicas = container.getReplicationFactor().getNumber();
-
-      if (existingReplicas != expectedReplicas) {
 
+    ReplicationRequest replicationState =
+        containerStateManager.checkReplicationState(containerID);
+    if (replicationState != null) {
+      if (replicationStatus.isReplicationEnabled()) {
         publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
-            new ReplicationRequest(containerID.getId(), existingReplicas,
-                container.getReplicationFactor().getNumber()));
+            replicationState);
+      } else {
+        LOG.warn(
+            "Over/under replicated container but the replication is not "
+                + "(yet) enabled: "
+                + replicationState.toString());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 421d34e..eb8f2e3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.container.states.ContainerState;
 import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -40,6 +42,7 @@ import org.apache.hadoop.ozone.common.statemachine
     .InvalidStateTransitionException;
 import org.apache.hadoop.ozone.common.statemachine.StateMachine;
 import org.apache.hadoop.util.Time;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -148,7 +151,7 @@ public class ContainerStateManager implements Closeable {
         finalStates);
     initializeStateMachine();
 
-    this.containerSize =(long)configuration.getStorageSize(
+    this.containerSize = (long) configuration.getStorageSize(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
         StorageUnit.BYTES);
@@ -399,7 +402,7 @@ public class ContainerStateManager implements Closeable {
     // container ID.
     ContainerState key = new ContainerState(owner, type, factor);
     ContainerID lastID = lastUsedMap.get(key);
-    if(lastID == null) {
+    if (lastID == null) {
       lastID = matchingSet.first();
     }
 
@@ -426,7 +429,7 @@ public class ContainerStateManager implements Closeable {
       selectedContainer = findContainerWithSpace(size, resultSet, owner);
     }
     // Update the allocated Bytes on this container.
-    if(selectedContainer != null) {
+    if (selectedContainer != null) {
       selectedContainer.updateAllocatedBytes(size);
     }
     return selectedContainer;
@@ -539,9 +542,36 @@ public class ContainerStateManager implements Closeable {
       DatanodeDetails dn) throws SCMException {
     return containers.removeContainerReplica(containerID, dn);
   }
-  
+
+  /**
+   * Compare the existing replication number with the expected one.
+   */
+  public ReplicationRequest checkReplicationState(ContainerID containerID)
+      throws SCMException {
+    int existingReplicas = getContainerReplicas(containerID).size();
+    int expectedReplicas = getContainer(containerID)
+        .getReplicationFactor().getNumber();
+    if (existingReplicas != expectedReplicas) {
+      return new ReplicationRequest(containerID.getId(), existingReplicas,
+          expectedReplicas);
+    }
+    return null;
+  }
+
+  /**
+   * Checks if the container is open.
+   */
+  public boolean isOpen(ContainerID containerID) {
+    Preconditions.checkNotNull(containerID);
+    ContainerInfo container = Preconditions
+        .checkNotNull(getContainer(containerID),
+            "Container can't be found " + containerID);
+    return container.isContainerOpen();
+  }
+
   @VisibleForTesting
   public ContainerStateMap getContainerStateMap() {
     return containers;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index c853b3b..d694a10 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -23,6 +23,8 @@ import java.util.Set;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -62,6 +64,16 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
       try {
         containerStateManager.removeContainerReplica(container,
             datanodeDetails);
+
+        if (!containerStateManager.isOpen(container)) {
+          ReplicationRequest replicationRequest =
+              containerStateManager.checkReplicationState(container);
+
+          if (replicationRequest != null) {
+            publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+                replicationRequest);
+          }
+        }
       } catch (SCMException e) {
         LOG.error("Can't remove container from containerStateMap {}", container
             .getId(), e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index d617680..7af9dda 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -17,6 +17,11 @@
 package org.apache.hadoop.hdds.scm;
 
 import com.google.common.base.Preconditions;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.proto
@@ -31,12 +36,18 @@ import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.StorageTypeProto;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -376,5 +387,39 @@ public final class TestUtils {
     return report.build();
   }
 
+  public static
+      org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo
+      allocateContainer(ContainerStateManager containerStateManager)
+      throws IOException {
+
+    PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class);
+
+    Pipeline pipeline = new Pipeline("leader", HddsProtos.LifeCycleState.CLOSED,
+        HddsProtos.ReplicationType.STAND_ALONE,
+        HddsProtos.ReplicationFactor.THREE,
+        PipelineID.randomId());
 
+    when(pipelineSelector
+        .getReplicationPipeline(HddsProtos.ReplicationType.STAND_ALONE,
+            HddsProtos.ReplicationFactor.THREE)).thenReturn(pipeline);
+
+    return containerStateManager
+        .allocateContainer(pipelineSelector,
+            HddsProtos.ReplicationType.STAND_ALONE,
+            HddsProtos.ReplicationFactor.THREE, "root").getContainerInfo();
+
+  }
+
+  public static void closeContainer(ContainerStateManager containerStateManager,
+      org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo
+          container)
+      throws SCMException {
+
+    containerStateManager.getContainerStateMap()
+        .updateState(container, container.getState(), LifeCycleState.CLOSING);
+
+    containerStateManager.getContainerStateMap()
+        .updateState(container, container.getState(), LifeCycleState.CLOSED);
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 66f0966..d74a32f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -71,9 +71,7 @@ public class TestContainerReportHandler implements EventPublisher {
 
   @Test
   public void test() throws IOException {
-
-    //given
-
+    //GIVEN
     OzoneConfiguration conf = new OzoneConfiguration();
     Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
     Mapping mapping = Mockito.mock(Mapping.class);
@@ -133,19 +131,9 @@ public class TestContainerReportHandler implements EventPublisher {
     long c3 = cont3.getContainerID();
 
     // Close remaining containers
-    try {
-      containerStateManager.getContainerStateMap()
-          .updateState(cont1, cont1.getState(), LifeCycleState.CLOSING);
-      containerStateManager.getContainerStateMap()
-          .updateState(cont1, cont1.getState(), LifeCycleState.CLOSED);
-      containerStateManager.getContainerStateMap()
-          .updateState(cont2, cont2.getState(), LifeCycleState.CLOSING);
-      containerStateManager.getContainerStateMap()
-          .updateState(cont2, cont2.getState(), LifeCycleState.CLOSED);
-
-    } catch (IOException e) {
-      LOG.info("Failed to change state of open containers.", e);
-    }
+    TestUtils.closeContainer(containerStateManager, cont1);
+    TestUtils.closeContainer(containerStateManager, cont2);
+
     //when
 
     //initial reports before replication is enabled. 2 containers w 3 replicas.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
new file mode 100644
index 0000000..fe92ee5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Testing ContainerStatemanager.
+ */
+public class TestContainerStateManager {
+
+  private ContainerStateManager containerStateManager;
+
+  @Before
+  public void init() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    Mapping mapping = Mockito.mock(Mapping.class);
+    containerStateManager = new ContainerStateManager(conf, mapping);
+
+  }
+
+  @Test
+  public void checkReplicationStateOK() throws IOException {
+    //GIVEN
+    ContainerInfo c1 = TestUtils.allocateContainer(containerStateManager);
+
+    DatanodeDetails d1 = TestUtils.randomDatanodeDetails();
+    DatanodeDetails d2 = TestUtils.randomDatanodeDetails();
+    DatanodeDetails d3 = TestUtils.randomDatanodeDetails();
+
+    addReplica(c1, d1);
+    addReplica(c1, d2);
+    addReplica(c1, d3);
+
+    //WHEN
+    ReplicationRequest replicationRequest = containerStateManager
+        .checkReplicationState(new ContainerID(c1.getContainerID()));
+
+    //THEN
+    Assert.assertNull(replicationRequest);
+  }
+
+  @Test
+  public void checkReplicationStateMissingReplica() throws IOException {
+    //GIVEN
+
+    ContainerInfo c1 = TestUtils.allocateContainer(containerStateManager);
+
+    DatanodeDetails d1 = TestUtils.randomDatanodeDetails();
+    DatanodeDetails d2 = TestUtils.randomDatanodeDetails();
+
+    addReplica(c1, d1);
+    addReplica(c1, d2);
+
+    //WHEN
+    ReplicationRequest replicationRequest = containerStateManager
+        .checkReplicationState(new ContainerID(c1.getContainerID()));
+
+    Assert
+        .assertEquals(c1.getContainerID(), replicationRequest.getContainerId());
+    Assert.assertEquals(2, replicationRequest.getReplicationCount());
+    Assert.assertEquals(3, replicationRequest.getExpecReplicationCount());
+  }
+
+  private void addReplica(ContainerInfo c1, DatanodeDetails d1) {
+    containerStateManager
+        .addContainerReplica(new ContainerID(c1.getContainerID()), d1);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 4be10e1..0b69f5f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -18,76 +18,76 @@
 
 package org.apache.hadoop.hdds.scm.node;
 
-import java.util.HashSet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
 import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import static org.mockito.Matchers.eq;
 import org.mockito.Mockito;
 
 /**
  * Test DeadNodeHandler.
  */
 public class TestDeadNodeHandler {
+
+  private List<ReplicationRequest> sentEvents = new ArrayList<>();
+
   @Test
-  public void testOnMessage() throws SCMException {
+  public void testOnMessage() throws IOException {
     //GIVEN
     DatanodeDetails datanode1 = TestUtils.randomDatanodeDetails();
     DatanodeDetails datanode2 = TestUtils.randomDatanodeDetails();
 
-    ContainerInfo container1 = TestUtils.getRandomContainerInfo(1);
-    ContainerInfo container2 = TestUtils.getRandomContainerInfo(2);
-    ContainerInfo container3 = TestUtils.getRandomContainerInfo(3);
-
     Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
     ContainerStateManager containerStateManager = new ContainerStateManager(
         new OzoneConfiguration(),
         Mockito.mock(Mapping.class)
     );
+
+    ContainerInfo container1 =
+        TestUtils.allocateContainer(containerStateManager);
+    ContainerInfo container2 =
+        TestUtils.allocateContainer(containerStateManager);
+    ContainerInfo container3 =
+        TestUtils.allocateContainer(containerStateManager);
+
     DeadNodeHandler handler =
         new DeadNodeHandler(node2ContainerMap, containerStateManager);
 
-    node2ContainerMap
-        .insertNewDatanode(datanode1.getUuid(), new HashSet<ContainerID>() {{
-            add(new ContainerID(container1.getContainerID()));
-            add(new ContainerID(container2.getContainerID()));
-          }});
+    registerReplicas(node2ContainerMap, datanode1, container1, container2);
+    registerReplicas(node2ContainerMap, datanode2, container1, container3);
 
-    node2ContainerMap
-        .insertNewDatanode(datanode2.getUuid(), new HashSet<ContainerID>() {{
-            add(new ContainerID(container1.getContainerID()));
-            add(new ContainerID(container3.getContainerID()));
-          }});
+    registerReplicas(containerStateManager, container1, datanode1, datanode2);
+    registerReplicas(containerStateManager, container2, datanode1);
+    registerReplicas(containerStateManager, container3, datanode2);
 
-    containerStateManager.getContainerStateMap()
-        .addContainerReplica(new ContainerID(container1.getContainerID()),
-            datanode1, datanode2);
+    TestUtils.closeContainer(containerStateManager, container1);
 
-    containerStateManager.getContainerStateMap()
-        .addContainerReplica(new ContainerID(container2.getContainerID()),
-            datanode1);
-
-    containerStateManager.getContainerStateMap()
-        .addContainerReplica(new ContainerID(container3.getContainerID()),
-            datanode2);
+    EventPublisher publisher = Mockito.mock(EventPublisher.class);
 
     //WHEN datanode1 is dead
-    handler.onMessage(datanode1, Mockito.mock(EventPublisher.class));
+    handler.onMessage(datanode1, publisher);
 
     //THEN
-
     //node2ContainerMap has not been changed
     Assert.assertEquals(2, node2ContainerMap.size());
 
@@ -108,5 +108,40 @@ public class TestDeadNodeHandler {
     Assert.assertEquals(1, container3Replicas.size());
     Assert.assertEquals(datanode2, container3Replicas.iterator().next());
 
+    ArgumentCaptor<ReplicationRequest> replicationRequestParameter =
+        ArgumentCaptor.forClass(ReplicationRequest.class);
+
+    Mockito.verify(publisher)
+        .fireEvent(eq(SCMEvents.REPLICATE_CONTAINER),
+            replicationRequestParameter.capture());
+
+    Assert
+        .assertEquals(container1.getContainerID(),
+            replicationRequestParameter.getValue().getContainerId());
+    Assert
+        .assertEquals(1,
+            replicationRequestParameter.getValue().getReplicationCount());
+    Assert
+        .assertEquals(3,
+            replicationRequestParameter.getValue().getExpecReplicationCount());
+  }
+
+  private void registerReplicas(ContainerStateManager containerStateManager,
+      ContainerInfo container, DatanodeDetails... datanodes) {
+    containerStateManager.getContainerStateMap()
+        .addContainerReplica(new ContainerID(container.getContainerID()),
+            datanodes);
   }
+
+  private void registerReplicas(Node2ContainerMap node2ContainerMap,
+      DatanodeDetails datanode,
+      ContainerInfo... containers)
+      throws SCMException {
+    node2ContainerMap
+        .insertNewDatanode(datanode.getUuid(),
+            Arrays.stream(containers)
+                .map(container -> new ContainerID(container.getContainerID()))
+                .collect(Collectors.toSet()));
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
deleted file mode 100644
index 9e209af..0000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
+++ /dev/null
@@ -1,415 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import com.google.common.primitives.Longs;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.LambdaTestUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.Random;
-import org.slf4j.event.Level;
-
-/**
- * Tests for ContainerStateManager.
- */
-public class TestContainerStateManager {
-
-  private OzoneConfiguration conf;
-  private MiniOzoneCluster cluster;
-  private XceiverClientManager xceiverClientManager;
-  private StorageContainerManager scm;
-  private Mapping scmContainerMapping;
-  private ContainerStateManager containerStateManager;
-  private String containerOwner = "OZONE";
-
-
-  @Before
-  public void setup() throws Exception {
-    conf = new OzoneConfiguration();
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
-    cluster.waitForClusterToBeReady();
-    xceiverClientManager = new XceiverClientManager(conf);
-    scm = cluster.getStorageContainerManager();
-    scmContainerMapping = scm.getScmContainerManager();
-    containerStateManager = scmContainerMapping.getStateManager();
-  }
-
-  @After
-  public void cleanUp() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  @Test
-  public void testAllocateContainer() throws IOException {
-    // Allocate a container and verify the container info
-    ContainerWithPipeline container1 = scm.getClientProtocolServer()
-        .allocateContainer(
-            xceiverClientManager.getType(),
-            xceiverClientManager.getFactor(), containerOwner);
-    ContainerInfo info = containerStateManager
-        .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
-            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-            HddsProtos.LifeCycleState.ALLOCATED);
-    Assert.assertEquals(container1.getContainerInfo().getContainerID(),
-        info.getContainerID());
-    Assert.assertEquals(OzoneConsts.GB * 3, info.getAllocatedBytes());
-    Assert.assertEquals(containerOwner, info.getOwner());
-    Assert.assertEquals(xceiverClientManager.getType(),
-        info.getReplicationType());
-    Assert.assertEquals(xceiverClientManager.getFactor(),
-        info.getReplicationFactor());
-    Assert.assertEquals(HddsProtos.LifeCycleState.ALLOCATED, info.getState());
-
-    // Check there are two containers in ALLOCATED state after allocation
-    ContainerWithPipeline container2 = scm.getClientProtocolServer()
-        .allocateContainer(
-            xceiverClientManager.getType(),
-            xceiverClientManager.getFactor(), containerOwner);
-    int numContainers = containerStateManager
-        .getMatchingContainerIDs(containerOwner,
-            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-            HddsProtos.LifeCycleState.ALLOCATED).size();
-    Assert.assertNotEquals(container1.getContainerInfo().getContainerID(),
-        container2.getContainerInfo().getContainerID());
-    Assert.assertEquals(2, numContainers);
-  }
-
-  @Test
-  public void testContainerStateManagerRestart() throws IOException {
-    // Allocate 5 containers in ALLOCATED state and 5 in CREATING state
-
-    List<ContainerInfo> containers = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
-      ContainerWithPipeline container = scm.getClientProtocolServer()
-          .allocateContainer(
-              xceiverClientManager.getType(),
-              xceiverClientManager.getFactor(), containerOwner);
-      containers.add(container.getContainerInfo());
-      if (i >= 5) {
-        scm.getScmContainerManager().updateContainerState(container
-                .getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATE);
-      }
-    }
-
-    // New instance of ContainerStateManager should load all the containers in
-    // container store.
-    ContainerStateManager stateManager =
-        new ContainerStateManager(conf, scmContainerMapping
-        );
-    int matchCount = stateManager
-        .getMatchingContainerIDs(containerOwner,
-            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-            HddsProtos.LifeCycleState.ALLOCATED).size();
-    Assert.assertEquals(5, matchCount);
-    matchCount = stateManager.getMatchingContainerIDs(containerOwner,
-        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-        HddsProtos.LifeCycleState.CREATING).size();
-    Assert.assertEquals(5, matchCount);
-  }
-
-  @Test
-  public void testGetMatchingContainer() throws IOException {
-    ContainerWithPipeline container1 = scm.getClientProtocolServer().
-        allocateContainer(xceiverClientManager.getType(),
-            xceiverClientManager.getFactor(), containerOwner);
-    scmContainerMapping
-        .updateContainerState(container1.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATE);
-    scmContainerMapping
-        .updateContainerState(container1.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATED);
-
-    ContainerWithPipeline container2 = scm.getClientProtocolServer().
-        allocateContainer(xceiverClientManager.getType(),
-            xceiverClientManager.getFactor(), containerOwner);
-
-    ContainerInfo info = containerStateManager
-        .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
-            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-            HddsProtos.LifeCycleState.OPEN);
-    Assert.assertEquals(container1.getContainerInfo().getContainerID(),
-        info.getContainerID());
-
-    info = containerStateManager
-        .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
-            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-            HddsProtos.LifeCycleState.ALLOCATED);
-    Assert.assertEquals(container2.getContainerInfo().getContainerID(),
-        info.getContainerID());
-
-    scmContainerMapping
-        .updateContainerState(container2.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATE);
-    scmContainerMapping
-        .updateContainerState(container2.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATED);
-
-    // space has already been allocated in container1, now container 2 should
-    // be chosen.
-    info = containerStateManager
-        .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
-            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-            HddsProtos.LifeCycleState.OPEN);
-    Assert.assertEquals(container2.getContainerInfo().getContainerID(),
-        info.getContainerID());
-  }
-
-  @Test
-  public void testUpdateContainerState() throws IOException {
-    NavigableSet<ContainerID> containerList = containerStateManager
-        .getMatchingContainerIDs(containerOwner,
-            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-            HddsProtos.LifeCycleState.ALLOCATED);
-    int containers = containerList == null ? 0 : containerList.size();
-    Assert.assertEquals(0, containers);
-
-    // Allocate container1 and update its state from ALLOCATED -> CREATING ->
-    // OPEN -> CLOSING -> CLOSED -> DELETING -> DELETED
-    ContainerWithPipeline container1 = scm.getClientProtocolServer()
-        .allocateContainer(
-            xceiverClientManager.getType(),
-            xceiverClientManager.getFactor(), containerOwner);
-    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
-        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-        HddsProtos.LifeCycleState.ALLOCATED).size();
-    Assert.assertEquals(1, containers);
-
-    scmContainerMapping
-        .updateContainerState(container1.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATE);
-    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
-        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-        HddsProtos.LifeCycleState.CREATING).size();
-    Assert.assertEquals(1, containers);
-
-    scmContainerMapping
-        .updateContainerState(container1.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATED);
-    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
-        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-        HddsProtos.LifeCycleState.OPEN).size();
-    Assert.assertEquals(1, containers);
-
-    scmContainerMapping
-        .updateContainerState(container1.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.FINALIZE);
-    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
-        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-        HddsProtos.LifeCycleState.CLOSING).size();
-    Assert.assertEquals(1, containers);
-
-    scmContainerMapping
-        .updateContainerState(container1.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CLOSE);
-    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
-        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-        HddsProtos.LifeCycleState.CLOSED).size();
-    Assert.assertEquals(1, containers);
-
-    scmContainerMapping
-        .updateContainerState(container1.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.DELETE);
-    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
-        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-        HddsProtos.LifeCycleState.DELETING).size();
-    Assert.assertEquals(1, containers);
-
-    scmContainerMapping
-        .updateContainerState(container1.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CLEANUP);
-    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
-        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-        HddsProtos.LifeCycleState.DELETED).size();
-    Assert.assertEquals(1, containers);
-
-    // Allocate container1 and update its state from ALLOCATED -> CREATING ->
-    // DELETING
-    ContainerWithPipeline container2 = scm.getClientProtocolServer()
-        .allocateContainer(
-            xceiverClientManager.getType(),
-            xceiverClientManager.getFactor(), containerOwner);
-    scmContainerMapping
-        .updateContainerState(container2.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATE);
-    scmContainerMapping
-        .updateContainerState(container2.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.TIMEOUT);
-    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
-        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-        HddsProtos.LifeCycleState.DELETING).size();
-    Assert.assertEquals(1, containers);
-
-    // Allocate container1 and update its state from ALLOCATED -> CREATING ->
-    // OPEN -> CLOSING -> CLOSED
-    ContainerWithPipeline container3 = scm.getClientProtocolServer()
-        .allocateContainer(
-            xceiverClientManager.getType(),
-            xceiverClientManager.getFactor(), containerOwner);
-    scmContainerMapping
-        .updateContainerState(container3.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATE);
-    scmContainerMapping
-        .updateContainerState(container3.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATED);
-    scmContainerMapping
-        .updateContainerState(container3.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.FINALIZE);
-    scmContainerMapping
-        .updateContainerState(container3.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CLOSE);
-    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
-        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-        HddsProtos.LifeCycleState.CLOSED).size();
-    Assert.assertEquals(1, containers);
-  }
-
-  @Test
-  public void testUpdatingAllocatedBytes() throws Exception {
-    ContainerWithPipeline container1 = scm.getClientProtocolServer()
-        .allocateContainer(xceiverClientManager.getType(),
-        xceiverClientManager.getFactor(), containerOwner);
-    scmContainerMapping.updateContainerState(container1
-            .getContainerInfo().getContainerID(),
-        HddsProtos.LifeCycleEvent.CREATE);
-    scmContainerMapping.updateContainerState(container1
-            .getContainerInfo().getContainerID(),
-        HddsProtos.LifeCycleEvent.CREATED);
-
-    Random ran = new Random();
-    long allocatedSize = 0;
-    for (int i = 0; i<5; i++) {
-      long size = Math.abs(ran.nextLong() % OzoneConsts.GB);
-      allocatedSize += size;
-      // trigger allocating bytes by calling getMatchingContainer
-      ContainerInfo info = containerStateManager
-          .getMatchingContainer(size, containerOwner,
-              xceiverClientManager.getType(), xceiverClientManager.getFactor(),
-              HddsProtos.LifeCycleState.OPEN);
-      Assert.assertEquals(container1.getContainerInfo().getContainerID(),
-          info.getContainerID());
-
-      ContainerMapping containerMapping =
-          (ContainerMapping) scmContainerMapping;
-      // manually trigger a flush, this will persist the allocated bytes value
-      // to disk
-      containerMapping.flushContainerInfo();
-
-      // the persisted value should always be equal to allocated size.
-      byte[] containerBytes = containerMapping.getContainerStore().get(
-          Longs.toByteArray(container1.getContainerInfo().getContainerID()));
-      HddsProtos.SCMContainerInfo infoProto =
-          HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
-      ContainerInfo currentInfo = ContainerInfo.fromProtobuf(infoProto);
-      Assert.assertEquals(allocatedSize, currentInfo.getAllocatedBytes());
-    }
-  }
-
-  @Test
-  public void testReplicaMap() throws Exception {
-    GenericTestUtils.setLogLevel(ContainerStateMap.getLOG(), Level.DEBUG);
-    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
-        .captureLogs(ContainerStateMap.getLOG());
-    DatanodeDetails dn1 = DatanodeDetails.newBuilder().setHostName("host1")
-        .setIpAddress("1.1.1.1")
-        .setUuid(UUID.randomUUID().toString()).build();
-    DatanodeDetails dn2 = DatanodeDetails.newBuilder().setHostName("host2")
-        .setIpAddress("2.2.2.2")
-        .setUuid(UUID.randomUUID().toString()).build();
-
-    // Test 1: no replica's exist
-    ContainerID containerID = ContainerID.valueof(RandomUtils.nextLong());
-    Set<DatanodeDetails> replicaSet;
-    LambdaTestUtils.intercept(SCMException.class, "", () -> {
-      containerStateManager.getContainerReplicas(containerID);
-    });
-
-    // Test 2: Add replica nodes and then test
-    containerStateManager.addContainerReplica(containerID, dn1);
-    containerStateManager.addContainerReplica(containerID, dn2);
-    replicaSet = containerStateManager.getContainerReplicas(containerID);
-    Assert.assertEquals(2, replicaSet.size());
-    Assert.assertTrue(replicaSet.contains(dn1));
-    Assert.assertTrue(replicaSet.contains(dn2));
-
-    // Test 3: Remove one replica node and then test
-    containerStateManager.removeContainerReplica(containerID, dn1);
-    replicaSet = containerStateManager.getContainerReplicas(containerID);
-    Assert.assertEquals(1, replicaSet.size());
-    Assert.assertFalse(replicaSet.contains(dn1));
-    Assert.assertTrue(replicaSet.contains(dn2));
-
-    // Test 3: Remove second replica node and then test
-    containerStateManager.removeContainerReplica(containerID, dn2);
-    replicaSet = containerStateManager.getContainerReplicas(containerID);
-    Assert.assertEquals(0, replicaSet.size());
-    Assert.assertFalse(replicaSet.contains(dn1));
-    Assert.assertFalse(replicaSet.contains(dn2));
-
-    // Test 4: Re-insert dn1
-    containerStateManager.addContainerReplica(containerID, dn1);
-    replicaSet = containerStateManager.getContainerReplicas(containerID);
-    Assert.assertEquals(1, replicaSet.size());
-    Assert.assertTrue(replicaSet.contains(dn1));
-    Assert.assertFalse(replicaSet.contains(dn2));
-
-    // Re-insert dn2
-    containerStateManager.addContainerReplica(containerID, dn2);
-    replicaSet = containerStateManager.getContainerReplicas(containerID);
-    Assert.assertEquals(2, replicaSet.size());
-    Assert.assertTrue(replicaSet.contains(dn1));
-    Assert.assertTrue(replicaSet.contains(dn2));
-
-    Assert.assertFalse(logCapturer.getOutput().contains(
-        "ReplicaMap already contains entry for container Id: " + containerID
-            .toString() + ",DataNode: " + dn1.toString()));
-    // Re-insert dn1
-    containerStateManager.addContainerReplica(containerID, dn1);
-    replicaSet = containerStateManager.getContainerReplicas(containerID);
-    Assert.assertEquals(2, replicaSet.size());
-    Assert.assertTrue(replicaSet.contains(dn1));
-    Assert.assertTrue(replicaSet.contains(dn2));
-    Assert.assertTrue(logCapturer.getOutput().contains(
-        "ReplicaMap already contains entry for container Id: " + containerID
-            .toString() + ",DataNode: " + dn1.toString()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
new file mode 100644
index 0000000..c6e819b
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.primitives.Longs;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Random;
+import org.slf4j.event.Level;
+
+/**
+ * Tests for ContainerStateManager.
+ */
+public class TestContainerStateManagerIntegration {
+
+  private OzoneConfiguration conf;
+  private MiniOzoneCluster cluster;
+  private XceiverClientManager xceiverClientManager;
+  private StorageContainerManager scm;
+  private Mapping scmContainerMapping;
+  private ContainerStateManager containerStateManager;
+  private String containerOwner = "OZONE";
+
+
+  @Before
+  public void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
+    cluster.waitForClusterToBeReady();
+    xceiverClientManager = new XceiverClientManager(conf);
+    scm = cluster.getStorageContainerManager();
+    scmContainerMapping = scm.getScmContainerManager();
+    containerStateManager = scmContainerMapping.getStateManager();
+  }
+
+  @After
+  public void cleanUp() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testAllocateContainer() throws IOException {
+    // Allocate a container and verify the container info
+    ContainerWithPipeline container1 = scm.getClientProtocolServer()
+        .allocateContainer(
+            xceiverClientManager.getType(),
+            xceiverClientManager.getFactor(), containerOwner);
+    ContainerInfo info = containerStateManager
+        .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(container1.getContainerInfo().getContainerID(),
+        info.getContainerID());
+    Assert.assertEquals(OzoneConsts.GB * 3, info.getAllocatedBytes());
+    Assert.assertEquals(containerOwner, info.getOwner());
+    Assert.assertEquals(xceiverClientManager.getType(),
+        info.getReplicationType());
+    Assert.assertEquals(xceiverClientManager.getFactor(),
+        info.getReplicationFactor());
+    Assert.assertEquals(HddsProtos.LifeCycleState.ALLOCATED, info.getState());
+
+    // Check there are two containers in ALLOCATED state after allocation
+    ContainerWithPipeline container2 = scm.getClientProtocolServer()
+        .allocateContainer(
+            xceiverClientManager.getType(),
+            xceiverClientManager.getFactor(), containerOwner);
+    int numContainers = containerStateManager
+        .getMatchingContainerIDs(containerOwner,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            HddsProtos.LifeCycleState.ALLOCATED).size();
+    Assert.assertNotEquals(container1.getContainerInfo().getContainerID(),
+        container2.getContainerInfo().getContainerID());
+    Assert.assertEquals(2, numContainers);
+  }
+
+  @Test
+  public void testContainerStateManagerRestart() throws IOException {
+    // Allocate 5 containers in ALLOCATED state and 5 in CREATING state
+
+    List<ContainerInfo> containers = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      ContainerWithPipeline container = scm.getClientProtocolServer()
+          .allocateContainer(
+              xceiverClientManager.getType(),
+              xceiverClientManager.getFactor(), containerOwner);
+      containers.add(container.getContainerInfo());
+      if (i >= 5) {
+        scm.getScmContainerManager().updateContainerState(container
+                .getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.CREATE);
+      }
+    }
+
+    // New instance of ContainerStateManager should load all the containers in
+    // container store.
+    ContainerStateManager stateManager =
+        new ContainerStateManager(conf, scmContainerMapping
+        );
+    int matchCount = stateManager
+        .getMatchingContainerIDs(containerOwner,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            HddsProtos.LifeCycleState.ALLOCATED).size();
+    Assert.assertEquals(5, matchCount);
+    matchCount = stateManager.getMatchingContainerIDs(containerOwner,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        HddsProtos.LifeCycleState.CREATING).size();
+    Assert.assertEquals(5, matchCount);
+  }
+
+  @Test
+  public void testGetMatchingContainer() throws IOException {
+    ContainerWithPipeline container1 = scm.getClientProtocolServer().
+        allocateContainer(xceiverClientManager.getType(),
+            xceiverClientManager.getFactor(), containerOwner);
+    scmContainerMapping
+        .updateContainerState(container1.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.CREATE);
+    scmContainerMapping
+        .updateContainerState(container1.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.CREATED);
+
+    ContainerWithPipeline container2 = scm.getClientProtocolServer().
+        allocateContainer(xceiverClientManager.getType(),
+            xceiverClientManager.getFactor(), containerOwner);
+
+    ContainerInfo info = containerStateManager
+        .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            HddsProtos.LifeCycleState.OPEN);
+    Assert.assertEquals(container1.getContainerInfo().getContainerID(),
+        info.getContainerID());
+
+    info = containerStateManager
+        .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(container2.getContainerInfo().getContainerID(),
+        info.getContainerID());
+
+    scmContainerMapping
+        .updateContainerState(container2.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.CREATE);
+    scmContainerMapping
+        .updateContainerState(container2.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.CREATED);
+
+    // space has already been allocated in container1, now container 2 should
+    // be chosen.
+    info = containerStateManager
+        .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            HddsProtos.LifeCycleState.OPEN);
+    Assert.assertEquals(container2.getContainerInfo().getContainerID(),
+        info.getContainerID());
+  }
+
+  @Test
+  public void testUpdateContainerState() throws IOException {
+    NavigableSet<ContainerID> containerList = containerStateManager
+        .getMatchingContainerIDs(containerOwner,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            HddsProtos.LifeCycleState.ALLOCATED);
+    int containers = containerList == null ? 0 : containerList.size();
+    Assert.assertEquals(0, containers);
+
+    // Allocate container1 and update its state from ALLOCATED -> CREATING ->
+    // OPEN -> CLOSING -> CLOSED -> DELETING -> DELETED
+    ContainerWithPipeline container1 = scm.getClientProtocolServer()
+        .allocateContainer(
+            xceiverClientManager.getType(),
+            xceiverClientManager.getFactor(), containerOwner);
+    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        HddsProtos.LifeCycleState.ALLOCATED).size();
+    Assert.assertEquals(1, containers);
+
+    scmContainerMapping
+        .updateContainerState(container1.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.CREATE);
+    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        HddsProtos.LifeCycleState.CREATING).size();
+    Assert.assertEquals(1, containers);
+
+    scmContainerMapping
+        .updateContainerState(container1.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.CREATED);
+    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        HddsProtos.LifeCycleState.OPEN).size();
+    Assert.assertEquals(1, containers);
+
+    scmContainerMapping
+        .updateContainerState(container1.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.FINALIZE);
+    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        HddsProtos.LifeCycleState.CLOSING).size();
+    Assert.assertEquals(1, containers);
+
+    scmContainerMapping
+        .updateContainerState(container1.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.CLOSE);
+    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        HddsProtos.LifeCycleState.CLOSED).size();
+    Assert.assertEquals(1, containers);
+
+    scmContainerMapping
+        .updateContainerState(container1.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.DELETE);
+    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        HddsProtos.LifeCycleState.DELETING).size();
+    Assert.assertEquals(1, containers);
+
+    scmContainerMapping
+        .updateContainerState(container1.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.CLEANUP);
+    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        HddsProtos.LifeCycleState.DELETED).size();
+    Assert.assertEquals(1, containers);
+
+    // Allocate container1 and update its state from ALLOCATED -> CREATING ->
+    // DELETING
+    ContainerWithPipeline container2 = scm.getClientProtocolServer()
+        .allocateContainer(
+            xceiverClientManager.getType(),
+            xceiverClientManager.getFactor(), containerOwner);
+    scmContainerMapping
+        .updateContainerState(container2.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.CREATE);
+    scmContainerMapping
+        .updateContainerState(container2.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.TIMEOUT);
+    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        HddsProtos.LifeCycleState.DELETING).size();
+    Assert.assertEquals(1, containers);
+
+    // Allocate container1 and update its state from ALLOCATED -> CREATING ->
+    // OPEN -> CLOSING -> CLOSED
+    ContainerWithPipeline container3 = scm.getClientProtocolServer()
+        .allocateContainer(
+            xceiverClientManager.getType(),
+            xceiverClientManager.getFactor(), containerOwner);
+    scmContainerMapping
+        .updateContainerState(container3.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.CREATE);
+    scmContainerMapping
+        .updateContainerState(container3.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.CREATED);
+    scmContainerMapping
+        .updateContainerState(container3.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.FINALIZE);
+    scmContainerMapping
+        .updateContainerState(container3.getContainerInfo().getContainerID(),
+            HddsProtos.LifeCycleEvent.CLOSE);
+    containers = containerStateManager.getMatchingContainerIDs(containerOwner,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        HddsProtos.LifeCycleState.CLOSED).size();
+    Assert.assertEquals(1, containers);
+  }
+
+  @Test
+  public void testUpdatingAllocatedBytes() throws Exception {
+    ContainerWithPipeline container1 = scm.getClientProtocolServer()
+        .allocateContainer(xceiverClientManager.getType(),
+        xceiverClientManager.getFactor(), containerOwner);
+    scmContainerMapping.updateContainerState(container1
+            .getContainerInfo().getContainerID(),
+        HddsProtos.LifeCycleEvent.CREATE);
+    scmContainerMapping.updateContainerState(container1
+            .getContainerInfo().getContainerID(),
+        HddsProtos.LifeCycleEvent.CREATED);
+
+    Random ran = new Random();
+    long allocatedSize = 0;
+    for (int i = 0; i<5; i++) {
+      long size = Math.abs(ran.nextLong() % OzoneConsts.GB);
+      allocatedSize += size;
+      // trigger allocating bytes by calling getMatchingContainer
+      ContainerInfo info = containerStateManager
+          .getMatchingContainer(size, containerOwner,
+              xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+              HddsProtos.LifeCycleState.OPEN);
+      Assert.assertEquals(container1.getContainerInfo().getContainerID(),
+          info.getContainerID());
+
+      ContainerMapping containerMapping =
+          (ContainerMapping) scmContainerMapping;
+      // manually trigger a flush, this will persist the allocated bytes value
+      // to disk
+      containerMapping.flushContainerInfo();
+
+      // the persisted value should always be equal to allocated size.
+      byte[] containerBytes = containerMapping.getContainerStore().get(
+          Longs.toByteArray(container1.getContainerInfo().getContainerID()));
+      HddsProtos.SCMContainerInfo infoProto =
+          HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
+      ContainerInfo currentInfo = ContainerInfo.fromProtobuf(infoProto);
+      Assert.assertEquals(allocatedSize, currentInfo.getAllocatedBytes());
+    }
+  }
+
+  @Test
+  public void testReplicaMap() throws Exception {
+    GenericTestUtils.setLogLevel(ContainerStateMap.getLOG(), Level.DEBUG);
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(ContainerStateMap.getLOG());
+    DatanodeDetails dn1 = DatanodeDetails.newBuilder().setHostName("host1")
+        .setIpAddress("1.1.1.1")
+        .setUuid(UUID.randomUUID().toString()).build();
+    DatanodeDetails dn2 = DatanodeDetails.newBuilder().setHostName("host2")
+        .setIpAddress("2.2.2.2")
+        .setUuid(UUID.randomUUID().toString()).build();
+
+    // Test 1: no replica's exist
+    ContainerID containerID = ContainerID.valueof(RandomUtils.nextLong());
+    Set<DatanodeDetails> replicaSet;
+    LambdaTestUtils.intercept(SCMException.class, "", () -> {
+      containerStateManager.getContainerReplicas(containerID);
+    });
+
+    // Test 2: Add replica nodes and then test
+    containerStateManager.addContainerReplica(containerID, dn1);
+    containerStateManager.addContainerReplica(containerID, dn2);
+    replicaSet = containerStateManager.getContainerReplicas(containerID);
+    Assert.assertEquals(2, replicaSet.size());
+    Assert.assertTrue(replicaSet.contains(dn1));
+    Assert.assertTrue(replicaSet.contains(dn2));
+
+    // Test 3: Remove one replica node and then test
+    containerStateManager.removeContainerReplica(containerID, dn1);
+    replicaSet = containerStateManager.getContainerReplicas(containerID);
+    Assert.assertEquals(1, replicaSet.size());
+    Assert.assertFalse(replicaSet.contains(dn1));
+    Assert.assertTrue(replicaSet.contains(dn2));
+
+    // Test 3: Remove second replica node and then test
+    containerStateManager.removeContainerReplica(containerID, dn2);
+    replicaSet = containerStateManager.getContainerReplicas(containerID);
+    Assert.assertEquals(0, replicaSet.size());
+    Assert.assertFalse(replicaSet.contains(dn1));
+    Assert.assertFalse(replicaSet.contains(dn2));
+
+    // Test 4: Re-insert dn1
+    containerStateManager.addContainerReplica(containerID, dn1);
+    replicaSet = containerStateManager.getContainerReplicas(containerID);
+    Assert.assertEquals(1, replicaSet.size());
+    Assert.assertTrue(replicaSet.contains(dn1));
+    Assert.assertFalse(replicaSet.contains(dn2));
+
+    // Re-insert dn2
+    containerStateManager.addContainerReplica(containerID, dn2);
+    replicaSet = containerStateManager.getContainerReplicas(containerID);
+    Assert.assertEquals(2, replicaSet.size());
+    Assert.assertTrue(replicaSet.contains(dn1));
+    Assert.assertTrue(replicaSet.contains(dn2));
+
+    Assert.assertFalse(logCapturer.getOutput().contains(
+        "ReplicaMap already contains entry for container Id: " + containerID
+            .toString() + ",DataNode: " + dn1.toString()));
+    // Re-insert dn1
+    containerStateManager.addContainerReplica(containerID, dn1);
+    replicaSet = containerStateManager.getContainerReplicas(containerID);
+    Assert.assertEquals(2, replicaSet.size());
+    Assert.assertTrue(replicaSet.contains(dn1));
+    Assert.assertTrue(replicaSet.contains(dn2));
+    Assert.assertTrue(logCapturer.getOutput().contains(
+        "ReplicaMap already contains entry for container Id: " + containerID
+            .toString() + ",DataNode: " + dn1.toString()));
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org