You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2018/10/30 06:03:16 UTC

[46/50] [abbrv] hadoop git commit: HDDS-728. Datanodes should use different ContainerStateMachine for each pipeline. Contributed by Mukul Kumar Singh.

HDDS-728. Datanodes should use different ContainerStateMachine for each pipeline.
Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/119cfe1d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/119cfe1d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/119cfe1d

Branch: refs/heads/HDFS-13891
Commit: 119cfe1d18d6f9b04670b7793cefe2c796c1943a
Parents: 7ba7898
Author: Nanda kumar <na...@apache.org>
Authored: Mon Oct 29 19:53:52 2018 +0530
Committer: Brahma Reddy Battula <br...@apache.org>
Committed: Tue Oct 30 11:31:17 2018 +0530

----------------------------------------------------------------------
 .../statemachine/DatanodeStateMachine.java      |   3 +
 .../transport/server/ratis/CSMMetrics.java      |   5 +-
 .../server/ratis/ContainerStateMachine.java     |  21 ++-
 .../server/ratis/XceiverServerRatis.java        |  26 ++--
 hadoop-hdds/pom.xml                             |   2 +-
 .../hdds/scm/container/SCMContainerManager.java |   8 +-
 .../hdds/scm/pipeline/TestNodeFailure.java      |   2 +-
 .../apache/hadoop/ozone/MiniOzoneCluster.java   |   8 +-
 .../hadoop/ozone/MiniOzoneClusterImpl.java      |  20 +--
 .../hadoop/ozone/client/rpc/TestBCSID.java      |   2 +-
 .../commandhandler/TestBlockDeletion.java       |   4 +-
 .../hadoop/ozone/web/client/TestKeys.java       |   2 +-
 hadoop-ozone/pom.xml                            |   2 +-
 .../freon/TestFreonWithDatanodeFastRestart.java | 130 +++++++++++++++++++
 .../freon/TestFreonWithDatanodeRestart.java     |  53 +++-----
 15 files changed, 208 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 85fa304..4768cf8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
@@ -120,6 +122,7 @@ public class DatanodeStateMachine implements Closeable {
         .addPublisherFor(NodeReportProto.class)
         .addPublisherFor(ContainerReportsProto.class)
         .addPublisherFor(CommandStatusReportsProto.class)
+        .addPublisherFor(PipelineReportsProto.class)
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
index b6aed60..9ccf88a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.ratis.protocol.RaftGroupId;
 
 /**
  * This class is for maintaining Container State Machine statistics.
@@ -47,9 +48,9 @@ public class CSMMetrics {
   public CSMMetrics() {
   }
 
-  public static CSMMetrics create() {
+  public static CSMMetrics create(RaftGroupId gid) {
     MetricsSystem ms = DefaultMetricsSystem.instance();
-    return ms.register(SOURCE_NAME,
+    return ms.register(SOURCE_NAME + gid.toString(),
         "Container State Machine",
         new CSMMetrics());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index bcbf93f..ac0833b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -66,7 +66,6 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
@@ -112,6 +111,7 @@ public class ContainerStateMachine extends BaseStateMachine {
       LoggerFactory.getLogger(ContainerStateMachine.class);
   private final SimpleStateMachineStorage storage =
       new SimpleStateMachineStorage();
+  private final RaftGroupId gid;
   private final ContainerDispatcher dispatcher;
   private ThreadPoolExecutor chunkExecutor;
   private final XceiverServerRatis ratisServer;
@@ -127,21 +127,19 @@ public class ContainerStateMachine extends BaseStateMachine {
    */
   private final CSMMetrics metrics;
 
-  public ContainerStateMachine(ContainerDispatcher dispatcher,
+  public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
       ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer,
-      int  numOfExecutors) {
+      List<ExecutorService> executors) {
+    this.gid = gid;
     this.dispatcher = dispatcher;
     this.chunkExecutor = chunkExecutor;
     this.ratisServer = ratisServer;
+    metrics = CSMMetrics.create(gid);
+    this.numExecutors = executors.size();
+    this.executors = executors.toArray(new ExecutorService[numExecutors]);
     this.writeChunkFutureMap = new ConcurrentHashMap<>();
-    metrics = CSMMetrics.create();
     this.createContainerFutureMap = new ConcurrentHashMap<>();
-    this.numExecutors = numOfExecutors;
-    executors = new ExecutorService[numExecutors];
     containerCommandCompletionMap = new ConcurrentHashMap<>();
-    for (int i = 0; i < numExecutors; i++) {
-      executors[i] = Executors.newSingleThreadExecutor();
-    }
   }
 
   @Override
@@ -207,7 +205,7 @@ public class ContainerStateMachine extends BaseStateMachine {
       throws IOException {
     final ContainerCommandRequestProto proto =
         getRequestProto(request.getMessage().getContent());
-
+    Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
     final StateMachineLogEntryProto log;
     if (proto.getCmdType() == Type.WriteChunk) {
       final WriteChunkRequestProto write = proto.getWriteChunk();
@@ -557,8 +555,5 @@ public class ContainerStateMachine extends BaseStateMachine {
 
   @Override
   public void close() throws IOException {
-    for (int i = 0; i < numExecutors; i++) {
-      executors[i].shutdown();
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index b5092d9..599f821 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -76,6 +76,8 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -94,11 +96,12 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   private final int port;
   private final RaftServer server;
   private ThreadPoolExecutor chunkExecutor;
+  private final List<ExecutorService> executors;
+  private final ContainerDispatcher dispatcher;
   private ClientId clientId = ClientId.randomId();
   private final StateContext context;
   private final ReplicationLevel replicationLevel;
   private long nodeFailureTimeoutMs;
-  private ContainerStateMachine stateMachine;
 
   private XceiverServerRatis(DatanodeDetails dd, int port,
       ContainerDispatcher dispatcher, Configuration conf, StateContext context)
@@ -121,18 +124,22 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     this.replicationLevel =
         conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
             OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);
-    stateMachine = new ContainerStateMachine(dispatcher, chunkExecutor, this,
-        numContainerOpExecutors);
+    this.executors = new ArrayList<>();
+    this.dispatcher = dispatcher;
+    for (int i = 0; i < numContainerOpExecutors; i++) {
+      executors.add(Executors.newSingleThreadExecutor());
+    }
+
     this.server = RaftServer.newBuilder()
         .setServerId(RatisHelper.toRaftPeerId(dd))
         .setProperties(serverProperties)
-        .setStateMachine(stateMachine)
+        .setStateMachineRegistry(this::getStateMachine)
         .build();
   }
 
-  @VisibleForTesting
-  public ContainerStateMachine getStateMachine() {
-    return stateMachine;
+  private ContainerStateMachine getStateMachine(RaftGroupId gid) {
+    return new ContainerStateMachine(gid, dispatcher, chunkExecutor,
+            this, Collections.unmodifiableList(executors));
   }
 
   private RaftProperties newRaftProperties(Configuration conf) {
@@ -310,8 +317,11 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   @Override
   public void stop() {
     try {
-      chunkExecutor.shutdown();
+      // shutdown server before the executors as while shutting down,
+      // some of the tasks would be executed using the executors.
       server.close();
+      chunkExecutor.shutdown();
+      executors.forEach(ExecutorService::shutdown);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-hdds/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml
index bedf78d..f960e90 100644
--- a/hadoop-hdds/pom.xml
+++ b/hadoop-hdds/pom.xml
@@ -45,7 +45,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <hdds.version>0.4.0-SNAPSHOT</hdds.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.3.0-aa38160-SNAPSHOT</ratis.version>
+    <ratis.version>0.3.0-2272086-SNAPSHOT</ratis.version>
 
     <bouncycastle.version>1.60</bouncycastle.version>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 1666b7c..0f980dc1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -523,9 +523,13 @@ public class SCMContainerManager implements ContainerManager {
       try {
         containerStateManager.updateContainerReplica(id, replica);
         ContainerInfo currentInfo = containerStateManager.getContainer(id);
-        if (newInfo.getState() == LifeCycleState.CLOSING
-            && currentInfo.getState() == LifeCycleState.CLOSED) {
+        if (newInfo.getState() == LifeCycleState.CLOSED
+            && currentInfo.getState() == LifeCycleState.CLOSING) {
           currentInfo = updateContainerStateInternal(id, LifeCycleEvent.CLOSE);
+          if (!currentInfo.isOpen()) {
+            pipelineManager.removeContainerFromPipeline(
+                currentInfo.getPipelineID(), id);
+          }
         }
 
         HddsProtos.SCMContainerInfo newState =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
index 45886c6..9a1c705 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
@@ -118,7 +118,7 @@ public class TestNodeFailure {
         pipelineManager.getPipeline(ratisContainer2.getPipeline().getId())
             .getPipelineState());
     // Now restart the datanode and make sure that a new pipeline is created.
-    cluster.restartHddsDatanode(dnToFail);
+    cluster.restartHddsDatanode(dnToFail, true);
     ContainerWithPipeline ratisContainer3 =
         containerManager.allocateContainer(RATIS, THREE, "testOwner");
     //Assert that new container is not created from the ratis 2 pipeline

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index d13efb4..3aad7f7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -156,16 +156,16 @@ public interface MiniOzoneCluster {
    *
    * @param i index of HddsDatanode in the MiniOzoneCluster
    */
-  void restartHddsDatanode(int i) throws InterruptedException,
-      TimeoutException;
+  void restartHddsDatanode(int i, boolean waitForDatanode)
+      throws InterruptedException, TimeoutException;
 
   /**
    * Restart a particular HddsDatanode.
    *
    * @param dn HddsDatanode in the MiniOzoneCluster
    */
-  void restartHddsDatanode(DatanodeDetails dn) throws InterruptedException,
-      TimeoutException, IOException;
+  void restartHddsDatanode(DatanodeDetails dn, boolean waitForDatanode)
+      throws InterruptedException, TimeoutException, IOException;
   /**
    * Shutdown a particular HddsDatanode.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index ae52451..11bc0e0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -232,8 +232,8 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
   }
 
   @Override
-  public void restartHddsDatanode(int i) throws InterruptedException,
-      TimeoutException {
+  public void restartHddsDatanode(int i, boolean waitForDatanode)
+      throws InterruptedException, TimeoutException {
     HddsDatanodeService datanodeService = hddsDatanodes.get(i);
     datanodeService.stop();
     datanodeService.join();
@@ -248,20 +248,24 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
     conf.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
     conf.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
     hddsDatanodes.remove(i);
-    // wait for node to be removed from SCM healthy node list.
-    waitForClusterToBeReady();
+    if (waitForDatanode) {
+      // wait for node to be removed from SCM healthy node list.
+      waitForClusterToBeReady();
+    }
     HddsDatanodeService service =
         HddsDatanodeService.createHddsDatanodeService(conf);
     hddsDatanodes.add(i, service);
     service.start(null);
-    // wait for the node to be identified as a healthy node again.
-    waitForClusterToBeReady();
+    if (waitForDatanode) {
+      // wait for the node to be identified as a healthy node again.
+      waitForClusterToBeReady();
+    }
   }
 
   @Override
-  public void restartHddsDatanode(DatanodeDetails dn)
+  public void restartHddsDatanode(DatanodeDetails dn, boolean waitForDatanode)
       throws InterruptedException, TimeoutException, IOException {
-    restartHddsDatanode(getHddsDatanodeIndex(dn));
+    restartHddsDatanode(getHddsDatanodeIndex(dn), waitForDatanode);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
index ed4629c..98099be 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
@@ -137,7 +137,7 @@ public class TestBCSID {
         omKeyLocationInfo.getBlockCommitSequenceId());
 
     // verify that on restarting the datanode, it reloads the BCSID correctly.
-    cluster.restartHddsDatanode(0);
+    cluster.restartHddsDatanode(0, true);
     Assert.assertEquals(blockCommitSequenceId,
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer().getContainerSet()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index e4cbad5..63346d2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -177,7 +177,7 @@ public class TestBlockDeletion {
     // Containers in the DN and SCM should have same delete transactionIds
     // after DN restart. The assertion is just to verify that the state of
     // containerInfos in dn and scm is consistent after dn restart.
-    cluster.restartHddsDatanode(0);
+    cluster.restartHddsDatanode(0, true);
     matchContainerTransactionIds();
 
     // verify PENDING_DELETE_STATUS event is fired
@@ -210,7 +210,7 @@ public class TestBlockDeletion {
     GenericTestUtils.waitFor(() -> logCapturer.getOutput()
             .contains("RetriableDatanodeCommand type=deleteBlocksCommand"),
         500, 5000);
-    cluster.restartHddsDatanode(0);
+    cluster.restartHddsDatanode(0, true);
   }
 
   private void verifyTransactionsCommitted() throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
index 1ecedcc..08905eb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
@@ -326,7 +326,7 @@ public class TestKeys {
 
   private static void restartDatanode(MiniOzoneCluster cluster, int datanodeIdx)
       throws Exception {
-    cluster.restartHddsDatanode(datanodeIdx);
+    cluster.restartHddsDatanode(datanodeIdx, true);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml
index 5e53134..2fcffab 100644
--- a/hadoop-ozone/pom.xml
+++ b/hadoop-ozone/pom.xml
@@ -33,7 +33,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <hadoop.version>3.2.1-SNAPSHOT</hadoop.version>
     <hdds.version>0.4.0-SNAPSHOT</hdds.version>
     <ozone.version>0.4.0-SNAPSHOT</ozone.version>
-    <ratis.version>0.3.0-aa38160-SNAPSHOT</ratis.version>
+    <ratis.version>0.3.0-2272086-SNAPSHOT</ratis.version>
     <bouncycastle.version>1.60</bouncycastle.version>
     <ozone.release>Badlands</ozone.release>
     <declared.ozone.version>${ozone.version}</declared.ozone.version>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
new file mode 100644
index 0000000..44f6f1d
--- /dev/null
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.freon;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.transport
+    .server.XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+    .XceiverServerRatis;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests Freon with Datanode restarts without waiting for pipeline to close.
+ */
+public class TestFreonWithDatanodeFastRestart {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   *
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster.newBuilder(conf)
+      .setHbProcessorInterval(1000)
+      .setHbInterval(1000)
+      .setNumDatanodes(3)
+      .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testRestart() throws Exception {
+    startFreon();
+    StateMachine sm = getStateMachine();
+    TermIndex termIndexBeforeRestart = sm.getLastAppliedTermIndex();
+    cluster.restartHddsDatanode(0, false);
+    sm = getStateMachine();
+    SimpleStateMachineStorage storage =
+        (SimpleStateMachineStorage)sm.getStateMachineStorage();
+    SingleFileSnapshotInfo snapshotInfo = storage.getLatestSnapshot();
+    TermIndex termInSnapshot = snapshotInfo.getTermIndex();
+    String expectedSnapFile =
+        storage.getSnapshotFile(termIndexBeforeRestart.getTerm(),
+            termIndexBeforeRestart.getIndex()).getAbsolutePath();
+    Assert.assertEquals(snapshotInfo.getFile().getPath().toString(),
+        expectedSnapFile);
+    Assert.assertEquals(termInSnapshot, termIndexBeforeRestart);
+
+    // After restart the term index might have progressed to apply pending
+    // transactions.
+    TermIndex termIndexAfterRestart = sm.getLastAppliedTermIndex();
+    Assert.assertTrue(termIndexAfterRestart.getIndex() >=
+        termIndexBeforeRestart.getIndex());
+    startFreon();
+  }
+
+  private void startFreon() throws Exception {
+    RandomKeyGenerator randomKeyGenerator =
+        new RandomKeyGenerator((OzoneConfiguration) cluster.getConf());
+    randomKeyGenerator.setNumOfVolumes(1);
+    randomKeyGenerator.setNumOfBuckets(1);
+    randomKeyGenerator.setNumOfKeys(1);
+    randomKeyGenerator.setType(ReplicationType.RATIS);
+    randomKeyGenerator.setFactor(ReplicationFactor.THREE);
+    randomKeyGenerator.setKeySize(20971520);
+    randomKeyGenerator.setValidateWrites(true);
+    randomKeyGenerator.call();
+    Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
+    Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated());
+    Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded());
+    Assert.assertEquals(0, randomKeyGenerator.getUnsuccessfulValidationCount());
+  }
+
+  private StateMachine getStateMachine() throws Exception {
+    XceiverServerSpi server =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
+            getContainer().getServer(HddsProtos.ReplicationType.RATIS);
+    RaftServerProxy proxy =
+        (RaftServerProxy)(((XceiverServerRatis)server).getServer());
+    RaftGroupId groupId = proxy.getGroupIds().iterator().next();
+    RaftServerImpl impl = proxy.getImpl(groupId);
+    return impl.getStateMachine();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cfe1d/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
index a1c50b6..7cb53d3 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
@@ -18,17 +18,11 @@
 
 package org.apache.hadoop.ozone.freon;
 
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
-import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
-import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -36,7 +30,10 @@ import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_STALENODE_INTERVAL;
 
 /**
  * Tests Freon with Datanode restarts.
@@ -56,6 +53,12 @@ public class TestFreonWithDatanodeRestart {
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS);
+    conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
+        TimeUnit.SECONDS);
+    conf.setTimeDuration(HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, 1,
+        TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, 5,
+        TimeUnit.SECONDS);
     cluster = MiniOzoneCluster.newBuilder(conf)
       .setHbProcessorInterval(1000)
       .setHbInterval(1000)
@@ -76,6 +79,12 @@ public class TestFreonWithDatanodeRestart {
 
   @Test
   public void testRestart() throws Exception {
+    startFreon();
+    cluster.restartHddsDatanode(0, true);
+    startFreon();
+  }
+
+  private void startFreon() throws Exception {
     RandomKeyGenerator randomKeyGenerator =
         new RandomKeyGenerator((OzoneConfiguration) cluster.getConf());
     randomKeyGenerator.setNumOfVolumes(1);
@@ -90,33 +99,5 @@ public class TestFreonWithDatanodeRestart {
     Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated());
     Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded());
     Assert.assertEquals(0, randomKeyGenerator.getUnsuccessfulValidationCount());
-
-    ContainerStateMachine sm = getStateMachine();
-    TermIndex termIndexBeforeRestart = sm.getLastAppliedTermIndex();
-    cluster.restartHddsDatanode(0);
-    sm = getStateMachine();
-    SimpleStateMachineStorage storage =
-        (SimpleStateMachineStorage)sm.getStateMachineStorage();
-    SingleFileSnapshotInfo snapshotInfo = storage.getLatestSnapshot();
-    TermIndex termInSnapshot = snapshotInfo.getTermIndex();
-    String expectedSnapFile =
-        storage.getSnapshotFile(termIndexBeforeRestart.getTerm(),
-            termIndexBeforeRestart.getIndex()).getAbsolutePath();
-    Assert.assertEquals(snapshotInfo.getFile().getPath().toString(),
-        expectedSnapFile);
-    Assert.assertEquals(termInSnapshot, termIndexBeforeRestart);
-
-    // After restart the term index might have progressed to apply pending
-    // transactions.
-    TermIndex termIndexAfterRestart = sm.getLastAppliedTermIndex();
-    Assert.assertTrue(termIndexAfterRestart.getIndex() >=
-        termIndexBeforeRestart.getIndex());
-  }
-
-  private ContainerStateMachine getStateMachine() {
-    XceiverServerSpi server =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
-            getContainer().getServer(HddsProtos.ReplicationType.RATIS);
-    return ((XceiverServerRatis)server).getStateMachine();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org