Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2018/09/19 21:00:39 UTC

[29/50] [abbrv] hadoop git commit: HDDS-476. Add Pipeline reports to make pipeline active on SCM restart. Contributed by Mukul Kumar Singh.
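
In outline: datanodes now attach a PipelineReportsProto to both registration
and heartbeats; SCMDatanodeHeartbeatDispatcher turns it into a PIPELINE_REPORT
event, and PipelineSelector.processPipelineReport() uses the reported IDs to
mark recreated pipelines active again after an SCM restart. As a rough sketch
of what a datanode-side report amounts to, assuming the PipelineReport inner
message shown in the hunks below and a PipelineID.getProtobuf() conversion
(the inverse of the getFromProtobuf() call this patch adds):

    import org.apache.hadoop.hdds.protocol.proto
            .StorageContainerDatanodeProtocolProtos.PipelineReport;
    import org.apache.hadoop.hdds.protocol.proto
            .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
    import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;

    final class ReportSketch {
      // One entry per pipeline the datanode participates in; an empty
      // message (what TestUtils.getRandomPipelineReports() below builds)
      // is also valid.
      static PipelineReportsProto reportFor(PipelineID id) {
        return PipelineReportsProto.newBuilder()
            .addPipelineReport(PipelineReport.newBuilder()
                .setPipelineID(id.getProtobuf()))
            .build();
      }
    }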

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 82946bd..59d937e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -16,9 +16,12 @@
  */
 package org.apache.hadoop.hdds.scm.pipelines;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
@@ -30,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
 import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -75,11 +79,9 @@ public class PipelineSelector {
   private static final Logger LOG =
       LoggerFactory.getLogger(PipelineSelector.class);
   private final ContainerPlacementPolicy placementPolicy;
-  private final NodeManager nodeManager;
+  private final Map<ReplicationType, PipelineManager> pipelineManagerMap;
   private final Configuration conf;
   private final EventPublisher eventPublisher;
-  private final RatisManagerImpl ratisManager;
-  private final StandaloneManagerImpl standaloneManager;
   private final long containerSize;
   private final MetadataStore pipelineStore;
   private final PipelineStateManager stateManager;
@@ -96,7 +98,6 @@ public class PipelineSelector {
    */
   public PipelineSelector(NodeManager nodeManager, Configuration conf,
       EventPublisher eventPublisher, int cacheSizeMB) throws IOException {
-    this.nodeManager = nodeManager;
     this.conf = conf;
     this.eventPublisher = eventPublisher;
     this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf);
@@ -106,12 +107,14 @@ public class PipelineSelector {
         StorageUnit.BYTES);
     node2PipelineMap = new Node2PipelineMap();
     pipelineMap = new ConcurrentHashMap<>();
-    this.standaloneManager =
-        new StandaloneManagerImpl(this.nodeManager, placementPolicy,
-            containerSize);
-    this.ratisManager =
-        new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
-            conf);
+    pipelineManagerMap = new HashMap<>();
+
+    pipelineManagerMap.put(ReplicationType.STAND_ALONE,
+            new StandaloneManagerImpl(nodeManager, placementPolicy,
+            containerSize));
+    pipelineManagerMap.put(ReplicationType.RATIS,
+            new RatisManagerImpl(nodeManager, placementPolicy,
+                    containerSize, conf));
     long pipelineCreationLeaseTimeout = conf.getTimeDuration(
         ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
         ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
@@ -154,6 +157,7 @@ public class PipelineSelector {
     }
   }
 
+  @VisibleForTesting
   public Set<ContainerID> getOpenContainerIDsByPipeline(PipelineID pipelineID) {
     return pipeline2ContainerMap.get(pipelineID);
   }
@@ -227,30 +231,6 @@ public class PipelineSelector {
   }
 
   /**
-   * Return the pipeline manager from the replication type.
-   *
-   * @param replicationType - Replication Type Enum.
-   * @return pipeline Manager.
-   * @throws IllegalArgumentException If an pipeline type gets added
-   * and this function is not modified we will throw.
-   */
-  private PipelineManager getPipelineManager(ReplicationType replicationType)
-      throws IllegalArgumentException {
-    switch (replicationType) {
-    case RATIS:
-      return this.ratisManager;
-    case STAND_ALONE:
-      return this.standaloneManager;
-    case CHAINED:
-      throw new IllegalArgumentException("Not implemented yet");
-    default:
-      throw new IllegalArgumentException("Unexpected enum found. Does not" +
-          " know how to handle " + replicationType.toString());
-    }
-
-  }
-
-  /**
    * This function is called by the Container Manager while allocating a new
    * container. The client specifies what kind of replication pipeline is needed
    * and based on the replication type in the request appropriate Interface is
@@ -260,7 +240,7 @@ public class PipelineSelector {
   public Pipeline getReplicationPipeline(ReplicationType replicationType,
       HddsProtos.ReplicationFactor replicationFactor)
       throws IOException {
-    PipelineManager manager = getPipelineManager(replicationType);
+    PipelineManager manager = pipelineManagerMap.get(replicationType);
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
     LOG.debug("Getting replication pipeline forReplicationType {} :" +
             " ReplicationFactor {}", replicationType.toString(),
@@ -316,7 +296,7 @@ public class PipelineSelector {
    * Finalize a given pipeline.
    */
   public void finalizePipeline(Pipeline pipeline) throws IOException {
-    PipelineManager manager = getPipelineManager(pipeline.getType());
+    PipelineManager manager = pipelineManagerMap.get(pipeline.getType());
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
     if (pipeline.getLifeCycleState() == LifeCycleState.CLOSING ||
         pipeline.getLifeCycleState() == LifeCycleState.CLOSED) {
@@ -327,17 +307,17 @@ public class PipelineSelector {
     }
 
     // Remove the pipeline from active allocation
-    manager.finalizePipeline(pipeline);
-
-    LOG.info("Finalizing pipeline. pipelineID: {}", pipeline.getId());
-    updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE);
-    closePipelineIfNoOpenContainers(pipeline);
+    if (manager.finalizePipeline(pipeline)) {
+      LOG.info("Finalizing pipeline. pipelineID: {}", pipeline.getId());
+      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE);
+      closePipelineIfNoOpenContainers(pipeline);
+    }
   }
 
   /**
    * Close a given pipeline.
    */
-  public void closePipelineIfNoOpenContainers(Pipeline pipeline)
+  private void closePipelineIfNoOpenContainers(Pipeline pipeline)
       throws IOException {
     if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) {
       return;
@@ -354,7 +334,7 @@ public class PipelineSelector {
    * Close a given pipeline.
    */
   private void closePipeline(Pipeline pipeline) throws IOException {
-    PipelineManager manager = getPipelineManager(pipeline.getType());
+    PipelineManager manager = pipelineManagerMap.get(pipeline.getType());
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
     LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId());
     HashSet<ContainerID> containers =
@@ -367,7 +347,7 @@ public class PipelineSelector {
    * Add to a given pipeline.
    */
   private void addOpenPipeline(Pipeline pipeline) {
-    PipelineManager manager = getPipelineManager(pipeline.getType());
+    PipelineManager manager = pipelineManagerMap.get(pipeline.getType());
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
     LOG.debug("Adding Open pipeline. pipelineID: {}", pipeline.getId());
     manager.addOpenPipeline(pipeline);
@@ -381,7 +361,7 @@ public class PipelineSelector {
     }
   }
 
-  public Set<PipelineID> getPipelineId(UUID dnId) {
+  public Set<PipelineID> getPipelineByDnID(UUID dnId) {
     return node2PipelineMap.getPipelines(dnId);
   }
 
@@ -400,6 +380,9 @@ public class PipelineSelector {
       pipelineMap.put(pipeline.getId(), pipeline);
       pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
       node2PipelineMap.addPipeline(pipeline);
+      // reset the datanodes in the pipeline;
+      // they will be re-populated as pipeline reports arrive
+      pipeline.resetPipeline();
       break;
     case CLOSED:
       // if the pipeline is in closed state, nothing to do.
@@ -409,6 +392,36 @@ public class PipelineSelector {
     }
   }
 
+  public void handleStaleNode(DatanodeDetails dn) {
+    Set<PipelineID> pipelineIDs = getPipelineByDnID(dn.getUuid());
+    for (PipelineID id : pipelineIDs) {
+      LOG.info("closing pipeline {}.", id);
+      eventPublisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
+    }
+  }
+
+  void processPipelineReport(DatanodeDetails dn,
+                                    PipelineReportsProto pipelineReport) {
+    Set<PipelineID> reportedPipelines = new HashSet<>();
+    pipelineReport.getPipelineReportList().
+            forEach(p ->
+                    reportedPipelines.add(
+                            processPipelineReport(p.getPipelineID(), dn)));
+
+    //TODO: handle missing pipelines and new pipelines later
+  }
+
+  private PipelineID processPipelineReport(
+          HddsProtos.PipelineID id, DatanodeDetails dn) {
+    PipelineID pipelineID = PipelineID.getFromProtobuf(id);
+    Pipeline pipeline = pipelineMap.get(pipelineID);
+    if (pipeline != null) {
+      pipelineManagerMap.get(pipeline.getType())
+              .processPipelineReport(pipeline, dn);
+    }
+    return pipelineID;
+  }
+
   /**
    * Update the Pipeline State to the next state.
    *
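
The main structural change in this file: the switch-based getPipelineManager()
is gone, replaced by a map keyed on ReplicationType, so every call site
becomes pipelineManagerMap.get(type) plus a null check, and a new replication
type costs one map entry instead of a new switch arm. A self-contained sketch
of the same dispatch pattern (hypothetical stub types; the patch itself uses a
HashMap, an EnumMap is shown here as the natural fit for enum keys):

    import java.util.EnumMap;
    import java.util.Map;
    import java.util.Objects;

    final class DispatchSketch {
      enum ReplicationType { STAND_ALONE, RATIS, CHAINED }

      interface PipelineManager { }

      private final Map<ReplicationType, PipelineManager> managers =
          new EnumMap<>(ReplicationType.class);

      DispatchSketch(PipelineManager standalone, PipelineManager ratis) {
        managers.put(ReplicationType.STAND_ALONE, standalone);
        managers.put(ReplicationType.RATIS, ratis);
        // CHAINED gets no entry, so lookups for it trip the null check
        // below, replacing the old switch arm that threw.
      }

      PipelineManager managerFor(ReplicationType type) {
        return Objects.requireNonNull(managers.get(type),
            "Found invalid pipeline manager");
      }
    }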

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index d3cec88..905a5b5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -73,20 +73,19 @@ public class RatisManagerImpl extends PipelineManager {
   public Pipeline allocatePipeline(ReplicationFactor factor) {
     List<DatanodeDetails> newNodesList = new LinkedList<>();
     List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
-    int count = getReplicationCount(factor);
     //TODO: Add Raft State to the Nodes, so we can query and skip nodes from
     // data from datanode instead of maintaining a set.
     for (DatanodeDetails datanode : datanodes) {
       Preconditions.checkNotNull(datanode);
       if (!ratisMembers.contains(datanode)) {
         newNodesList.add(datanode);
-        if (newNodesList.size() == count) {
+        if (newNodesList.size() == factor.getNumber()) {
           // once a datanode has been added to a pipeline, exclude it from
           // further allocations
           ratisMembers.addAll(newNodesList);
           PipelineID pipelineID = PipelineID.randomId();
           LOG.info("Allocating a new ratis pipeline of size: {} id: {}",
-              count, pipelineID);
+                  factor.getNumber(), pipelineID);
           return PipelineSelector.newPipelineFromNodes(newNodesList,
               ReplicationType.RATIS, factor, pipelineID);
         }
@@ -103,6 +102,17 @@ public class RatisManagerImpl extends PipelineManager {
     }
   }
 
+  public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) {
+    super.processPipelineReport(pipeline, dn);
+    ratisMembers.add(dn);
+  }
+
+  public synchronized boolean finalizePipeline(Pipeline pipeline) {
+    activePipelines.get(pipeline.getFactor().ordinal())
+            .removePipeline(pipeline.getId());
+    return true;
+  }
+
   /**
    * Close the pipeline.
    */
@@ -116,29 +126,4 @@ public class RatisManagerImpl extends PipelineManager {
       Preconditions.checkArgument(ratisMembers.remove(node));
     }
   }
-
-  /**
-   * list members in the pipeline .
-   *
-   * @param pipelineID
-   * @return the datanode
-   */
-  @Override
-  public List<DatanodeDetails> getMembers(PipelineID pipelineID)
-      throws IOException {
-    return null;
-  }
-
-  /**
-   * Update the datanode list of the pipeline.
-   *
-   * @param pipelineID
-   * @param newDatanodes
-   */
-  @Override
-  public void updatePipeline(PipelineID pipelineID,
-                             List<DatanodeDetails> newDatanodes)
-      throws IOException {
-
-  }
 }
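
The processPipelineReport() override above is what rebuilds SCM's in-memory
view: each datanode that reports a pipeline is added back into ratisMembers,
so the exclusion set consulted by allocatePipeline() converges to its
pre-restart contents without ever being persisted. A stripped-down sketch of
that idea, with a plain String set standing in for the real DatanodeDetails
set:

    import java.util.HashSet;
    import java.util.Set;

    final class RatisMembersSketch {
      // Datanodes already committed to some ratis pipeline;
      // allocatePipeline() skips these when forming a new pipeline.
      private final Set<String> ratisMembers = new HashSet<>();

      // Invoked once per (pipeline, reporting datanode). Once every member
      // of a surviving pipeline has reported, the set matches the
      // pre-restart state.
      void processPipelineReport(String datanodeUuid) {
        ratisMembers.add(datanodeUuid);
      }

      boolean isAvailable(String datanodeUuid) {
        return !ratisMembers.contains(datanodeUuid);
      }
    }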

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index ed2fc2f..045afb6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -74,18 +74,19 @@ public class StandaloneManagerImpl extends PipelineManager {
   public Pipeline allocatePipeline(ReplicationFactor factor) {
     List<DatanodeDetails> newNodesList = new LinkedList<>();
     List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
-    int count = getReplicationCount(factor);
     for (DatanodeDetails datanode : datanodes) {
       Preconditions.checkNotNull(datanode);
       if (!standAloneMembers.contains(datanode)) {
         newNodesList.add(datanode);
-        if (newNodesList.size() == count) {
+        if (newNodesList.size() == factor.getNumber()) {
           // once a datanode has been added to a pipeline, exclude it from
           // further allocations
           standAloneMembers.addAll(newNodesList);
-          PipelineID pipelineID = PipelineID.randomId();
+          // Standalone pipelines use the first datanode's id as the
+          // pipeline id
+          PipelineID pipelineID =
+                  PipelineID.valueOf(newNodesList.get(0).getUuid());
           LOG.info("Allocating a new standalone pipeline of size: {} id: {}",
-              count, pipelineID);
+              factor.getNumber(), pipelineID);
           return PipelineSelector.newPipelineFromNodes(newNodesList,
               ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineID);
         }
@@ -98,6 +99,17 @@ public class StandaloneManagerImpl extends PipelineManager {
     // Nothing to be done for standalone pipeline
   }
 
+  public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) {
+    super.processPipelineReport(pipeline, dn);
+    standAloneMembers.add(dn);
+  }
+
+  public synchronized boolean finalizePipeline(Pipeline pipeline) {
+    activePipelines.get(pipeline.getFactor().ordinal())
+            .removePipeline(pipeline.getId());
+    return false;
+  }
+
   /**
    * Close the pipeline.
    */
@@ -107,28 +119,4 @@ public class StandaloneManagerImpl extends PipelineManager {
       Preconditions.checkArgument(standAloneMembers.remove(node));
     }
   }
-
-  /**
-   * list members in the pipeline .
-   *
-   * @param pipelineID
-   * @return the datanode
-   */
-  @Override
-  public List<DatanodeDetails> getMembers(PipelineID pipelineID)
-      throws IOException {
-    return null;
-  }
-
-  /**
-   * Update the datanode list of the pipeline.
-   *
-   * @param pipelineID
-   * @param newDatanodes
-   */
-  @Override
-  public void updatePipeline(PipelineID pipelineID, List<DatanodeDetails>
-      newDatanodes) throws IOException {
-
-  }
 }
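
Two details worth noting here. First, standalone pipeline IDs are no longer
random: PipelineID.valueOf(firstNodeUuid) derives the ID from the first
datanode, so a report that arrives after an SCM restart maps back onto the
recreated pipeline deterministically. Second, finalizePipeline() returns false
here, versus true in RatisManagerImpl, so under the new PipelineSelector logic
a standalone pipeline is only dropped from active allocation and never pushed
through the FINALIZE transition. A sketch of the ID derivation, assuming
PipelineID lives in the same helpers package as Pipeline:

    import java.util.UUID;
    import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;

    final class StandaloneIdSketch {
      // Same datanode in, same pipeline id out; calling this before and
      // after an SCM restart yields equal ids, which is what makes a
      // reported pipeline matchable.
      static PipelineID standalonePipelineId(UUID firstDatanodeUuid) {
        return PipelineID.valueOf(firstDatanodeUuid);
      }
    }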

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index a651f62..e65de8b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdds.scm.server;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
@@ -46,6 +48,7 @@ import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_ACTIONS;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT;
 
 /**
  * This class is responsible for dispatching heartbeat from datanode to
@@ -103,6 +106,14 @@ public final class SCMDatanodeHeartbeatDispatcher {
               heartbeat.getContainerActions()));
     }
 
+    if (heartbeat.hasPipelineReports()) {
+      LOG.debug("Dispatching Pipeline Report.");
+      eventPublisher.fireEvent(PIPELINE_REPORT,
+              new PipelineReportFromDatanode(datanodeDetails,
+                      heartbeat.getPipelineReports()));
+
+    }
+
     if (heartbeat.hasPipelineActions()) {
       LOG.debug("Dispatching Pipeline Actions.");
       eventPublisher.fireEvent(PIPELINE_ACTIONS,
@@ -179,6 +190,18 @@ public final class SCMDatanodeHeartbeatDispatcher {
   }
 
   /**
+   * Pipeline report event payload with origin.
+   */
+  public static class PipelineReportFromDatanode
+          extends ReportFromDatanode<PipelineReportsProto> {
+
+    public PipelineReportFromDatanode(DatanodeDetails datanodeDetails,
+                                      PipelineReportsProto report) {
+      super(datanodeDetails, report);
+    }
+  }
+
+  /**
    * Pipeline action event payload with origin.
    */
   public static class PipelineActionsFromDatanode
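
On the consuming side, PIPELINE_REPORT is handled like any other SCM event: a
class implementing EventHandler<PipelineReportFromDatanode> gets registered on
the event queue (the actual PipelineReportHandler added by this patch is wired
up in StorageContainerManager below; its body is not part of this mail). A
minimal handler sketch, assuming the getDatanodeDetails()/getReport()
accessors of the ReportFromDatanode payloads:

    import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
        .PipelineReportFromDatanode;
    import org.apache.hadoop.hdds.server.events.EventHandler;
    import org.apache.hadoop.hdds.server.events.EventPublisher;

    class PipelineReportHandlerSketch
        implements EventHandler<PipelineReportFromDatanode> {
      @Override
      public void onMessage(PipelineReportFromDatanode reportFromDatanode,
          EventPublisher publisher) {
        // Hand the proto to the selector, mirroring
        // PipelineSelector.processPipelineReport(dn, pipelineReport) above:
        // selector.processPipelineReport(
        //     reportFromDatanode.getDatanodeDetails(),
        //     reportFromDatanode.getReport());
      }
    }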

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 8a09dc8..4a0d3e5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
@@ -74,7 +76,10 @@ import static org.apache.hadoop.hdds.protocol.proto
 
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .ReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+        .PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -102,6 +107,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRES
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
 
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT;
 import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
 
@@ -190,13 +196,14 @@ public class SCMDatanodeProtocolServer implements
   public SCMRegisteredResponseProto register(
       HddsProtos.DatanodeDetailsProto datanodeDetailsProto,
       NodeReportProto nodeReport,
-      ContainerReportsProto containerReportsProto)
+      ContainerReportsProto containerReportsProto,
+          PipelineReportsProto pipelineReportsProto)
       throws IOException {
     DatanodeDetails datanodeDetails = DatanodeDetails
         .getFromProtoBuf(datanodeDetailsProto);
     // TODO : Return the list of Nodes that forms the SCM HA.
     RegisteredCommand registeredCommand = scm.getScmNodeManager()
-        .register(datanodeDetails, nodeReport);
+        .register(datanodeDetails, nodeReport, pipelineReportsProto);
     if (registeredCommand.getError()
         == SCMRegisteredResponseProto.ErrorCode.success) {
       scm.getScmContainerManager().processContainerReports(datanodeDetails,
@@ -204,6 +211,9 @@ public class SCMDatanodeProtocolServer implements
       eventPublisher.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
           new NodeRegistrationContainerReport(datanodeDetails,
               containerReportsProto));
+      eventPublisher.fireEvent(PIPELINE_REPORT,
+              new PipelineReportFromDatanode(datanodeDetails,
+                      pipelineReportsProto));
     }
     return getRegisteredResponse(registeredCommand);
   }
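
The widened register(...) signature ripples through every NodeManager
implementation, which is why the MockNodeManager and
ReplicationNodeManagerMock hunks below exist. Any out-of-tree mock needs the
same mechanical change, along these lines:

    @Override
    public RegisteredCommand register(DatanodeDetails datanodeDetails,
        NodeReportProto nodeReport,
        PipelineReportsProto pipelineReportsProto) {
      // Mock: registration side effects are not exercised in these tests.
      return null;
    }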

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 8e76606..2169149 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineReportHandler;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -217,13 +218,16 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new CloseContainerEventHandler(scmContainerManager);
     NodeReportHandler nodeReportHandler =
         new NodeReportHandler(scmNodeManager);
-
+    PipelineReportHandler pipelineReportHandler =
+            new PipelineReportHandler(
+                    scmContainerManager.getPipelineSelector());
     CommandStatusReportHandler cmdStatusReportHandler =
         new CommandStatusReportHandler();
 
     NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
     StaleNodeHandler staleNodeHandler =
-        new StaleNodeHandler(node2ContainerMap, scmContainerManager);
+        new StaleNodeHandler(node2ContainerMap,
+                scmContainerManager.getPipelineSelector());
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap,
         getScmContainerManager().getStateManager());
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
@@ -240,7 +244,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new PipelineActionEventHandler();
 
     PipelineCloseHandler pipelineCloseHandler =
-        new PipelineCloseHandler(scmContainerManager);
+        new PipelineCloseHandler(scmContainerManager.getPipelineSelector());
 
     long watcherTimeout =
         conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
@@ -300,6 +304,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
         (BlockManagerImpl) scmBlockManager);
     eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, clientProtocolServer);
+    eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
 
     registerMXBean();
   }
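
The wiring itself is a single addHandler() call on the EventQueue; since
EventQueue also implements EventPublisher, the dispatcher's fireEvent() calls
and this handler registration meet on the same queue. Reusing the handler
sketch from the dispatcher section, the pattern in isolation:

    import org.apache.hadoop.hdds.server.events.EventQueue;
    import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT;

    final class WiringSketch {
      public static void main(String[] args) {
        EventQueue eventQueue = new EventQueue();
        eventQueue.addHandler(PIPELINE_REPORT,
            new PipelineReportHandlerSketch());
        // Anything holding the queue as an EventPublisher can now fire:
        // eventQueue.fireEvent(PIPELINE_REPORT,
        //     new PipelineReportFromDatanode(datanodeDetails, reports));
      }
    }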

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 7af9dda..24a16c7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -17,6 +17,8 @@
 package org.apache.hadoop.hdds.scm;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.mockito.Mockito;
 import static org.mockito.Mockito.when;
 
@@ -139,7 +141,8 @@ public final class TestUtils {
   public static DatanodeDetails createRandomDatanodeAndRegister(
       SCMNodeManager nodeManager) {
     return getDatanodeDetails(
-        nodeManager.register(randomDatanodeDetails(), null));
+        nodeManager.register(randomDatanodeDetails(), null,
+                getRandomPipelineReports()));
   }
 
   /**
@@ -299,6 +302,11 @@ public final class TestUtils {
     return getContainerReports(containerInfos);
   }
 
+
+  public static PipelineReportsProto getRandomPipelineReports() {
+    return PipelineReportsProto.newBuilder().build();
+  }
+
   /**
    * Creates container report with the given ContainerInfo(s).
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 088b700..21e44a3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -16,6 +16,8 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
@@ -356,7 +358,7 @@ public class MockNodeManager implements NodeManager {
    */
   @Override
   public RegisteredCommand register(DatanodeDetails datanodeDetails,
-      NodeReportProto nodeReport) {
+      NodeReportProto nodeReport, PipelineReportsProto pipelineReportsProto) {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
index f438c8b..cbe96ee 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
@@ -286,7 +286,8 @@ public class TestNodeManager {
         TestUtils.createStorageReport(dnId, storagePath, 100, 10, 90, null);
     try (SCMNodeManager nodemanager = createNodeManager(conf)) {
       nodemanager.register(datanodeDetails,
-          TestUtils.createNodeReport(report));
+          TestUtils.createNodeReport(report),
+          TestUtils.getRandomPipelineReports());
       List<SCMCommand> command = nodemanager.processHeartbeat(datanodeDetails);
       Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails));
       Assert.assertTrue("On regular HB calls, SCM responses a "
@@ -1122,7 +1123,8 @@ public class TestNodeManager {
       eq.addHandler(DATANODE_COMMAND, nodemanager);
 
       nodemanager
-          .register(datanodeDetails, TestUtils.createNodeReport(report));
+          .register(datanodeDetails, TestUtils.createNodeReport(report),
+                  TestUtils.getRandomPipelineReports());
       eq.fireEvent(DATANODE_COMMAND,
           new CommandForDatanode<>(datanodeDetails.getUuid(),
               new CloseContainerCommand(1L, ReplicationType.STAND_ALONE,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java
index 14a74e9..ec1d527 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java
@@ -116,7 +116,7 @@ public class TestNode2ContainerMap {
     Assert.assertTrue(map.isKnownDatanode(key));
     ReportResult result = map.processReport(key, values);
     Assert.assertEquals(result.getStatus(),
-        Node2ContainerMap.ReportStatus.ALL_IS_WELL);
+        ReportResult.ReportStatus.ALL_IS_WELL);
   }
 
   @Test
@@ -181,9 +181,9 @@ public class TestNode2ContainerMap {
     UUID key = getFirstKey();
     TreeSet<ContainerID> values = testData.get(key);
     ReportResult result = map.processReport(key, values);
-    Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_DATANODE_FOUND,
+    Assert.assertEquals(ReportResult.ReportStatus.NEW_DATANODE_FOUND,
         result.getStatus());
-    Assert.assertEquals(result.getNewContainers().size(), values.size());
+    Assert.assertEquals(result.getNewEntries().size(), values.size());
   }
 
   /**
@@ -216,15 +216,15 @@ public class TestNode2ContainerMap {
     ReportResult result = map.processReport(key, newContainersSet);
 
     //Assert that expected size of missing container is same as addedContainers
-    Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_CONTAINERS_FOUND,
+    Assert.assertEquals(ReportResult.ReportStatus.NEW_ENTRIES_FOUND,
         result.getStatus());
 
     Assert.assertEquals(addedContainers.size(),
-        result.getNewContainers().size());
+        result.getNewEntries().size());
 
     // Assert that the Container IDs are the same as we added new.
     Assert.assertTrue("All objects are not removed.",
-        result.getNewContainers().removeAll(addedContainers));
+        result.getNewEntries().removeAll(addedContainers));
   }
 
   /**
@@ -261,14 +261,14 @@ public class TestNode2ContainerMap {
 
 
     //Assert that expected size of missing container is same as addedContainers
-    Assert.assertEquals(Node2ContainerMap.ReportStatus.MISSING_CONTAINERS,
+    Assert.assertEquals(ReportResult.ReportStatus.MISSING_ENTRIES,
         result.getStatus());
     Assert.assertEquals(removedContainers.size(),
-        result.getMissingContainers().size());
+        result.getMissingEntries().size());
 
     // Assert that the Container IDs are the same as we added new.
     Assert.assertTrue("All missing containers not found.",
-        result.getMissingContainers().removeAll(removedContainers));
+        result.getMissingEntries().removeAll(removedContainers));
   }
 
   @Test
@@ -307,21 +307,21 @@ public class TestNode2ContainerMap {
 
 
     Assert.assertEquals(
-        Node2ContainerMap.ReportStatus.MISSING_AND_NEW_CONTAINERS_FOUND,
+            ReportResult.ReportStatus.MISSING_AND_NEW_ENTRIES_FOUND,
         result.getStatus());
     Assert.assertEquals(removedContainers.size(),
-        result.getMissingContainers().size());
+        result.getMissingEntries().size());
 
 
     // Assert that the Container IDs are the same as we added new.
     Assert.assertTrue("All missing containers not found.",
-        result.getMissingContainers().removeAll(removedContainers));
+        result.getMissingEntries().removeAll(removedContainers));
 
     Assert.assertEquals(insertedSet.size(),
-        result.getNewContainers().size());
+        result.getNewEntries().size());
 
     // Assert that the Container IDs are the same as we added new.
     Assert.assertTrue("All inserted containers are not found.",
-        result.getNewContainers().removeAll(insertedSet));
+        result.getNewEntries().removeAll(insertedSet));
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index a513f6c..390746f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -280,7 +280,8 @@ public class TestEndPoint {
           .register(nodeToRegister.getProtoBufMessage(), TestUtils
                   .createNodeReport(
                       getStorageReports(nodeToRegister.getUuid())),
-              TestUtils.getRandomContainerReports(10));
+              TestUtils.getRandomContainerReports(10),
+                  TestUtils.getRandomPipelineReports());
       Assert.assertNotNull(responseProto);
       Assert.assertEquals(nodeToRegister.getUuidString(),
           responseProto.getDatanodeUUID());
@@ -308,6 +309,8 @@ public class TestEndPoint {
         .createNodeReport(getStorageReports(UUID.randomUUID())));
     when(ozoneContainer.getContainerReport()).thenReturn(
         TestUtils.getRandomContainerReports(10));
+    when(ozoneContainer.getPipelineReport()).thenReturn(
+            TestUtils.getRandomPipelineReports());
     RegisterEndpointTask endpointTask =
         new RegisterEndpointTask(rpcEndPoint, conf, ozoneContainer,
             mock(StateContext.class));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index a0249aa..e8a6892 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -17,6 +17,8 @@
 package org.apache.hadoop.ozone.container.testutils;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.node.CommandQueue;
@@ -252,7 +254,8 @@ public class ReplicationNodeManagerMock implements NodeManager {
    */
   @Override
   public RegisteredCommand register(DatanodeDetails dd,
-                                    NodeReportProto nodeReport) {
+                                    NodeReportProto nodeReport,
+                                    PipelineReportsProto pipelineReportsProto) {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index aefa6b0..ad3798e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -98,7 +98,7 @@ public class TestNode2PipelineMap {
 
     // get pipeline details by dnid
     Set<PipelineID> pipelines = mapping.getPipelineSelector()
-        .getPipelineId(dns.get(0).getUuid());
+        .getPipelineByDnID(dns.get(0).getUuid());
     Assert.assertEquals(1, pipelines.size());
     pipelines.forEach(p -> Assert.assertEquals(p,
         ratisContainer.getPipeline().getId()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index a5828e1..5eabfb9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -119,7 +119,7 @@ public class TestPipelineClose {
         HddsProtos.LifeCycleState.CLOSED);
     for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) {
       // Assert that the pipeline has been removed from Node2PipelineMap as well
-      Assert.assertEquals(pipelineSelector.getPipelineId(
+      Assert.assertEquals(pipelineSelector.getPipelineByDnID(
           dn.getUuid()).size(), 0);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
index 3999d76..fb94b3c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerMapping;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -87,7 +88,7 @@ public class TestSCMRestart {
   }
 
   @Test
-  public void testPipelineWithScmRestart() {
+  public void testPipelineWithScmRestart() throws IOException {
     // After restart make sure that the pipeline are still present
     Pipeline ratisPipeline1AfterRestart = newMapping.getPipelineSelector()
             .getPipeline(ratisPipeline1.getId());
@@ -97,5 +98,22 @@ public class TestSCMRestart {
     Assert.assertNotSame(ratisPipeline2AfterRestart, ratisPipeline2);
     Assert.assertEquals(ratisPipeline1AfterRestart, ratisPipeline1);
     Assert.assertEquals(ratisPipeline2AfterRestart, ratisPipeline2);
+
+    for (DatanodeDetails dn : ratisPipeline1.getMachines()) {
+      Assert.assertEquals(dn, ratisPipeline1AfterRestart.getDatanodes()
+              .get(dn.getUuidString()));
+    }
+
+    for (DatanodeDetails dn : ratisPipeline2.getMachines()) {
+      Assert.assertEquals(dn, ratisPipeline2AfterRestart.getDatanodes()
+              .get(dn.getUuidString()));
+    }
+
+    // Try creating a new ratis pipeline; it should reuse the same pipeline
+    // that existed before the restart
+    Pipeline newRatisPipeline =
+            newMapping.allocateContainer(RATIS, THREE, "Owner1")
+                    .getPipeline();
+    Assert.assertEquals(newRatisPipeline.getId(), ratisPipeline1.getId());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
index 1cb2cda..a83c16e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
@@ -36,8 +36,12 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
 /**
  * Helpers for Ratis tests.
  */
@@ -60,6 +64,7 @@ public interface RatisTestHelper {
     public RatisTestSuite()
         throws IOException, TimeoutException, InterruptedException {
       conf = newOzoneConfiguration(RPC);
+
       cluster = newMiniOzoneCluster(NUM_DATANODES, conf);
     }
 
@@ -96,6 +101,8 @@ public interface RatisTestHelper {
   static void initRatisConf(RpcType rpc, Configuration conf) {
     conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, true);
     conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, rpc.name());
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
     LOG.info(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY
         + " = " + rpc.name());
   }
@@ -104,6 +111,8 @@ public interface RatisTestHelper {
       int numDatanodes, OzoneConfiguration conf)
       throws IOException, TimeoutException, InterruptedException {
     final MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
+        .setHbInterval(1000)
+        .setHbProcessorInterval(1000)
         .setNumDatanodes(numDatanodes).build();
     cluster.waitForClusterToBeReady();
     return cluster;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
index 6377f11..02cd985 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
@@ -136,6 +136,7 @@ public class TestKeys {
     ozoneCluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(1)
         .setHbInterval(1000)
+        .setHbProcessorInterval(1000)
         .build();
     ozoneCluster.waitForClusterToBeReady();
     client = new RpcClient(conf);
@@ -328,7 +329,6 @@ public class TestKeys {
     cluster.restartHddsDatanode(datanodeIdx);
   }
 
-  @Ignore("Causes a JVm exit")
   @Test
   public void testPutAndGetKeyWithDnRestart() throws Exception {
     runTestPutAndGetKeyWithDnRestart(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java
index 915d0f6..2e8f539 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java
@@ -26,7 +26,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.Ignore;
 import org.junit.rules.Timeout;
 
 import static org.apache.hadoop.ozone.web.client
@@ -83,7 +82,6 @@ public class TestKeysRatis {
         getMultiPartKey(delimiter)));
   }
 
-  @Ignore("disabling for now, datanodes restart with ratis is buggy")
   @Test
   public void testPutAndGetKeyWithDnRestart() throws Exception {
     runTestPutAndGetKeyWithDnRestart(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 275ae6e..caf6d4f 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -101,7 +101,7 @@
     <ldap-api.version>1.0.0-M33</ldap-api.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.3.0-50588bd-SNAPSHOT</ratis.version>
+    <ratis.version>0.3.0-eca3531-SNAPSHOT</ratis.version>
     <jcache.version>1.0-alpha-1</jcache.version>
     <ehcache.version>3.3.1</ehcache.version>
     <hikari.version>2.4.12</hikari.version>

