You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2018/09/19 13:23:02 UTC

[2/2] hadoop git commit: HDDS-476. Add Pipeline reports to make pipeline active on SCM restart. Contributed by Mukul Kumar Singh.

HDDS-476. Add Pipeline reports to make pipeline active on SCM restart.
Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c0956ee2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c0956ee2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c0956ee2

Branch: refs/heads/trunk
Commit: c0956ee2a879d1f82938dd2b8bab79b09ae32eac
Parents: 0712537e
Author: Nanda kumar <na...@apache.org>
Authored: Wed Sep 19 18:49:13 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Wed Sep 19 18:52:08 2018 +0530

----------------------------------------------------------------------
 .../hadoop/hdds/scm/XceiverClientRatis.java     |   2 +-
 .../org/apache/hadoop/hdds/HddsConfigKeys.java  |   5 +
 .../scm/container/common/helpers/Pipeline.java  |  22 ++-
 .../container/common/helpers/PipelineID.java    |  13 +-
 .../common/src/main/resources/ozone-default.xml |   8 +
 .../apache/hadoop/hdds/scm/HddsServerUtil.java  |  21 +++
 .../common/report/PipelineReportPublisher.java  |  73 +++++++++
 .../common/report/ReportPublisherFactory.java   |   4 +
 .../states/endpoint/RegisterEndpointTask.java   |   8 +-
 .../transport/server/XceiverServerGrpc.java     |  16 ++
 .../transport/server/XceiverServerSpi.java      |   9 ++
 .../server/ratis/XceiverServerRatis.java        |  63 ++++----
 .../container/ozoneimpl/OzoneContainer.java     |  12 ++
 .../StorageContainerDatanodeProtocol.java       |  10 +-
 .../protocol/StorageContainerNodeProtocol.java  |   6 +-
 ...rDatanodeProtocolClientSideTranslatorPB.java |   6 +-
 ...rDatanodeProtocolServerSideTranslatorPB.java |   5 +-
 .../StorageContainerDatanodeProtocol.proto      |  10 ++
 .../ozone/container/common/ScmTestMock.java     |   8 +-
 .../hdds/scm/container/ContainerMapping.java    |  19 ---
 .../scm/container/ContainerReportHandler.java   |   6 +-
 .../hadoop/hdds/scm/container/Mapping.java      |  15 +-
 .../hadoop/hdds/scm/events/SCMEvents.java       |  15 +-
 .../hadoop/hdds/scm/node/SCMNodeManager.java    |   5 +-
 .../hadoop/hdds/scm/node/StaleNodeHandler.java  |  19 +--
 .../hdds/scm/node/states/Node2ContainerMap.java | 123 ++------------
 .../hdds/scm/node/states/Node2ObjectsMap.java   | 162 +++++++++++++++++++
 .../hdds/scm/node/states/ReportResult.java      | 105 ++++++------
 .../hdds/scm/pipelines/Node2PipelineMap.java    |  45 +-----
 .../scm/pipelines/PipelineCloseHandler.java     |  24 ++-
 .../hdds/scm/pipelines/PipelineManager.java     |  52 +++---
 .../scm/pipelines/PipelineReportHandler.java    |  59 +++++++
 .../hdds/scm/pipelines/PipelineSelector.java    | 103 ++++++------
 .../scm/pipelines/ratis/RatisManagerImpl.java   |  41 ++---
 .../standalone/StandaloneManagerImpl.java       |  44 ++---
 .../server/SCMDatanodeHeartbeatDispatcher.java  |  23 +++
 .../scm/server/SCMDatanodeProtocolServer.java   |  16 +-
 .../scm/server/StorageContainerManager.java     |  11 +-
 .../org/apache/hadoop/hdds/scm/TestUtils.java   |  10 +-
 .../hdds/scm/container/MockNodeManager.java     |   4 +-
 .../hadoop/hdds/scm/node/TestNodeManager.java   |   6 +-
 .../scm/node/states/TestNode2ContainerMap.java  |  28 ++--
 .../ozone/container/common/TestEndPoint.java    |   5 +-
 .../testutils/ReplicationNodeManagerMock.java   |   5 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java |   2 +-
 .../hdds/scm/pipeline/TestPipelineClose.java    |   2 +-
 .../hdds/scm/pipeline/TestSCMRestart.java       |  20 ++-
 .../apache/hadoop/ozone/RatisTestHelper.java    |   9 ++
 .../hadoop/ozone/web/client/TestKeys.java       |   2 +-
 .../hadoop/ozone/web/client/TestKeysRatis.java  |   2 -
 hadoop-project/pom.xml                          |   2 +-
 51 files changed, 809 insertions(+), 476 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 946abfb..4c4de7f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -110,7 +110,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
     LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
     callRatisRpc(pipeline.getMachines(), (raftClient, peer) -> raftClient
-        .groupRemove(group.getGroupId(), peer.getId()));
+        .groupRemove(group.getGroupId(), true, peer.getId()));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 492be82..856d113 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -46,6 +46,11 @@ public final class HddsConfigKeys {
   public static final String HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT =
       "60s";
 
+  public static final String HDDS_PIPELINE_REPORT_INTERVAL =
+          "hdds.pipeline.report.interval";
+  public static final String HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT =
+          "60s";
+
   public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL =
       "hdds.command.status.report.interval";
   public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index ef148e5..777efa7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.List;
 
 /**
@@ -83,7 +83,7 @@ public class Pipeline {
     this.type = replicationType;
     this.factor = replicationFactor;
     this.id = id;
-    datanodes = new TreeMap<>();
+    datanodes = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -151,9 +151,21 @@ public class Pipeline {
     return getDatanodes().get(leaderID);
   }
 
-  public void addMember(DatanodeDetails datanodeDetails) {
-    datanodes.put(datanodeDetails.getUuid().toString(),
-        datanodeDetails);
+  /**
+   * Adds a datanode to pipeline
+   * @param datanodeDetails datanode to be added.
+   * @return true if the dn was not earlier present, false otherwise
+   */
+  public boolean addMember(DatanodeDetails datanodeDetails) {
+    return datanodes.put(datanodeDetails.getUuid().toString(),
+        datanodeDetails) == null;
+
+  }
+
+  public void resetPipeline() {
+    // reset datanodes in pipeline and learn about them through
+    // pipeline reports on SCM restart
+    datanodes.clear();
   }
 
   public Map<String, DatanodeDetails> getDatanodes() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java
index 473ebc5..6e27a71 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java
@@ -28,7 +28,7 @@ import java.util.UUID;
  * in Ratis as RaftGroupId, GroupID is used by the datanodes to initialize
  * the ratis group they are part of.
  */
-public class PipelineID {
+public final class PipelineID implements Comparable<PipelineID> {
 
   private UUID id;
   private RaftGroupId groupId;
@@ -42,8 +42,12 @@ public class PipelineID {
     return new PipelineID(UUID.randomUUID());
   }
 
+  public static PipelineID valueOf(UUID id) {
+    return new PipelineID(id);
+  }
+
   public static PipelineID valueOf(RaftGroupId groupId) {
-    return new PipelineID(groupId.getUuid());
+    return valueOf(groupId.getUuid());
   }
 
   public RaftGroupId getRaftGroupID() {
@@ -68,6 +72,11 @@ public class PipelineID {
   }
 
   @Override
+  public int compareTo(PipelineID o) {
+    return this.id.compareTo(o.id);
+  }
+
+  @Override
   public boolean equals(Object o) {
     if (this == o) {
       return true;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a74124e..f7681e8 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -224,6 +224,14 @@
       received from SCM to SCM. Unit could be defined with postfix
       (ns,ms,s,m,h,d)</description>
   </property>
+  <property>
+    <name>hdds.pipeline.report.interval</name>
+    <value>60000ms</value>
+    <tag>OZONE, PIPELINE, MANAGEMENT</tag>
+    <description>Time interval of the datanode to send pipeline report. Each
+      datanode periodically sends pipeline report to SCM. Unit could be
+      defined with postfix (ns,ms,s,m,h,d)</description>
+  </property>
   <!--Ozone Settings-->
   <property>
     <name>ozone.administrators</name>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
index 580d027..d505be3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
@@ -18,12 +18,15 @@
 package org.apache.hadoop.hdds.scm;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
@@ -312,4 +315,22 @@ public final class HddsServerUtil {
     services.put(OZONE_SCM_SERVICE_ID, serviceInstances);
     return services;
   }
+
+  public static String getOzoneDatanodeRatisDirectory(Configuration conf) {
+    final String ratisDir = File.separator + "ratis";
+    String storageDir = conf.get(
+            OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
+
+    if (Strings.isNullOrEmpty(storageDir)) {
+      storageDir = conf.get(OzoneConfigKeys
+              .OZONE_METADATA_DIRS);
+      Preconditions.checkNotNull(storageDir, "ozone.metadata.dirs " +
+              "cannot be null, Please check your configs.");
+      storageDir = storageDir.concat(ratisDir);
+      LOG.warn("Storage directory for Ratis is not configured." +
+               "Mapping Ratis storage under {}. It is a good idea " +
+               "to map this to an SSD disk.", storageDir);
+    }
+    return storageDir;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
new file mode 100644
index 0000000..e7f4347
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT;
+
+
+/**
+ * Publishes pipeline reports which will be sent to SCM as part of heartbeat.
+ * PipelineReport consists of the following information about each pipeline:
+ *   - pipelineID
+ *
+ */
+public class PipelineReportPublisher extends
+    ReportPublisher<PipelineReportsProto> {
+
+  private Long pipelineReportInterval = null;
+
+  @Override
+  protected long getReportFrequency() {
+    if (pipelineReportInterval == null) {
+      pipelineReportInterval = getConf().getTimeDuration(
+          HDDS_PIPELINE_REPORT_INTERVAL,
+          HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+
+      long heartbeatFrequency = HddsServerUtil.getScmHeartbeatInterval(
+          getConf());
+
+      Preconditions.checkState(
+          heartbeatFrequency <= pipelineReportInterval,
+              HDDS_PIPELINE_REPORT_INTERVAL +
+              " cannot be configured lower than heartbeat frequency.");
+    }
+    // Add a random delay (between 0 and the report interval) on top of the
+    // pipeline report interval so that the SCM is not overwhelmed by the
+    // pipeline reports sent in sync.
+    return pipelineReportInterval + getRandomReportDelay();
+  }
+
+  private long getRandomReportDelay() {
+    return RandomUtils.nextLong(0, pipelineReportInterval);
+  }
+
+  @Override
+  protected PipelineReportsProto getReport() {
+    return getContext().getParent().getContainer().getPipelineReport();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
index ea89280..1c456a0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.container.common.report;
 import com.google.protobuf.GeneratedMessage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.proto.
+        StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
@@ -53,6 +55,8 @@ public class ReportPublisherFactory {
         ContainerReportPublisher.class);
     report2publisher.put(CommandStatusReportsProto.class,
         CommandStatusReportPublisher.class);
+    report2publisher.put(PipelineReportsProto.class,
+            PipelineReportPublisher.class);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
index ccab095..690aa01 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine;
 import org.apache.hadoop.hdds.protocol.proto
@@ -108,13 +110,15 @@ public final class RegisterEndpointTask implements
     rpcEndPoint.lock();
     try {
 
-      ContainerReportsProto contianerReport = datanodeContainerManager
+      ContainerReportsProto containerReport = datanodeContainerManager
           .getContainerReport();
       NodeReportProto nodeReport = datanodeContainerManager.getNodeReport();
+      PipelineReportsProto pipelineReportsProto =
+              datanodeContainerManager.getPipelineReport();
       // TODO : Add responses to the command Queue.
       SCMRegisteredResponseProto response = rpcEndPoint.getEndPoint()
           .register(datanodeDetails.getProtoBufMessage(), nodeReport,
-              contianerReport);
+                  containerReport, pipelineReportsProto);
       Preconditions.checkState(UUID.fromString(response.getDatanodeUUID())
               .equals(datanodeDetails.getUuid()),
           "Unexpected datanode ID in the response.");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 4a90144..83e742c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -24,6 +24,9 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 
@@ -38,6 +41,9 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
 
 /**
  * Creates a Grpc server endpoint that acts as the communication layer for
@@ -47,6 +53,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
   private static final Logger
       LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
   private int port;
+  private UUID id;
   private Server server;
   private final ContainerDispatcher storageContainer;
 
@@ -59,6 +66,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
       ContainerDispatcher dispatcher, BindableService... additionalServices) {
     Preconditions.checkNotNull(conf);
 
+    this.id = datanodeDetails.getUuid();
     this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
         OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
     // Get an available port on current node and
@@ -123,4 +131,12 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
       HddsProtos.PipelineID pipelineID) {
     storageContainer.dispatch(request);
   }
+
+  @Override
+  public List<PipelineReport> getPipelineReport() {
+    return Collections.singletonList(
+            PipelineReport.newBuilder()
+                    .setPipelineID(PipelineID.valueOf(id).getProtobuf())
+                    .build());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index 1863f6d..8c3fa5c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -21,8 +21,11 @@ package org.apache.hadoop.ozone.container.common.transport.server;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReport;
 
 import java.io.IOException;
+import java.util.List;
 
 /** A server endpoint that acts as the communication layer for Ozone
  * containers. */
@@ -49,4 +52,10 @@ public interface XceiverServerSpi {
   void submitRequest(ContainerCommandRequestProto request,
       HddsProtos.PipelineID pipelineID)
       throws IOException;
+
+  /**
+   * Get pipeline report for the XceiverServer instance.
+   * @return list of report for each pipeline.
+   */
+  List<PipelineReport> getPipelineReport();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 24ea0b9..d88995b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -19,17 +19,18 @@
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -68,6 +69,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -96,12 +99,12 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   private final ReplicationLevel replicationLevel;
   private long nodeFailureTimeoutMs;
 
-  private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
+  private XceiverServerRatis(DatanodeDetails dd, int port,
       ContainerDispatcher dispatcher, Configuration conf, StateContext context)
       throws IOException {
     Objects.requireNonNull(dd, "id == null");
     this.port = port;
-    RaftProperties serverProperties = newRaftProperties(conf, storageDir);
+    RaftProperties serverProperties = newRaftProperties(conf);
     final int numWriteChunkThreads = conf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
@@ -118,15 +121,13 @@ public final class XceiverServerRatis implements XceiverServerSpi {
         new ContainerStateMachine(dispatcher, chunkExecutor, this);
     this.server = RaftServer.newBuilder()
         .setServerId(RatisHelper.toRaftPeerId(dd))
-        .setGroup(RatisHelper.emptyRaftGroup())
         .setProperties(serverProperties)
         .setStateMachine(stateMachine)
         .build();
   }
 
 
-  private RaftProperties newRaftProperties(Configuration conf,
-      String storageDir) {
+  private RaftProperties newRaftProperties(Configuration conf) {
     final RaftProperties properties = new RaftProperties();
 
     // Set rpc type
@@ -235,6 +236,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS);
 
     // Set the ratis storage directory
+    String storageDir = HddsServerUtil.getOzoneDatanodeRatisDirectory(conf);
     RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
 
     // For grpc set the maximum message size
@@ -253,23 +255,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   public static XceiverServerRatis newXceiverServerRatis(
       DatanodeDetails datanodeDetails, Configuration ozoneConf,
       ContainerDispatcher dispatcher, StateContext context) throws IOException {
-    final String ratisDir = File.separator + "ratis";
     int localPort = ozoneConf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT);
-    String storageDir = ozoneConf.get(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
-
-    if (Strings.isNullOrEmpty(storageDir)) {
-      storageDir = ozoneConf.get(OzoneConfigKeys
-          .OZONE_METADATA_DIRS);
-      Preconditions.checkNotNull(storageDir, "ozone.metadata.dirs " +
-          "cannot be null, Please check your configs.");
-      storageDir = storageDir.concat(ratisDir);
-      LOG.warn("Storage directory for Ratis is not configured. Mapping Ratis " +
-              "storage under {}. It is a good idea to map this to an SSD disk.",
-          storageDir);
-    }
 
     // Get an available port on current node and
     // use that as the container port
@@ -282,13 +270,6 @@ public final class XceiverServerRatis implements XceiverServerSpi {
         socket.bind(address);
         localPort = socket.getLocalPort();
         LOG.info("Found a free port for the server : {}", localPort);
-        // If we have random local ports configured this means that it
-        // probably running under MiniOzoneCluster. Ratis locks the storage
-        // directories, so we need to pass different local directory for each
-        // local instance. So we map ratis directories under datanode ID.
-        storageDir =
-            storageDir.concat(File.separator +
-                datanodeDetails.getUuidString());
       } catch (IOException e) {
         LOG.error("Unable find a random free port for the server, "
             + "fallback to use default port {}", localPort, e);
@@ -296,7 +277,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     }
     datanodeDetails.setPort(
         DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
-    return new XceiverServerRatis(datanodeDetails, localPort, storageDir,
+    return new XceiverServerRatis(datanodeDetails, localPort,
         dispatcher, ozoneConf, context);
   }
 
@@ -363,7 +344,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   public void submitRequest(
       ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID)
       throws IOException {
-    // ReplicationLevel.ALL ensures the transactions corresponding to
+    // ReplicationLevel.MAJORITY ensures the transactions corresponding to
     // the request here are applied on all the raft servers.
     RaftClientRequest raftClientRequest =
         createRaftClientRequest(request, pipelineID,
@@ -427,13 +408,27 @@ public final class XceiverServerRatis implements XceiverServerSpi {
             + ".Reason : " + action.getClosePipeline().getDetailedReason());
   }
 
-  void handleNodeSlowness(
-      RaftGroup group, RoleInfoProto roleInfoProto) {
+  @Override
+  public List<PipelineReport> getPipelineReport() {
+    try {
+      Iterable<RaftGroupId> gids = server.getGroupIds();
+      List<PipelineReport> reports = new ArrayList<>();
+      for (RaftGroupId groupId : gids) {
+        reports.add(PipelineReport.newBuilder()
+                .setPipelineID(PipelineID.valueOf(groupId).getProtobuf())
+                .build());
+      }
+      return reports;
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
     handlePipelineFailure(group.getGroupId(), roleInfoProto);
   }
 
-  void handleNoLeader(
-      RaftGroup group, RoleInfoProto roleInfoProto) {
+  void handleNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
     handlePipelineFailure(group.getGroupId(), roleInfoProto);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 72a5804..ebacf75 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -164,6 +166,16 @@ public class OzoneContainer {
     return this.containerSet.getContainerReport();
   }
 
+  public PipelineReportsProto getPipelineReport() {
+    PipelineReportsProto.Builder pipelineReportsProto =
+            PipelineReportsProto.newBuilder();
+    for (XceiverServerSpi serverInstance : server) {
+      pipelineReportsProto
+              .addAllPipelineReport(serverInstance.getPipelineReport());
+    }
+    return pipelineReportsProto.build();
+  }
+
   /**
    * Submit ContainerRequest.
    * @param request

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index a950a31..9296524 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.protocol;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
@@ -69,9 +71,11 @@ public interface StorageContainerDatanodeProtocol {
    * @param containerReportsRequestProto - Container Reports.
    * @return SCM Command.
    */
-  SCMRegisteredResponseProto register(DatanodeDetailsProto datanodeDetails,
-      NodeReportProto nodeReport, ContainerReportsProto
-      containerReportsRequestProto) throws IOException;
+  SCMRegisteredResponseProto register(
+          DatanodeDetailsProto datanodeDetails,
+          NodeReportProto nodeReport,
+          ContainerReportsProto containerReportsRequestProto,
+          PipelineReportsProto pipelineReports) throws IOException;
 
   /**
    * Used by datanode to send block deletion ACK to SCM.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
index c9ef43f..b3c3eb3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.protocol;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
@@ -51,10 +53,12 @@ public interface StorageContainerNodeProtocol {
    * Register the node if the node finds that it is not registered with any SCM.
    * @param datanodeDetails DatanodeDetails
    * @param nodeReport NodeReportProto
+   * @param pipelineReport PipelineReportsProto
    * @return  SCMHeartbeatResponseProto
    */
   RegisteredCommand register(DatanodeDetails datanodeDetails,
-                             NodeReportProto nodeReport);
+                             NodeReportProto nodeReport,
+                             PipelineReportsProto pipelineReport);
 
   /**
    * Send heartbeat to indicate the datanode is alive and doing well.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index 40fe189..b9cf6f9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -20,6 +20,8 @@ import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
@@ -149,12 +151,14 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
   @Override
   public SCMRegisteredResponseProto register(
       DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport,
-      ContainerReportsProto containerReportsRequestProto)
+      ContainerReportsProto containerReportsRequestProto,
+      PipelineReportsProto pipelineReportsProto)
       throws IOException {
     SCMRegisterRequestProto.Builder req =
         SCMRegisterRequestProto.newBuilder();
     req.setDatanodeDetails(datanodeDetailsProto);
     req.setContainerReport(containerReportsRequestProto);
+    req.setPipelineReports(pipelineReportsProto);
     req.setNodeReport(nodeReport);
     final SCMRegisteredResponseProto response;
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index 7e8bd8a..ed01822 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.protocolPB;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
@@ -76,8 +78,9 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
       ContainerReportsProto containerRequestProto = request
           .getContainerReport();
       NodeReportProto dnNodeReport = request.getNodeReport();
+      PipelineReportsProto pipelineReport = request.getPipelineReports();
       return impl.register(request.getDatanodeDetails(), dnNodeReport,
-          containerRequestProto);
+          containerRequestProto, pipelineReport);
     } catch (IOException e) {
       throw new ServiceException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 0a69343..78758cb 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -52,6 +52,7 @@ message SCMRegisterRequestProto {
   required DatanodeDetailsProto datanodeDetails = 1;
   required NodeReportProto nodeReport = 2;
   required ContainerReportsProto containerReport = 3;
+  required PipelineReportsProto pipelineReports = 4;
 }
 
 /**
@@ -82,6 +83,7 @@ message SCMHeartbeatRequestProto {
   optional CommandStatusReportsProto commandStatusReport = 4;
   optional ContainerActionsProto containerActions = 5;
   optional PipelineActionsProto pipelineActions = 6;
+  optional PipelineReportsProto pipelineReports = 7;
 }
 
 /*
@@ -163,6 +165,14 @@ message ContainerAction {
   optional Reason reason = 3;
 }
 
+message PipelineReport {
+  required PipelineID pipelineID = 1;
+}
+
+message PipelineReportsProto {
+  repeated PipelineReport pipelineReport = 1;
+}
+
 message PipelineActionsProto {
   repeated PipelineAction pipelineActions = 1;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 751775f..27b6272 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -18,6 +18,10 @@ package org.apache.hadoop.ozone.container.common;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatus;
 import org.apache.hadoop.hdds.scm.VersionInfo;
@@ -214,8 +218,8 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
   public StorageContainerDatanodeProtocolProtos
       .SCMRegisteredResponseProto register(
           DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport,
-          StorageContainerDatanodeProtocolProtos.ContainerReportsProto
-              containerReportsRequestProto)
+          ContainerReportsProto containerReportsRequestProto,
+          PipelineReportsProto pipelineReportsProto)
       throws IOException {
     rpcCount.incrementAndGet();
     updateNodeReport(datanodeDetailsProto, nodeReport);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 206e24b..eb0a0b4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -466,24 +466,6 @@ public class ContainerMapping implements Mapping {
     return new ContainerWithPipeline(containerInfo, pipeline);
   }
 
-  public void handlePipelineClose(PipelineID pipelineID) {
-    try {
-      Pipeline pipeline = pipelineSelector.getPipeline(pipelineID);
-      if (pipeline != null) {
-        pipelineSelector.finalizePipeline(pipeline);
-      } else {
-        LOG.debug("pipeline:{} not found", pipelineID);
-      }
-    } catch (Exception e) {
-      LOG.info("failed to close pipeline:{}", pipelineID, e);
-    }
-  }
-
-  public Set<PipelineID> getPipelineOnDatanode(
-      DatanodeDetails datanodeDetails) {
-    return pipelineSelector.getPipelineId(datanodeDetails.getUuid());
-  }
-
   /**
    * Process container report from Datanode.
    * <p>
@@ -710,7 +692,6 @@ public class ContainerMapping implements Mapping {
     return containerStore;
   }
 
-  @VisibleForTesting
   public PipelineSelector getPipelineSelector() {
     return pipelineSelector;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index dcbd49c..3f156de 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -89,20 +89,20 @@ public class ContainerReportHandler implements
           .map(ContainerID::new)
           .collect(Collectors.toSet());
 
-      ReportResult reportResult = node2ContainerMap
+      ReportResult<ContainerID> reportResult = node2ContainerMap
           .processReport(datanodeOrigin.getUuid(), containerIds);
 
       //we have the report, so we can update the states for the next iteration.
       node2ContainerMap
           .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds);
 
-      for (ContainerID containerID : reportResult.getMissingContainers()) {
+      for (ContainerID containerID : reportResult.getMissingEntries()) {
         containerStateManager
             .removeContainerReplica(containerID, datanodeOrigin);
         checkReplicationState(containerID, publisher);
       }
 
-      for (ContainerID containerID : reportResult.getNewContainers()) {
+      for (ContainerID containerID : reportResult.getNewEntries()) {
         containerStateManager.addContainerReplica(containerID, datanodeOrigin);
         checkReplicationState(containerID, publisher);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
index 1b0c57c..5ed80cb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
@@ -25,13 +25,12 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * Mapping class contains the mapping from a name to a pipeline mapping. This is
@@ -138,15 +137,5 @@ public interface Mapping extends Closeable {
       String owner, ReplicationType type, ReplicationFactor factor,
       LifeCycleState state) throws IOException;
 
-  /**
-   * Handle a pipeline close event.
-   * @param pipelineID pipeline id
-   */
-  void handlePipelineClose(PipelineID pipelineID);
-
-  /**
-   * Get set of pipeline for a specific datanode.
-   * @param datanodeDetails datanode for which pipelines needs to be fetched.
-   */
-  Set<PipelineID> getPipelineOnDatanode(DatanodeDetails datanodeDetails);
+  PipelineSelector getPipelineSelector();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 9d72eb1..745e052 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -27,10 +27,13 @@ import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .DeleteBlockCommandStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .ReplicationStatus;
-import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler.CloseContainerRetryableReq;
+import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
+        .CloseContainerRetryableReq;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+        .PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .PipelineActionsFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .ContainerActionsFromDatanode;
@@ -72,8 +75,7 @@ public final class SCMEvents {
 
   /**
    * ContainerReports are send out by Datanodes. This report is received by
-   * SCMDatanodeHeartbeatDispatcher and Container_Report Event
-   * isTestSCMDatanodeHeartbeatDispatcher generated.
+   * SCMDatanodeHeartbeatDispatcher and Container_Report Event is generated.
    */
   public static final TypedEvent<ContainerReportFromDatanode> CONTAINER_REPORT =
       new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report");
@@ -87,6 +89,13 @@ public final class SCMEvents {
       "Container_Actions");
 
   /**
+   * PipelineReports are sent out by Datanodes. This report is received by
+   * SCMDatanodeHeartbeatDispatcher and Pipeline_Report Event is generated.
+   */
+  public static final TypedEvent<PipelineReportFromDatanode> PIPELINE_REPORT =
+          new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report");
+
+  /**
    * PipelineActions are sent by Datanode. This event is received by
    * SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index fca08bd..58da1cc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.node;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -363,7 +365,8 @@ public class SCMNodeManager
    */
   @Override
   public RegisteredCommand register(
-      DatanodeDetails datanodeDetails, NodeReportProto nodeReport) {
+      DatanodeDetails datanodeDetails, NodeReportProto nodeReport,
+      PipelineReportsProto pipelineReportsProto) {
 
     InetAddress dnAddress = Server.getRemoteIp();
     if (dnAddress != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
index b435e77..ddbba82 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
@@ -19,17 +19,13 @@
 package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.Mapping;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Set;
-
 /**
  * Handles Stale node event.
  */
@@ -37,22 +33,17 @@ public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
   static final Logger LOG = LoggerFactory.getLogger(StaleNodeHandler.class);
 
   private final Node2ContainerMap node2ContainerMap;
-  private final Mapping containerManager;
+  private final PipelineSelector pipelineSelector;
 
   public StaleNodeHandler(Node2ContainerMap node2ContainerMap,
-      Mapping containerManager) {
+      PipelineSelector pipelineSelector) {
     this.node2ContainerMap = node2ContainerMap;
-    this.containerManager = containerManager;
+    this.pipelineSelector = pipelineSelector;
   }
 
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
                         EventPublisher publisher) {
-    Set<PipelineID> pipelineIDs =
-        containerManager.getPipelineOnDatanode(datanodeDetails);
-    for (PipelineID id : pipelineIDs) {
-      LOG.info("closing pipeline {}.", id);
-      publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
-    }
+    pipelineSelector.handleStaleNode(datanodeDetails);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
index 97c254b..549080a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
@@ -18,13 +18,9 @@
 
 package org.apache.hadoop.hdds.scm.node.states;
 
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -32,34 +28,29 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .DUPLICATE_DATANODE;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .NO_SUCH_DATANODE;
 
 /**
  * This data structure maintains the list of containers that is on a datanode.
  * This information is built from the DN container reports.
  */
-public class Node2ContainerMap {
-  private final Map<UUID, Set<ContainerID>> dn2ContainerMap;
+public class Node2ContainerMap extends Node2ObjectsMap<ContainerID> {
 
   /**
    * Constructs a Node2ContainerMap Object.
    */
   public Node2ContainerMap() {
-    dn2ContainerMap = new ConcurrentHashMap<>();
+    super();
   }
 
   /**
-   * Returns true if this a datanode that is already tracked by
-   * Node2ContainerMap.
+   * Returns null if there are no containers associated with this datanode ID.
    *
-   * @param datanodeID - UUID of the Datanode.
-   * @return True if this is tracked, false if this map does not know about it.
+   * @param datanode - UUID
+   * @return Set of containers or Null.
    */
-  public boolean isKnownDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    return dn2ContainerMap.containsKey(datanodeID);
+  public Set<ContainerID> getContainers(UUID datanode) {
+    return getObjects(datanode);
   }
 
   /**
@@ -70,13 +61,7 @@ public class Node2ContainerMap {
    */
   public void insertNewDatanode(UUID datanodeID, Set<ContainerID> containerIDs)
       throws SCMException {
-    Preconditions.checkNotNull(containerIDs);
-    Preconditions.checkNotNull(datanodeID);
-    if (dn2ContainerMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs))
-        != null) {
-      throw new SCMException("Node already exists in the map",
-          DUPLICATE_DATANODE);
-    }
+    super.insertNewDatanode(datanodeID, containerIDs);
   }
 
   /**
@@ -91,103 +76,15 @@ public class Node2ContainerMap {
       Set<ContainerID> containers) throws SCMException {
     Preconditions.checkNotNull(datanodeID);
     Preconditions.checkNotNull(containers);
-    if (dn2ContainerMap
+    if (dn2ObjectMap
         .computeIfPresent(datanodeID, (k, v) -> new HashSet<>(containers))
         == null) {
       throw new SCMException("No such datanode", NO_SUCH_DATANODE);
     }
   }
 
-  /**
-   * Removes datanode Entry from the map.
-   *
-   * @param datanodeID - Datanode ID.
-   */
-  public void removeDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    dn2ContainerMap.computeIfPresent(datanodeID, (k, v) -> null);
-  }
-
-  /**
-   * Returns null if there no containers associated with this datanode ID.
-   *
-   * @param datanode - UUID
-   * @return Set of containers or Null.
-   */
-  public Set<ContainerID> getContainers(UUID datanode) {
-    Preconditions.checkNotNull(datanode);
-    return dn2ContainerMap.computeIfPresent(datanode, (k, v) ->
-        Collections.unmodifiableSet(v));
-  }
-
-  public ReportResult processReport(UUID datanodeID, Set<ContainerID>
-      containers) {
-    Preconditions.checkNotNull(datanodeID);
-    Preconditions.checkNotNull(containers);
-
-    if (!isKnownDatanode(datanodeID)) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.NEW_DATANODE_FOUND)
-          .setNewContainers(containers)
-          .build();
-    }
-
-    // Conditions like Zero length containers should be handled by removeAll.
-    Set<ContainerID> currentSet = dn2ContainerMap.get(datanodeID);
-    TreeSet<ContainerID> newContainers = new TreeSet<>(containers);
-    newContainers.removeAll(currentSet);
-
-    TreeSet<ContainerID> missingContainers = new TreeSet<>(currentSet);
-    missingContainers.removeAll(containers);
-
-    if (newContainers.isEmpty() && missingContainers.isEmpty()) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.ALL_IS_WELL)
-          .build();
-    }
-
-    if (newContainers.isEmpty() && !missingContainers.isEmpty()) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.MISSING_CONTAINERS)
-          .setMissingContainers(missingContainers)
-          .build();
-    }
-
-    if (!newContainers.isEmpty() && missingContainers.isEmpty()) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.NEW_CONTAINERS_FOUND)
-          .setNewContainers(newContainers)
-          .build();
-    }
-
-    if (!newContainers.isEmpty() && !missingContainers.isEmpty()) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.MISSING_AND_NEW_CONTAINERS_FOUND)
-          .setNewContainers(newContainers)
-          .setMissingContainers(missingContainers)
-          .build();
-    }
-
-    // default status & Make compiler happy
-    return ReportResult.ReportResultBuilder.newBuilder()
-        .setStatus(ReportStatus.ALL_IS_WELL)
-        .build();
-  }
-
-  /**
-   * Results possible from processing a container report by
-   * Node2ContainerMapper.
-   */
-  public enum ReportStatus {
-    ALL_IS_WELL,
-    MISSING_CONTAINERS,
-    NEW_CONTAINERS_FOUND,
-    MISSING_AND_NEW_CONTAINERS_FOUND,
-    NEW_DATANODE_FOUND
-  }
-
   @VisibleForTesting
   public int size() {
-    return dn2ContainerMap.size();
+    return dn2ObjectMap.size();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
new file mode 100644
index 0000000..e49a79c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.node.states;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+
+import java.util.UUID;
+import java.util.Set;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.HashSet;
+import java.util.Collections;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUPLICATE_DATANODE;
+
+/**
+ * This data structure maintains the set of objects (e.g. containers or
+ * pipelines) on each datanode. This information is built from the DN reports.
+ */
+public class Node2ObjectsMap<T> {
+  protected final Map<UUID, Set<T>> dn2ObjectMap;
+
+  /**
+   * Constructs a Node2ObjectsMap Object.
+   */
+  public Node2ObjectsMap() {
+    dn2ObjectMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns true if this is a datanode that is already tracked by
+   * this map.
+   *
+   * @param datanodeID - UUID of the Datanode.
+   * @return True if this is tracked, false if this map does not know about it.
+   */
+  public boolean isKnownDatanode(UUID datanodeID) {
+    Preconditions.checkNotNull(datanodeID);
+    return dn2ObjectMap.containsKey(datanodeID);
+  }
+
+  /**
+   * Insert a new datanode into the Node2Objects map.
+   *
+   * @param datanodeID   -- Datanode UUID
+   * @param containerIDs - Set of objects to associate with the datanode.
+   */
+  public void insertNewDatanode(UUID datanodeID, Set<T> containerIDs)
+      throws SCMException {
+    Preconditions.checkNotNull(containerIDs);
+    Preconditions.checkNotNull(datanodeID);
+    if (dn2ObjectMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs))
+        != null) {
+      throw new SCMException("Node already exists in the map",
+          DUPLICATE_DATANODE);
+    }
+  }
+
+  /**
+   * Removes datanode Entry from the map.
+   *
+   * @param datanodeID - Datanode ID.
+   */
+  void removeDatanode(UUID datanodeID) {
+    Preconditions.checkNotNull(datanodeID);
+    dn2ObjectMap.computeIfPresent(datanodeID, (k, v) -> null);
+  }
+
+  /**
+   * Returns an empty set if no objects are associated with this datanode ID.
+   *
+   * @param datanode - UUID
+   * @return Unmodifiable set of objects, or an empty set if none are known.
+   */
+  Set<T> getObjects(UUID datanode) {
+    Preconditions.checkNotNull(datanode);
+    final Set<T> s = dn2ObjectMap.get(datanode);
+    return s != null? Collections.unmodifiableSet(s): Collections.emptySet();
+  }
+
+  public ReportResult.ReportResultBuilder<T> newBuilder() {
+    return new ReportResult.ReportResultBuilder<>();
+  }
+
+  public ReportResult<T> processReport(UUID datanodeID, Set<T> objects) {
+    Preconditions.checkNotNull(datanodeID);
+    Preconditions.checkNotNull(objects);
+
+    if (!isKnownDatanode(datanodeID)) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.NEW_DATANODE_FOUND)
+          .setNewEntries(objects)
+          .build();
+    }
+
+    // Conditions like Zero length containers should be handled by removeAll.
+    Set<T> currentSet = dn2ObjectMap.get(datanodeID);
+    TreeSet<T> newObjects = new TreeSet<>(objects);
+    newObjects.removeAll(currentSet);
+
+    TreeSet<T> missingObjects = new TreeSet<>(currentSet);
+    missingObjects.removeAll(objects);
+
+    if (newObjects.isEmpty() && missingObjects.isEmpty()) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.ALL_IS_WELL)
+          .build();
+    }
+
+    if (newObjects.isEmpty() && !missingObjects.isEmpty()) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.MISSING_ENTRIES)
+          .setMissingEntries(missingObjects)
+          .build();
+    }
+
+    if (!newObjects.isEmpty() && missingObjects.isEmpty()) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.NEW_ENTRIES_FOUND)
+          .setNewEntries(newObjects)
+          .build();
+    }
+
+    if (!newObjects.isEmpty() && !missingObjects.isEmpty()) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.MISSING_AND_NEW_ENTRIES_FOUND)
+          .setNewEntries(newObjects)
+          .setMissingEntries(missingObjects)
+          .build();
+    }
+
+    // default status & Make compiler happy
+    return newBuilder()
+        .setStatus(ReportResult.ReportStatus.ALL_IS_WELL)
+        .build();
+  }
+
+  @VisibleForTesting
+  public int size() {
+    return dn2ObjectMap.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
index 9bb6cf1..0c7610f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
@@ -19,83 +19,92 @@
 
 package org.apache.hadoop.hdds.scm.node.states;
 
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-
 import java.util.Collections;
 import java.util.Set;
 
 import com.google.common.base.Preconditions;
 
 /**
- * A Container Report gets processsed by the Node2Container and returns
- * Report Result class.
+ * A Container/Pipeline Report gets processed by the
+ * Node2Container/Node2Pipeline and returns Report Result class.
  */
-public class ReportResult {
-  private Node2ContainerMap.ReportStatus status;
-  private Set<ContainerID> missingContainers;
-  private Set<ContainerID> newContainers;
-
-  ReportResult(Node2ContainerMap.ReportStatus status,
-      Set<ContainerID> missingContainers,
-      Set<ContainerID> newContainers) {
+public final class ReportResult<T> {
+  private ReportStatus status;
+  private Set<T> missingEntries;
+  private Set<T> newEntries;
+
+  private ReportResult(ReportStatus status,
+      Set<T> missingEntries,
+      Set<T> newEntries) {
     this.status = status;
-    Preconditions.checkNotNull(missingContainers);
-    Preconditions.checkNotNull(newContainers);
-    this.missingContainers = missingContainers;
-    this.newContainers = newContainers;
+    Preconditions.checkNotNull(missingEntries);
+    Preconditions.checkNotNull(newEntries);
+    this.missingEntries = missingEntries;
+    this.newEntries = newEntries;
   }
 
-  public Node2ContainerMap.ReportStatus getStatus() {
+  public ReportStatus getStatus() {
     return status;
   }
 
-  public Set<ContainerID> getMissingContainers() {
-    return missingContainers;
+  public Set<T> getMissingEntries() {
+    return missingEntries;
   }
 
-  public Set<ContainerID> getNewContainers() {
-    return newContainers;
+  public Set<T> getNewEntries() {
+    return newEntries;
   }
 
-  static class ReportResultBuilder {
-    private Node2ContainerMap.ReportStatus status;
-    private Set<ContainerID> missingContainers;
-    private Set<ContainerID> newContainers;
-
-    static ReportResultBuilder newBuilder() {
-      return new ReportResultBuilder();
-    }
-
-    public ReportResultBuilder setStatus(
-        Node2ContainerMap.ReportStatus newstatus) {
-      this.status = newstatus;
+  /**
+   * Builder for the result of processing a report for a node2Object map.
+   * @param <T> type of the entries carried in the result.
+   */
+  public static class ReportResultBuilder<T> {
+    private ReportStatus status;
+    private Set<T> missingEntries;
+    private Set<T> newEntries;
+
+    public ReportResultBuilder<T> setStatus(
+        ReportStatus newStatus) {
+      this.status = newStatus;
       return this;
     }
 
-    public ReportResultBuilder setMissingContainers(
-        Set<ContainerID> missingContainersLit) {
-      this.missingContainers = missingContainersLit;
+    public ReportResultBuilder<T> setMissingEntries(
+        Set<T> missingEntriesList) {
+      this.missingEntries = missingEntriesList;
       return this;
     }
 
-    public ReportResultBuilder setNewContainers(
-        Set<ContainerID> newContainersList) {
-      this.newContainers = newContainersList;
+    public ReportResultBuilder<T> setNewEntries(
+        Set<T> newEntriesList) {
+      this.newEntries = newEntriesList;
       return this;
     }
 
-    ReportResult build() {
+    public ReportResult<T> build() {
 
-      Set<ContainerID> nullSafeMissingContainers = this.missingContainers;
-      Set<ContainerID> nullSafeNewContainers = this.newContainers;
-      if (nullSafeNewContainers == null) {
-        nullSafeNewContainers = Collections.emptySet();
+      Set<T> nullSafeMissingEntries = this.missingEntries;
+      Set<T> nullSafeNewEntries = this.newEntries;
+      if (nullSafeNewEntries == null) {
+        nullSafeNewEntries = Collections.emptySet();
       }
-      if (nullSafeMissingContainers == null) {
-        nullSafeMissingContainers = Collections.emptySet();
+      if (nullSafeMissingEntries == null) {
+        nullSafeMissingEntries = Collections.emptySet();
       }
-      return new ReportResult(status, nullSafeMissingContainers,
-          nullSafeNewContainers);
+      return new ReportResult<T>(status, nullSafeMissingEntries,
+              nullSafeNewEntries);
     }
   }
+
+  /**
+   * Results possible from processing a report.
+   */
+  public enum ReportStatus {
+    ALL_IS_WELL,
+    MISSING_ENTRIES,
+    NEW_ENTRIES_FOUND,
+    MISSING_AND_NEW_ENTRIES_FOUND,
+    NEW_DATANODE_FOUND,
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
index 363ce71..87f2222 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
@@ -16,19 +16,15 @@
  *
  */
 
-package org.apache.hadoop.hdds.scm.pipelines;
+package org.apache.hadoop.hdds.scm.node.states;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * This data structure maintains the list of pipelines which the given datanode is a part of. This
@@ -36,33 +32,11 @@ import java.util.concurrent.ConcurrentHashMap;
  *
  * <p>TODO: this information needs to be regenerated from pipeline reports on SCM restart
  */
-public class Node2PipelineMap {
-  private final Map<UUID, Set<PipelineID>> dn2PipelineMap;
+public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
 
   /** Constructs a Node2PipelineMap Object. */
   public Node2PipelineMap() {
-    dn2PipelineMap = new ConcurrentHashMap<>();
-  }
-
-  /**
-   * Returns true if this a datanode that is already tracked by Node2PipelineMap.
-   *
-   * @param datanodeID - UUID of the Datanode.
-   * @return True if this is tracked, false if this map does not know about it.
-   */
-  private boolean isKnownDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    return dn2PipelineMap.containsKey(datanodeID);
-  }
-
-  /**
-   * Removes datanode Entry from the map.
-   *
-   * @param datanodeID - Datanode ID.
-   */
-  public synchronized void removeDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    dn2PipelineMap.computeIfPresent(datanodeID, (k, v) -> null);
+    super();
   }
 
   /**
@@ -72,9 +46,7 @@ public class Node2PipelineMap {
    * @return Set of pipelines or Null.
    */
   public Set<PipelineID> getPipelines(UUID datanode) {
-    Preconditions.checkNotNull(datanode);
-    final Set<PipelineID> s = dn2PipelineMap.get(datanode);
-    return s != null? Collections.unmodifiableSet(s): Collections.emptySet();
+    return getObjects(datanode);
   }
 
   /**
@@ -85,7 +57,7 @@ public class Node2PipelineMap {
   public synchronized void addPipeline(Pipeline pipeline) {
     for (DatanodeDetails details : pipeline.getDatanodes().values()) {
       UUID dnId = details.getUuid();
-      dn2PipelineMap.computeIfAbsent(dnId, k -> new HashSet<>())
+      dn2ObjectMap.computeIfAbsent(dnId, k -> new HashSet<>())
           .add(pipeline.getId());
     }
   }
@@ -93,16 +65,11 @@ public class Node2PipelineMap {
   public synchronized void removePipeline(Pipeline pipeline) {
     for (DatanodeDetails details : pipeline.getDatanodes().values()) {
       UUID dnId = details.getUuid();
-      dn2PipelineMap.computeIfPresent(
-          dnId,
+      dn2ObjectMap.computeIfPresent(dnId,
           (k, v) -> {
             v.remove(pipeline.getId());
             return v;
           });
     }
   }
-
-  public Map<UUID, Set<PipelineID>> getDn2PipelineMap() {
-    return Collections.unmodifiableMap(dn2PipelineMap);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
index 733dec5..e49678f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
@@ -17,22 +17,36 @@
 
 package org.apache.hadoop.hdds.scm.pipelines;
 
-import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Handles pipeline close event.
  */
 public class PipelineCloseHandler implements EventHandler<PipelineID> {
-  private final Mapping mapping;
-  public PipelineCloseHandler(Mapping mapping) {
-    this.mapping = mapping;
+  private static final Logger LOG = LoggerFactory
+          .getLogger(PipelineCloseHandler.class);
+
+  private final PipelineSelector pipelineSelector;
+  public PipelineCloseHandler(PipelineSelector pipelineSelector) {
+    this.pipelineSelector = pipelineSelector;
   }
 
   @Override
   public void onMessage(PipelineID pipelineID, EventPublisher publisher) {
-    mapping.handlePipelineClose(pipelineID);
+    Pipeline pipeline = pipelineSelector.getPipeline(pipelineID);
+    try {
+      if (pipeline != null) {
+        pipelineSelector.finalizePipeline(pipeline);
+      } else {
+        LOG.debug("pipeline:{} not found", pipelineID);
+      }
+    } catch (Exception e) {
+      LOG.info("failed to close pipeline:{}", pipelineID, e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index 07ff2b0..ca2e878 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -18,6 +18,8 @@ package org.apache.hadoop.hdds.scm.pipelines;
 
 import java.util.ArrayList;
 import java.util.LinkedList;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -36,7 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 public abstract class PipelineManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(PipelineManager.class);
-  private final ArrayList<ActivePipelines> activePipelines;
+  protected final ArrayList<ActivePipelines> activePipelines;
 
   public PipelineManager() {
     activePipelines = new ArrayList<>();
@@ -45,7 +47,10 @@ public abstract class PipelineManager {
     }
   }
 
-  private static class ActivePipelines {
+  /**
+   * List of active pipelines.
+   */
+  public static class ActivePipelines {
     private final List<PipelineID> activePipelines;
     private final AtomicInteger pipelineIndex;
 
@@ -55,10 +60,12 @@ public abstract class PipelineManager {
     }
 
     void addPipeline(PipelineID pipelineID) {
-      activePipelines.add(pipelineID);
+      if (!activePipelines.contains(pipelineID)) {
+        activePipelines.add(pipelineID);
+      }
     }
 
-    void removePipeline(PipelineID pipelineID) {
+    public void removePipeline(PipelineID pipelineID) {
       activePipelines.remove(pipelineID);
     }
 
@@ -117,17 +124,6 @@ public abstract class PipelineManager {
             .addPipeline(pipeline.getId());
   }
 
-  protected static int getReplicationCount(ReplicationFactor factor) {
-    switch (factor) {
-    case ONE:
-      return 1;
-    case THREE:
-      return 3;
-    default:
-      throw new IllegalArgumentException("Unexpected replication count");
-    }
-  }
-
   public abstract Pipeline allocatePipeline(
       ReplicationFactor replicationFactor);
 
@@ -137,6 +133,14 @@ public abstract class PipelineManager {
    */
   public abstract void initializePipeline(Pipeline pipeline) throws IOException;
 
+  public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) {
+    if (pipeline.addMember(dn)
+        &&(pipeline.getDatanodes().size() == pipeline.getFactor().getNumber())
+        && pipeline.getLifeCycleState() == HddsProtos.LifeCycleState.OPEN) {
+      addOpenPipeline(pipeline);
+    }
+  }
+
   /**
    * Creates a pipeline with a specified replication factor and type.
    * @param replicationFactor - Replication Factor.
@@ -157,27 +161,11 @@ public abstract class PipelineManager {
    * Remove the pipeline from active allocation.
    * @param pipeline pipeline to be finalized
    */
-  public synchronized void finalizePipeline(Pipeline pipeline) {
-    activePipelines.get(pipeline.getFactor().ordinal())
-            .removePipeline(pipeline.getId());
-  }
+  public abstract boolean finalizePipeline(Pipeline pipeline);
 
   /**
    *
    * @param pipeline
    */
   public abstract void closePipeline(Pipeline pipeline) throws IOException;
-
-  /**
-   * list members in the pipeline.
-   * @return the datanode
-   */
-  public abstract List<DatanodeDetails> getMembers(PipelineID pipelineID)
-      throws IOException;
-
-  /**
-   * Update the datanode list of the pipeline.
-   */
-  public abstract void updatePipeline(PipelineID pipelineID,
-      List<DatanodeDetails> newDatanodes) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java
new file mode 100644
index 0000000..933792b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.server
+        .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles Pipeline Reports from datanode.
+ */
+public class PipelineReportHandler implements
+        EventHandler<PipelineReportFromDatanode> {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(PipelineReportHandler.class);
+  private final PipelineSelector pipelineSelector;
+
+  public PipelineReportHandler(PipelineSelector pipelineSelector) {
+    Preconditions.checkNotNull(pipelineSelector);
+    this.pipelineSelector = pipelineSelector;
+  }
+
+  @Override
+  public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
+      EventPublisher publisher) {
+    Preconditions.checkNotNull(pipelineReportFromDatanode);
+    DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails();
+    PipelineReportsProto pipelineReport =
+            pipelineReportFromDatanode.getReport();
+    Preconditions.checkNotNull(dn, "Pipeline Report is "
+        + "missing DatanodeDetails.");
+    LOGGER.trace("Processing pipeline report for dn: {}", dn);
+    pipelineSelector.processPipelineReport(dn, pipelineReport);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org