You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/05/29 19:49:09 UTC

[1/3] hadoop git commit: HDDS-81. Moving ContainerReport inside Datanode heartbeat. Contributed by Nanda Kumar.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 4827e9a90 -> 201440b98


http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java
deleted file mode 100644
index 50fd18f..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.testutils;
-
-import com.google.common.primitives.Longs;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.NodePoolManager;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
-    .HEALTHY;
-
-/**
- * This class  manages the state of datanode
- * in conjunction with the node pool and node managers.
- */
-public class ReplicationDatanodeStateManager {
-  private final NodeManager nodeManager;
-  private final NodePoolManager poolManager;
-  private final Random r;
-
-  /**
-   * The datanode state Manager.
-   *
-   * @param nodeManager
-   * @param poolManager
-   */
-  public ReplicationDatanodeStateManager(NodeManager nodeManager,
-      NodePoolManager poolManager) {
-    this.nodeManager = nodeManager;
-    this.poolManager = poolManager;
-    r = new Random();
-  }
-
-  /**
-   * Get Container Report as if it is from a datanode in the cluster.
-   * @param containerID - Container ID.
-   * @param poolName - Pool Name.
-   * @param dataNodeCount - Datanode Count.
-   * @return List of Container Reports.
-   */
-  public List<ContainerReportsRequestProto> getContainerReport(
-      long containerID, String poolName, int dataNodeCount) {
-    List<ContainerReportsRequestProto> containerList = new LinkedList<>();
-    List<DatanodeDetails> nodesInPool = poolManager.getNodes(poolName);
-
-    if (nodesInPool == null) {
-      return containerList;
-    }
-
-    if (nodesInPool.size() < dataNodeCount) {
-      throw new IllegalStateException("Not enough datanodes to create " +
-          "required container reports");
-    }
-
-    while (containerList.size() < dataNodeCount && nodesInPool.size() > 0) {
-      DatanodeDetails id = nodesInPool.get(r.nextInt(nodesInPool.size()));
-      nodesInPool.remove(id);
-      containerID++;
-      // We return container reports only for nodes that are healthy.
-      if (nodeManager.getNodeState(id) == HEALTHY) {
-        ContainerInfo info = ContainerInfo.newBuilder()
-            .setContainerID(containerID)
-            .setFinalhash(DigestUtils.sha256Hex(
-                Longs.toByteArray(containerID)))
-            .setContainerID(containerID)
-            .build();
-        ContainerReportsRequestProto containerReport =
-            ContainerReportsRequestProto.newBuilder().addReports(info)
-            .setDatanodeDetails(id.getProtoBufMessage())
-            .setType(ContainerReportsRequestProto.reportType.fullReport)
-            .build();
-        containerList.add(containerReport);
-      }
-    }
-    return containerList;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 3f814d0..072d821 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -24,13 +24,13 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodePoolManager;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.mockito.Mockito;
 
@@ -277,12 +277,12 @@ public class ReplicationNodeManagerMock implements NodeManager {
    * Register the node if the node finds that it is not registered with any SCM.
    *
    * @param dd DatanodeDetailsProto
-   * @param nodeReport SCMNodeReport
+   * @param nodeReport NodeReportProto
    * @return SCMHeartbeatResponseProto
    */
   @Override
-  public SCMCommand register(HddsProtos.DatanodeDetailsProto dd,
-                             SCMNodeReport nodeReport) {
+  public RegisteredCommand register(DatanodeDetails dd,
+                                    NodeReportProto nodeReport) {
     return null;
   }
 
@@ -294,8 +294,8 @@ public class ReplicationNodeManagerMock implements NodeManager {
    * @return SCMheartbeat response list
    */
   @Override
-  public List<SCMCommand> sendHeartbeat(HddsProtos.DatanodeDetailsProto dd,
-      SCMNodeReport nodeReport) {
+  public List<SCMCommand> sendHeartbeat(DatanodeDetails dd,
+      NodeReportProto nodeReport) {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index a0d41a8..0c1d8f2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
 import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
@@ -302,12 +304,11 @@ public class TestStorageContainerManager {
       NodeManager nodeManager = cluster.getStorageContainerManager()
           .getScmNodeManager();
       List<SCMCommand> commands = nodeManager.sendHeartbeat(
-          nodeManager.getNodes(NodeState.HEALTHY).get(0).getProtoBufMessage(),
-          null);
+          nodeManager.getNodes(NodeState.HEALTHY).get(0), null);
 
       if (commands != null) {
         for (SCMCommand cmd : commands) {
-          if (cmd.getType() == SCMCmdType.deleteBlocksCommand) {
+          if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
             List<DeletedBlocksTransaction> deletedTXs =
                 ((DeleteBlocksCommand) cmd).blocksTobeDeleted();
             return deletedTXs != null && deletedTXs.size() == limitSize;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
index 1d19bb3..1dbe760 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
@@ -75,11 +77,11 @@ public class TestSCMMetrics {
       ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes,
           writeBytes, readCount, writeCount);
       StorageContainerManager scmManager = cluster.getStorageContainerManager();
-
-      ContainerReportsRequestProto request = createContainerReport(numReport,
-          stat, null);
-      String fstDatanodeUuid = request.getDatanodeDetails().getUuid();
-      scmManager.getDatanodeProtocolServer().sendContainerReport(request);
+      DatanodeDetails fstDatanodeDetails = TestUtils.getDatanodeDetails();
+      ContainerReportsProto request = createContainerReport(numReport, stat);
+      String fstDatanodeUuid = fstDatanodeDetails.getUuidString();
+      scmManager.getDatanodeProtocolServer().processContainerReports(
+          fstDatanodeDetails, request);
 
       // verify container stat metrics
       MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
@@ -100,9 +102,11 @@ public class TestSCMMetrics {
           getLongGauge("LastContainerReportWriteCount", scmMetrics));
 
       // add one new report
-      request = createContainerReport(1, stat, null);
-      String sndDatanodeUuid = request.getDatanodeDetails().getUuid();
-      scmManager.getDatanodeProtocolServer().sendContainerReport(request);
+      DatanodeDetails sndDatanodeDetails = TestUtils.getDatanodeDetails();
+      request = createContainerReport(1, stat);
+      String sndDatanodeUuid = sndDatanodeDetails.getUuidString();
+      scmManager.getDatanodeProtocolServer().processContainerReports(
+          sndDatanodeDetails, request);
 
       scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
       assertEquals(size * (numReport + 1),
@@ -124,12 +128,12 @@ public class TestSCMMetrics {
       // Re-send reports but with different value for validating
       // the aggregation.
       stat = new ContainerStat(100, 50, 3, 50, 60, 5, 6);
-      scmManager.getDatanodeProtocolServer().sendContainerReport(
-          createContainerReport(1, stat, fstDatanodeUuid));
+      scmManager.getDatanodeProtocolServer().processContainerReports(
+          fstDatanodeDetails, createContainerReport(1, stat));
 
       stat = new ContainerStat(1, 1, 1, 1, 1, 1, 1);
-      scmManager.getDatanodeProtocolServer().sendContainerReport(
-          createContainerReport(1, stat, sndDatanodeUuid));
+      scmManager.getDatanodeProtocolServer().processContainerReports(
+          sndDatanodeDetails, createContainerReport(1, stat));
 
       // the global container metrics value should be updated
       scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
@@ -170,11 +174,11 @@ public class TestSCMMetrics {
           writeBytes, readCount, writeCount);
       StorageContainerManager scmManager = cluster.getStorageContainerManager();
 
-      String datanodeUuid = cluster.getHddsDatanodes().get(0)
-          .getDatanodeDetails().getUuidString();
-      ContainerReportsRequestProto request = createContainerReport(numReport,
-          stat, datanodeUuid);
-      scmManager.getDatanodeProtocolServer().sendContainerReport(request);
+      DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0)
+          .getDatanodeDetails();
+      ContainerReportsProto request = createContainerReport(numReport, stat);
+      scmManager.getDatanodeProtocolServer().processContainerReports(
+          datanodeDetails, request);
 
       MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
       assertEquals(size * numReport,
@@ -216,11 +220,11 @@ public class TestSCMMetrics {
     }
   }
 
-  private ContainerReportsRequestProto createContainerReport(int numReport,
-      ContainerStat stat, String datanodeUuid) {
-    StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder
+  private ContainerReportsProto createContainerReport(int numReport,
+      ContainerStat stat) {
+    StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder
         reportsBuilder = StorageContainerDatanodeProtocolProtos
-        .ContainerReportsRequestProto.newBuilder();
+        .ContainerReportsProto.newBuilder();
 
     for (int i = 0; i < numReport; i++) {
       ContainerReport report = new ContainerReport(
@@ -234,24 +238,6 @@ public class TestSCMMetrics {
       report.setWriteBytes(stat.getWriteBytes().get());
       reportsBuilder.addReports(report.getProtoBufMessage());
     }
-
-    DatanodeDetails datanodeDetails;
-    if (datanodeUuid == null) {
-      datanodeDetails = TestUtils.getDatanodeDetails();
-    } else {
-      datanodeDetails = DatanodeDetails.newBuilder()
-          .setUuid(datanodeUuid)
-          .setIpAddress("127.0.0.1")
-          .setHostName("localhost")
-          .setContainerPort(0)
-          .setRatisPort(0)
-          .setOzoneRestPort(0)
-          .build();
-    }
-
-    reportsBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage());
-    reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
-        .ContainerReportsRequestProto.reportType.fullReport);
     return reportsBuilder.build();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[3/3] hadoop git commit: HDDS-81. Moving ContainerReport inside Datanode heartbeat. Contributed by Nanda Kumar.

Posted by ae...@apache.org.
HDDS-81. Moving ContainerReport inside Datanode heartbeat.
Contributed by Nanda Kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/201440b9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/201440b9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/201440b9

Branch: refs/heads/trunk
Commit: 201440b987d5ef3910c2045b2411c213ed6eec1f
Parents: 4827e9a
Author: Anu Engineer <ae...@apache.org>
Authored: Tue May 29 12:40:27 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue May 29 12:48:50 2018 -0700

----------------------------------------------------------------------
 .../common/impl/ContainerManagerImpl.java       |  22 +-
 .../common/impl/StorageLocationReport.java      |   8 +-
 .../common/interfaces/ContainerManager.java     |   8 +-
 .../statemachine/DatanodeStateMachine.java      |   7 +-
 .../common/statemachine/StateContext.java       |  16 +-
 .../CloseContainerCommandHandler.java           | 113 ++++++++
 .../commandhandler/CloseContainerHandler.java   | 113 --------
 .../commandhandler/CommandDispatcher.java       |   5 +-
 .../commandhandler/CommandHandler.java          |   8 +-
 .../DeleteBlocksCommandHandler.java             |  12 +-
 .../states/endpoint/HeartbeatEndpointTask.java  |  30 +-
 .../states/endpoint/RegisterEndpointTask.java   |  12 +-
 .../container/ozoneimpl/OzoneContainer.java     |  10 +-
 .../StorageContainerDatanodeProtocol.java       |  30 +-
 .../protocol/StorageContainerNodeProtocol.java  |  15 +-
 .../commands/CloseContainerCommand.java         |  18 +-
 .../protocol/commands/DeleteBlocksCommand.java  |  18 +-
 .../protocol/commands/RegisteredCommand.java    |  26 +-
 .../protocol/commands/ReregisterCommand.java    |  16 +-
 .../ozone/protocol/commands/SCMCommand.java     |   4 +-
 ...rDatanodeProtocolClientSideTranslatorPB.java |  50 +---
 ...rDatanodeProtocolServerSideTranslatorPB.java |  53 ++--
 .../StorageContainerDatanodeProtocol.proto      | 256 ++++++++---------
 .../ozone/container/common/ScmTestMock.java     |  78 ++----
 .../hdds/scm/container/ContainerMapping.java    |  10 +-
 .../hadoop/hdds/scm/container/Mapping.java      |   6 +-
 .../replication/ContainerSupervisor.java        |  13 +-
 .../container/replication/InProgressPool.java   |  15 +-
 .../hdds/scm/node/HeartbeatQueueItem.java       |  14 +-
 .../hadoop/hdds/scm/node/SCMNodeManager.java    |  58 ++--
 .../hdds/scm/node/SCMNodeStorageStatMap.java    |  14 +-
 .../scm/server/SCMDatanodeProtocolServer.java   | 195 +++++++------
 .../org/apache/hadoop/hdds/scm/TestUtils.java   |  19 +-
 .../hdds/scm/container/MockNodeManager.java     |  26 +-
 .../scm/container/TestContainerMapping.java     |  24 +-
 .../container/closer/TestContainerCloser.java   |  12 +-
 .../hdds/scm/node/TestContainerPlacement.java   |   6 +-
 .../hadoop/hdds/scm/node/TestNodeManager.java   |  83 +++---
 .../scm/node/TestSCMNodeStorageStatMap.java     |  16 +-
 .../ozone/container/common/TestEndPoint.java    | 113 ++------
 .../replication/TestContainerSupervisor.java    | 275 -------------------
 .../ReplicationDatanodeStateManager.java        | 101 -------
 .../testutils/ReplicationNodeManagerMock.java   |  14 +-
 .../ozone/TestStorageContainerManager.java      |  11 +-
 .../apache/hadoop/ozone/scm/TestSCMMetrics.java |  68 ++---
 45 files changed, 706 insertions(+), 1315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
index 9355364..af47015 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
@@ -35,11 +35,11 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+    .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -854,11 +854,11 @@ public class ContainerManagerImpl implements ContainerManager {
    * @return node report.
    */
   @Override
-  public SCMNodeReport getNodeReport() throws IOException {
+  public NodeReportProto getNodeReport() throws IOException {
     StorageLocationReport[] reports = locationManager.getLocationReport();
-    SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+    NodeReportProto.Builder nrb = NodeReportProto.newBuilder();
     for (int i = 0; i < reports.length; i++) {
-      SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+      StorageReportProto.Builder srb = StorageReportProto.newBuilder();
       nrb.addStorageReport(reports[i].getProtoBufMessage());
     }
     return nrb.build();
@@ -891,7 +891,7 @@ public class ContainerManagerImpl implements ContainerManager {
    * @throws IOException
    */
   @Override
-  public ContainerReportsRequestProto getContainerReport() throws IOException {
+  public ContainerReportsProto getContainerReport() throws IOException {
     LOG.debug("Starting container report iteration.");
     // No need for locking since containerMap is a ConcurrentSkipListMap
     // And we can never get the exact state since close might happen
@@ -899,12 +899,8 @@ public class ContainerManagerImpl implements ContainerManager {
     List<ContainerData> containers = containerMap.values().stream()
         .collect(Collectors.toList());
 
-    ContainerReportsRequestProto.Builder crBuilder =
-        ContainerReportsRequestProto.newBuilder();
-
-    // TODO: support delta based container report
-    crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
-        .setType(ContainerReportsRequestProto.reportType.fullReport);
+    ContainerReportsProto.Builder crBuilder =
+        ContainerReportsProto.newBuilder();
 
     for (ContainerData container: containers) {
       long containerId = container.getContainerID();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
index a5ad6c2..87b9656 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.container.common.impl;
 
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+    StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.StorageTypeProto;
 
@@ -137,8 +137,8 @@ public class StorageLocationReport {
    * @return SCMStorageReport
    * @throws IOException In case, the storage type specified is invalid.
    */
-  public SCMStorageReport getProtoBufMessage() throws IOException{
-    SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+  public StorageReportProto getProtoBufMessage() throws IOException{
+    StorageReportProto.Builder srb = StorageReportProto.newBuilder();
     return srb.setStorageUuid(getId())
         .setCapacity(getCapacity())
         .setScmUsed(getScmUsed())
@@ -156,7 +156,7 @@ public class StorageLocationReport {
    * @throws IOException in case of invalid storage type
    */
 
-  public static StorageLocationReport getFromProtobuf(SCMStorageReport report)
+  public static StorageLocationReport getFromProtobuf(StorageReportProto report)
       throws IOException {
     StorageLocationReport.Builder builder = StorageLocationReport.newBuilder();
     builder.setId(report.getStorageUuid())

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
index ba70953..49b68dc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
@@ -27,9 +27,9 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 
 import java.io.IOException;
@@ -171,14 +171,14 @@ public interface ContainerManager extends RwLock {
    * Get the Node Report of container storage usage.
    * @return node report.
    */
-  SCMNodeReport getNodeReport() throws IOException;
+  NodeReportProto getNodeReport() throws IOException;
 
   /**
    * Gets container report.
    * @return container report.
    * @throws IOException
    */
-  ContainerReportsRequestProto getContainerReport() throws IOException;
+  ContainerReportsProto getContainerReport() throws IOException;
 
   /**
    * Gets container reports.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index a8fe494..d0a4217 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -21,8 +21,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
-    .CloseContainerHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CommandDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
@@ -86,7 +85,7 @@ public class DatanodeStateMachine implements Closeable {
      // When we add new handlers just adding a new handler here should do the
      // trick.
     commandDispatcher = CommandDispatcher.newBuilder()
-        .addHandler(new CloseContainerHandler())
+        .addHandler(new CloseContainerCommandHandler())
         .addHandler(new DeleteBlocksCommandHandler(
             container.getContainerManager(), conf))
         .setConnectionManager(connectionManager)
@@ -131,7 +130,7 @@ public class DatanodeStateMachine implements Closeable {
       try {
         LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
         nextHB.set(Time.monotonicNow() + heartbeatFrequency);
-        context.setReportState(container.getNodeReport());
+        context.setNodeReport(container.getNodeReport());
         context.execute(executorService, heartbeatFrequency,
             TimeUnit.MILLISECONDS);
         now = Time.monotonicNow();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 27eb57e..4e3c610 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.container.common.statemachine;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode
     .InitDatanodeState;
@@ -52,7 +52,7 @@ public class StateContext {
   private final AtomicLong stateExecutionCount;
   private final Configuration conf;
   private DatanodeStateMachine.DatanodeStates state;
-  private SCMNodeReport nrState;
+  private NodeReportProto dnReport;
 
   /**
    * Constructs a StateContext.
@@ -69,7 +69,7 @@ public class StateContext {
     commandQueue = new LinkedList<>();
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
-    nrState = SCMNodeReport.getDefaultInstance();
+    dnReport = NodeReportProto.getDefaultInstance();
   }
 
   /**
@@ -144,16 +144,16 @@ public class StateContext {
    * Returns the node report of the datanode state context.
    * @return the node report.
    */
-  public SCMNodeReport getNodeReport() {
-    return nrState;
+  public NodeReportProto getNodeReport() {
+    return dnReport;
   }
 
   /**
    * Sets the storage location report of the datanode state context.
-   * @param nrReport - node report
+   * @param nodeReport node report
    */
-  public void setReportState(SCMNodeReport nrReport) {
-    this.nrState = nrReport;
+  public void setNodeReport(NodeReportProto nodeReport) {
+    this.dnReport = nodeReport;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
new file mode 100644
index 0000000..e8c602d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handler for close container command received from SCM.
+ */
+public class CloseContainerCommandHandler implements CommandHandler {
+  static final Logger LOG =
+      LoggerFactory.getLogger(CloseContainerCommandHandler.class);
+  private int invocationCount;
+  private long totalTime;
+
+  /**
+   * Constructs a CloseContainerCommand handler.
+   */
+  public CloseContainerCommandHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param container         - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer container,
+      StateContext context, SCMConnectionManager connectionManager) {
+    LOG.debug("Processing Close Container command.");
+    invocationCount++;
+    long startTime = Time.monotonicNow();
+    // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA)
+    long containerID = -1;
+    try {
+
+      CloseContainerCommandProto
+          closeContainerProto =
+          CloseContainerCommandProto
+              .parseFrom(command.getProtoBufMessage());
+      containerID = closeContainerProto.getContainerID();
+
+      container.getContainerManager().closeContainer(containerID);
+
+    } catch (Exception e) {
+      LOG.error("Can't close container " + containerID, e);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.closeContainerCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return invocationCount;
+  }
+
+  /**
+   * Returns the average handling time per invocation, or 0 if never invoked.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount > 0) {
+      return totalTime / invocationCount;
+    }
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
deleted file mode 100644
index d8adc7d..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Container Report handler.
- */
-public class CloseContainerHandler implements CommandHandler {
-  static final Logger LOG =
-      LoggerFactory.getLogger(CloseContainerHandler.class);
-  private int invocationCount;
-  private long totalTime;
-
-  /**
-   * Constructs a ContainerReport handler.
-   */
-  public CloseContainerHandler() {
-  }
-
-  /**
-   * Handles a given SCM command.
-   *
-   * @param command           - SCM Command
-   * @param container         - Ozone Container.
-   * @param context           - Current Context.
-   * @param connectionManager - The SCMs that we are talking to.
-   */
-  @Override
-  public void handle(SCMCommand command, OzoneContainer container,
-      StateContext context, SCMConnectionManager connectionManager) {
-    LOG.debug("Processing Close Container command.");
-    invocationCount++;
-    long startTime = Time.monotonicNow();
-    // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA)
-    long containerID = -1;
-    try {
-
-      SCMCloseContainerCmdResponseProto
-          closeContainerProto =
-          SCMCloseContainerCmdResponseProto
-              .parseFrom(command.getProtoBufMessage());
-      containerID = closeContainerProto.getContainerID();
-
-      container.getContainerManager().closeContainer(containerID);
-
-    } catch (Exception e) {
-      LOG.error("Can't close container " + containerID, e);
-    } finally {
-      long endTime = Time.monotonicNow();
-      totalTime += endTime - startTime;
-    }
-  }
-
-  /**
-   * Returns the command type that this command handler handles.
-   *
-   * @return Type
-   */
-  @Override
-  public SCMCmdType getCommandType() {
-    return SCMCmdType.closeContainerCommand;
-  }
-
-  /**
-   * Returns number of times this handler has been invoked.
-   *
-   * @return int
-   */
-  @Override
-  public int getInvocationCount() {
-    return invocationCount;
-  }
-
-  /**
-   * Returns the average time this function takes to run.
-   *
-   * @return long
-   */
-  @Override
-  public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
-    }
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
index 40feca3..aedd78f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -18,7 +18,8 @@
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
 import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -38,7 +39,7 @@ public final class CommandDispatcher {
   static final Logger LOG =
       LoggerFactory.getLogger(CommandDispatcher.class);
   private final StateContext context;
-  private final Map<SCMCmdType, CommandHandler> handlerMap;
+  private final Map<Type, CommandHandler> handlerMap;
   private final OzoneContainer container;
   private final SCMConnectionManager connectionManager;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 13d9f72..60e2dc4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -17,8 +17,10 @@
 
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
-import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -42,7 +44,7 @@ public interface CommandHandler {
    * Returns the command type that this command handler handles.
    * @return Type
    */
-  SCMCmdType getCommandType();
+  SCMCommandProto.Type getCommandType();
 
   /**
    * Returns number of times this handler has been invoked.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 5231660..ab69bdc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
@@ -26,8 +28,6 @@ import org.apache.hadoop.hdds.protocol.proto
     .DeleteBlockTransactionResult;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.helpers
@@ -73,10 +73,10 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
   @Override
   public void handle(SCMCommand command, OzoneContainer container,
       StateContext context, SCMConnectionManager connectionManager) {
-    if (command.getType() != SCMCmdType.deleteBlocksCommand) {
+    if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
       LOG.warn("Skipping handling command, expected command "
               + "type {} but found {}",
-          SCMCmdType.deleteBlocksCommand, command.getType());
+          SCMCommandProto.Type.deleteBlocksCommand, command.getType());
       return;
     }
     LOG.debug("Processing block deletion command.");
@@ -193,8 +193,8 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
   }
 
   @Override
-  public SCMCmdType getCommandType() {
-    return SCMCmdType.deleteBlocksCommand;
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.deleteBlocksCommand;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 01b4c72..337cdfb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -23,7 +23,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.ozone.container.common.helpers
@@ -97,8 +99,13 @@ public class HeartbeatEndpointTask
     try {
       Preconditions.checkState(this.datanodeDetailsProto != null);
 
+      SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder()
+          .setDatanodeDetails(datanodeDetailsProto)
+          .setNodeReport(context.getNodeReport())
+          .build();
+
       SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
-          .sendHeartbeat(datanodeDetailsProto, this.context.getNodeReport());
+          .sendHeartbeat(request);
       processResponse(reponse, datanodeDetailsProto);
       rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
       rpcEndpoint.zeroMissedCount();
@@ -125,13 +132,13 @@ public class HeartbeatEndpointTask
    */
   private void processResponse(SCMHeartbeatResponseProto response,
       final DatanodeDetailsProto datanodeDetails) {
-    for (SCMCommandResponseProto commandResponseProto : response
+    Preconditions.checkState(response.getDatanodeUUID()
+            .equalsIgnoreCase(datanodeDetails.getUuid()),
+        "Unexpected datanode ID in the response.");
+    // Datanode ID verified above; now process each command in the response.
+    for (SCMCommandProto commandResponseProto : response
         .getCommandsList()) {
-      // Verify the response is indeed for this datanode.
-      Preconditions.checkState(commandResponseProto.getDatanodeUUID()
-          .equalsIgnoreCase(datanodeDetails.getUuid()),
-          "Unexpected datanode ID in the response.");
-      switch (commandResponseProto.getCmdType()) {
+      switch (commandResponseProto.getCommandType()) {
       case reregisterCommand:
         if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) {
           if (LOG.isDebugEnabled()) {
@@ -148,7 +155,8 @@ public class HeartbeatEndpointTask
         break;
       case deleteBlocksCommand:
         DeleteBlocksCommand db = DeleteBlocksCommand
-            .getFromProtobuf(commandResponseProto.getDeleteBlocksProto());
+            .getFromProtobuf(
+                commandResponseProto.getDeleteBlocksCommandProto());
         if (!db.blocksTobeDeleted().isEmpty()) {
           if (LOG.isDebugEnabled()) {
             LOG.debug(DeletedContainerBlocksSummary
@@ -161,7 +169,7 @@ public class HeartbeatEndpointTask
       case closeContainerCommand:
         CloseContainerCommand closeContainer =
             CloseContainerCommand.getFromProtobuf(
-                commandResponseProto.getCloseContainerProto());
+                commandResponseProto.getCloseContainerCommandProto());
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM container close request for container {}",
               closeContainer.getContainerID());
@@ -170,7 +178,7 @@ public class HeartbeatEndpointTask
         break;
       default:
         throw new IllegalArgumentException("Unknown response : "
-            + commandResponseProto.getCmdType().name());
+            + commandResponseProto.getCommandType().name());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
index 77a7084..12b48ab 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
@@ -24,11 +24,11 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,11 +104,11 @@ public final class RegisterEndpointTask implements
     rpcEndPoint.lock();
     try {
 
-      ContainerReportsRequestProto contianerReport = datanodeContainerManager
+      ContainerReportsProto contianerReport = datanodeContainerManager
           .getContainerReport();
-      SCMNodeReport nodeReport = datanodeContainerManager.getNodeReport();
+      NodeReportProto nodeReport = datanodeContainerManager.getNodeReport();
       // TODO : Add responses to the command Queue.
-      SCMRegisteredCmdResponseProto response = rpcEndPoint.getEndPoint()
+      SCMRegisteredResponseProto response = rpcEndPoint.getEndPoint()
           .register(datanodeDetails.getProtoBufMessage(), nodeReport,
               contianerReport);
       Preconditions.checkState(UUID.fromString(response.getDatanodeUUID())

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 6758479..b357fef 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -19,14 +19,14 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
@@ -219,7 +219,7 @@ public class OzoneContainer {
   /**
    * Returns node report of container storage usage.
    */
-  public SCMNodeReport getNodeReport() throws IOException {
+  public NodeReportProto getNodeReport() throws IOException {
     return this.manager.getNodeReport();
   }
 
@@ -255,7 +255,7 @@ public class OzoneContainer {
    * @return - container report.
    * @throws IOException
    */
-  public ContainerReportsRequestProto getContainerReport() throws IOException {
+  public ContainerReportsProto getContainerReport() throws IOException {
     return this.manager.getContainerReport();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index e2a3bf5..a950a31 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -19,20 +19,20 @@ package org.apache.hadoop.ozone.protocol;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos
     .ContainerBlocksDeletionACKResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -55,13 +55,12 @@ public interface StorageContainerDatanodeProtocol {
 
   /**
    * Used by data node to send a Heartbeat.
-   * @param datanodeDetails - Datanode Details.
-   * @param nodeReport - node report state
+   * @param heartbeat heartbeat request, incl. datanode details and node report
    * @return - SCMHeartbeatResponseProto
    * @throws IOException
    */
-  SCMHeartbeatResponseProto sendHeartbeat(DatanodeDetailsProto datanodeDetails,
-      SCMNodeReport nodeReport) throws IOException;
+  SCMHeartbeatResponseProto sendHeartbeat(SCMHeartbeatRequestProto heartbeat)
+      throws IOException;
 
   /**
    * Register Datanode.
@@ -70,20 +69,11 @@ public interface StorageContainerDatanodeProtocol {
    * @param containerReportsRequestProto - Container Reports.
    * @return SCM Command.
    */
-  SCMRegisteredCmdResponseProto register(DatanodeDetailsProto datanodeDetails,
-      SCMNodeReport nodeReport, ContainerReportsRequestProto
+  SCMRegisteredResponseProto register(DatanodeDetailsProto datanodeDetails,
+      NodeReportProto nodeReport, ContainerReportsProto
       containerReportsRequestProto) throws IOException;
 
   /**
-   * Send a container report.
-   * @param reports -- Container report.
-   * @return container reports response.
-   * @throws IOException
-   */
-  ContainerReportsResponseProto sendContainerReport(
-      ContainerReportsRequestProto reports) throws IOException;
-
-  /**
    * Used by datanode to send block deletion ACK to SCM.
    * @param request block deletion transactions.
    * @return block deletion transaction response.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
index 14038fb..790f58a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
@@ -18,11 +18,12 @@
 package org.apache.hadoop.ozone.protocol;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 import java.util.List;
@@ -49,11 +50,11 @@ public interface StorageContainerNodeProtocol {
   /**
    * Register the node if the node finds that it is not registered with any SCM.
    * @param datanodeDetails DatanodeDetails
-   * @param nodeReport SCMNodeReport
+   * @param nodeReport NodeReportProto
    * @return  SCMHeartbeatResponseProto
    */
-  SCMCommand register(DatanodeDetailsProto datanodeDetails, SCMNodeReport
-      nodeReport);
+  RegisteredCommand register(DatanodeDetails datanodeDetails,
+                             NodeReportProto nodeReport);
 
   /**
    * Send heartbeat to indicate the datanode is alive and doing well.
@@ -61,7 +62,7 @@ public interface StorageContainerNodeProtocol {
    * @param nodeReport - node report.
    * @return SCMheartbeat response list
    */
-  List<SCMCommand> sendHeartbeat(DatanodeDetailsProto datanodeDetails,
-      SCMNodeReport nodeReport);
+  List<SCMCommand> sendHeartbeat(DatanodeDetails datanodeDetails,
+      NodeReportProto nodeReport);
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
index d1d6488..4f4f82b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
@@ -19,18 +19,16 @@ package org.apache.hadoop.ozone.protocol.commands;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto;
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+    .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
 
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCmdType.closeContainerCommand;
 
 /**
  * Asks datanode to close a container.
  */
 public class CloseContainerCommand
-    extends SCMCommand<SCMCloseContainerCmdResponseProto> {
+    extends SCMCommand<CloseContainerCommandProto> {
 
   private long containerID;
 
@@ -44,8 +42,8 @@ public class CloseContainerCommand
    * @return Type
    */
   @Override
-  public SCMCmdType getType() {
-    return closeContainerCommand;
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.closeContainerCommand;
   }
 
   /**
@@ -58,13 +56,13 @@ public class CloseContainerCommand
     return getProto().toByteArray();
   }
 
-  public SCMCloseContainerCmdResponseProto getProto() {
-    return SCMCloseContainerCmdResponseProto.newBuilder()
+  public CloseContainerCommandProto getProto() {
+    return CloseContainerCommandProto.newBuilder()
         .setContainerID(containerID).build();
   }
 
   public static CloseContainerCommand getFromProtobuf(
-      SCMCloseContainerCmdResponseProto closeContainerProto) {
+      CloseContainerCommandProto closeContainerProto) {
     Preconditions.checkNotNull(closeContainerProto);
     return new CloseContainerCommand(closeContainerProto.getContainerID());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
index a11ca25..4fa33f6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
@@ -18,11 +18,11 @@
 package org.apache.hadoop.ozone.protocol.commands;
 
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMDeleteBlocksCmdResponseProto;
+    .StorageContainerDatanodeProtocolProtos.DeleteBlocksCommandProto;
 
 import java.util.List;
 
@@ -30,7 +30,7 @@ import java.util.List;
  * A SCM command asks a datanode to delete a number of blocks.
  */
 public class DeleteBlocksCommand extends
-    SCMCommand<SCMDeleteBlocksCmdResponseProto> {
+    SCMCommand<DeleteBlocksCommandProto> {
 
   private List<DeletedBlocksTransaction> blocksTobeDeleted;
 
@@ -44,8 +44,8 @@ public class DeleteBlocksCommand extends
   }
 
   @Override
-  public SCMCmdType getType() {
-    return SCMCmdType.deleteBlocksCommand;
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.deleteBlocksCommand;
   }
 
   @Override
@@ -54,13 +54,13 @@ public class DeleteBlocksCommand extends
   }
 
   public static DeleteBlocksCommand getFromProtobuf(
-      SCMDeleteBlocksCmdResponseProto deleteBlocksProto) {
+      DeleteBlocksCommandProto deleteBlocksProto) {
     return new DeleteBlocksCommand(deleteBlocksProto
         .getDeletedBlocksTransactionsList());
   }
 
-  public SCMDeleteBlocksCmdResponseProto getProto() {
-    return SCMDeleteBlocksCmdResponseProto.newBuilder()
+  public DeleteBlocksCommandProto getProto() {
+    return DeleteBlocksCommandProto.newBuilder()
         .addAllDeletedBlocksTransactions(blocksTobeDeleted).build();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java
index 69f2c18..3a5da72 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java
@@ -19,18 +19,15 @@ package org.apache.hadoop.ozone.protocol.commands;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto
     .ErrorCode;
 
 /**
  * Response to Datanode Register call.
  */
-public class RegisteredCommand extends
-    SCMCommand<SCMRegisteredCmdResponseProto> {
+public class RegisteredCommand {
   private String datanodeUUID;
   private String clusterID;
   private ErrorCode error;
@@ -60,16 +57,6 @@ public class RegisteredCommand extends
   }
 
   /**
-   * Returns the type of this command.
-   *
-   * @return Type
-   */
-  @Override
-  public SCMCmdType getType() {
-    return SCMCmdType.registeredCommand;
-  }
-
-  /**
    * Returns datanode UUID.
    *
    * @return - Datanode ID.
@@ -117,10 +104,9 @@ public class RegisteredCommand extends
    *
    * @return A protobuf message.
    */
-  @Override
   public byte[] getProtoBufMessage() {
-    SCMRegisteredCmdResponseProto.Builder builder =
-        SCMRegisteredCmdResponseProto.newBuilder()
+    SCMRegisteredResponseProto.Builder builder =
+        SCMRegisteredResponseProto.newBuilder()
             .setClusterID(this.clusterID)
             .setDatanodeUUID(this.datanodeUUID)
             .setErrorCode(this.error);
@@ -157,7 +143,7 @@ public class RegisteredCommand extends
      * @param response - RegisteredCmdResponseProto
      * @return RegisteredCommand
      */
-    public  RegisteredCommand getFromProtobuf(SCMRegisteredCmdResponseProto
+    public  RegisteredCommand getFromProtobuf(SCMRegisteredResponseProto
                                                         response) {
       Preconditions.checkNotNull(response);
       if (response.hasHostname() && response.hasIpAddress()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
index c167d59..953e31a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
@@ -18,18 +18,16 @@
 package org.apache.hadoop.ozone.protocol.commands;
 
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 
 import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCmdType.reregisterCommand;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
+    .StorageContainerDatanodeProtocolProtos.ReregisterCommandProto;
 
 /**
  * Informs a datanode to register itself with SCM again.
  */
 public class ReregisterCommand extends
-    SCMCommand<SCMReregisterCmdResponseProto>{
+    SCMCommand<ReregisterCommandProto>{
 
   /**
    * Returns the type of this command.
@@ -37,8 +35,8 @@ public class ReregisterCommand extends
    * @return Type
    */
   @Override
-  public SCMCmdType getType() {
-    return reregisterCommand;
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.reregisterCommand;
   }
 
   /**
@@ -51,8 +49,8 @@ public class ReregisterCommand extends
     return getProto().toByteArray();
   }
 
-  public SCMReregisterCmdResponseProto getProto() {
-    return SCMReregisterCmdResponseProto
+  public ReregisterCommandProto getProto() {
+    return ReregisterCommandProto
         .newBuilder()
         .build();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
index 73e4194..35ca802 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.protocol.commands;
 
 import com.google.protobuf.GeneratedMessage;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 
 /**
  * A class that acts as the base class to convert between Java and SCM
@@ -31,7 +31,7 @@ public abstract class SCMCommand<T extends GeneratedMessage> {
    * Returns the type of this command.
    * @return Type
    */
-  public  abstract SCMCmdType getType();
+  public  abstract SCMCommandProto.Type getType();
 
   /**
    * Gets the protobuf message of this object.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index a56c57a..40fe189 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -20,24 +20,23 @@ import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos
     .ContainerBlocksDeletionACKResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
+
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -123,22 +122,16 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
   /**
    * Send by datanode to SCM.
    *
-   * @param datanodeDetailsProto - Datanode Details
-   * @param nodeReport - node report
+   * @param heartbeat node heartbeat
    * @throws IOException
    */
 
   @Override
   public SCMHeartbeatResponseProto sendHeartbeat(
-      DatanodeDetailsProto datanodeDetailsProto,
-      SCMNodeReport nodeReport) throws IOException {
-    SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto
-        .newBuilder();
-    req.setDatanodeDetails(datanodeDetailsProto);
-    req.setNodeReport(nodeReport);
+      SCMHeartbeatRequestProto heartbeat) throws IOException {
     final SCMHeartbeatResponseProto resp;
     try {
-      resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build());
+      resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, heartbeat);
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -154,16 +147,16 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
    * @return SCM Command.
    */
   @Override
-  public SCMRegisteredCmdResponseProto register(
-      DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport,
-      ContainerReportsRequestProto containerReportsRequestProto)
+  public SCMRegisteredResponseProto register(
+      DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport,
+      ContainerReportsProto containerReportsRequestProto)
       throws IOException {
     SCMRegisterRequestProto.Builder req =
         SCMRegisterRequestProto.newBuilder();
     req.setDatanodeDetails(datanodeDetailsProto);
     req.setContainerReport(containerReportsRequestProto);
     req.setNodeReport(nodeReport);
-    final SCMRegisteredCmdResponseProto response;
+    final SCMRegisteredResponseProto response;
     try {
       response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build());
     } catch (ServiceException e) {
@@ -172,25 +165,6 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
     return response;
   }
 
-  /**
-   * Send a container report.
-   *
-   * @param reports -- Container report
-   * @return HeartbeatRespose.nullcommand.
-   * @throws IOException
-   */
-  @Override
-  public ContainerReportsResponseProto sendContainerReport(
-      ContainerReportsRequestProto reports) throws IOException {
-    final ContainerReportsResponseProto resp;
-    try {
-      resp = rpcProxy.sendContainerReport(NULL_RPC_CONTROLLER, reports);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    return resp;
-  }
-
   @Override
   public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
       ContainerBlocksDeletionACKProto deletedBlocks) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index 07dba57..7e8bd8a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -19,18 +19,22 @@ package org.apache.hadoop.ozone.protocolPB;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+    .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos
+    .ContainerBlocksDeletionACKResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -55,9 +59,8 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
   }
 
   @Override
-  public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto
-      getVersion(RpcController controller,
-      StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto request)
+  public SCMVersionResponseProto getVersion(RpcController controller,
+      SCMVersionRequestProto request)
       throws ServiceException {
     try {
       return impl.getVersion(request);
@@ -67,15 +70,13 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
   }
 
   @Override
-  public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
-      register(RpcController controller, StorageContainerDatanodeProtocolProtos
-      .SCMRegisterRequestProto request) throws ServiceException {
+  public SCMRegisteredResponseProto register(RpcController controller,
+      SCMRegisterRequestProto request) throws ServiceException {
     try {
-      ContainerReportsRequestProto containerRequestProto = null;
-      SCMNodeReport scmNodeReport = null;
-      containerRequestProto = request.getContainerReport();
-      scmNodeReport = request.getNodeReport();
-      return impl.register(request.getDatanodeDetails(), scmNodeReport,
+      ContainerReportsProto containerRequestProto = request
+          .getContainerReport();
+      NodeReportProto dnNodeReport = request.getNodeReport();
+      return impl.register(request.getDatanodeDetails(), dnNodeReport,
           containerRequestProto);
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -83,27 +84,15 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
   }
 
   @Override
-  public SCMHeartbeatResponseProto
-      sendHeartbeat(RpcController controller,
+  public SCMHeartbeatResponseProto sendHeartbeat(RpcController controller,
       SCMHeartbeatRequestProto request) throws ServiceException {
     try {
-      return impl.sendHeartbeat(request.getDatanodeDetails(),
-          request.getNodeReport());
+      return impl.sendHeartbeat(request);
     } catch (IOException e) {
       throw new ServiceException(e);
     }
   }
 
-  @Override
-  public ContainerReportsResponseProto sendContainerReport(
-      RpcController controller, ContainerReportsRequestProto request)
-      throws ServiceException {
-    try {
-      return impl.sendContainerReport(request);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
 
   @Override
   public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 20e6af8..cc131e0 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -34,81 +34,74 @@ package hadoop.hdds;
 
 import "hdds.proto";
 
+/**
+ * Request for version info of the software stack on the server.
+ */
+message SCMVersionRequestProto {}
 
 /**
-* This message is send by data node to indicate that it is alive or it is
-* registering with the node manager.
+* Generic response that is sent to a version request. This allows keys to be
+* added on the fly and protocol to remain stable.
 */
-message SCMHeartbeatRequestProto {
-  required DatanodeDetailsProto datanodeDetails = 1;
-  optional SCMNodeReport nodeReport = 2;
+message SCMVersionResponseProto {
+  required uint32 softwareVersion = 1;
+  repeated hadoop.hdds.KeyValue keys = 2;
 }
 
-/**
-A container report contains the following information.
-*/
-message ContainerInfo {
-  optional string finalhash = 2;
-  optional int64 size = 3;
-  optional int64 used = 4;
-  optional int64 keyCount = 5;
-  // TODO: move the io count to separate message
-  optional int64 readCount = 6;
-  optional int64 writeCount = 7;
-  optional int64 readBytes = 8;
-  optional int64 writeBytes = 9;
-  required int64 containerID = 10;
-  optional hadoop.hdds.LifeCycleState state = 11;
+message SCMRegisterRequestProto {
+  required DatanodeDetailsProto datanodeDetails = 1;
+  required NodeReportProto nodeReport = 2;
+  required ContainerReportsProto containerReport = 3;
 }
 
-// The deleted blocks which are stored in deletedBlock.db of scm.
-// We don't use BlockID because this only contians multiple localIDs
-// of the same containerID.
-message DeletedBlocksTransaction {
-  required int64 txID = 1;
-  required int64 containerID = 2;
-  repeated int64 localID = 3;
-  // the retry time of sending deleting command to datanode.
-  required int32 count = 4;
+/**
+ * Datanode ID returned by the SCM. This is similar to name node
+ * registration of a datanode.
+ */
+message SCMRegisteredResponseProto {
+  enum ErrorCode {
+    success = 1;
+    errorNodeNotPermitted = 2;
+  }
+  required ErrorCode errorCode = 1;
+  required string datanodeUUID = 2;
+  required string clusterID = 3;
+  optional SCMNodeAddressList addressList = 4;
+  optional string hostname = 5;
+  optional string ipAddress = 6;
 }
 
 /**
-A set of container reports, max count is generally set to
-8192 since that keeps the size of the reports under 1 MB.
+* This message is sent by data node to indicate that it is alive or it is
+* registering with the node manager.
 */
-message ContainerReportsRequestProto {
-  enum reportType {
-    fullReport = 0;
-    deltaReport = 1;
-  }
+message SCMHeartbeatRequestProto {
   required DatanodeDetailsProto datanodeDetails = 1;
-  repeated ContainerInfo reports = 2;
-  required reportType type = 3;
+  optional NodeReportProto nodeReport = 2;
+  optional ContainerReportsProto containerReport = 3;
 }
 
-message ContainerReportsResponseProto {
+/*
+ * A group of commands for the datanode to execute
+ */
+message SCMHeartbeatResponseProto {
+  required string datanodeUUID = 1;
+  repeated SCMCommandProto commands = 2;
 }
 
-/**
-* This message is send along with the heart beat to report datanode
-* storage utilization by SCM.
-*/
-message SCMNodeReport {
-  repeated SCMStorageReport storageReport = 1;
+message SCMNodeAddressList {
+  repeated string addressList = 1;
 }
 
 /**
- * Types of recognized storage media.
- */
-enum StorageTypeProto {
-  DISK = 1;
-  SSD = 2;
-  ARCHIVE = 3;
-  RAM_DISK = 4;
-  PROVIDED = 5;
+* This message is sent along with the heart beat to report datanode
+* storage utilization to SCM.
+*/
+message NodeReportProto {
+  repeated StorageReportProto storageReport = 1;
 }
 
-message SCMStorageReport {
+message StorageReportProto {
   required string storageUuid = 1;
   required string storageLocation = 2;
   optional uint64 capacity = 3 [default = 0];
@@ -118,107 +111,82 @@ message SCMStorageReport {
   optional bool failed = 7 [default = false];
 }
 
-message SCMRegisterRequestProto {
-  required DatanodeDetailsProto datanodeDetails = 1;
-  required SCMNodeReport nodeReport = 2;
-  required ContainerReportsRequestProto containerReport = 3;
-}
-
-/**
- * Request for version info of the software stack on the server.
- */
-message SCMVersionRequestProto {
-
-}
-
-/**
-* Generic response that is send to a version request. This allows keys to be
-* added on the fly and protocol to remain stable.
-*/
-message SCMVersionResponseProto {
-  required uint32 softwareVersion = 1;
-  repeated hadoop.hdds.KeyValue keys = 2;
-}
-
-message SCMNodeAddressList {
-  repeated string addressList = 1;
-}
-
 /**
- * Datanode ID returned by the SCM. This is similar to name node
- * registeration of a datanode.
+ * Types of recognized storage media.
  */
-message SCMRegisteredCmdResponseProto {
-  enum ErrorCode {
-    success = 1;
-    errorNodeNotPermitted = 2;
-  }
-  required ErrorCode errorCode = 2;
-  required string datanodeUUID = 3;
-  required string clusterID = 4;
-  optional SCMNodeAddressList addressList = 5;
-  optional string hostname = 6;
-  optional string ipAddress = 7;
+enum StorageTypeProto {
+  DISK = 1;
+  SSD = 2;
+  ARCHIVE = 3;
+  RAM_DISK = 4;
+  PROVIDED = 5;
 }
 
 /**
- * SCM informs a datanode to register itself again.
- * With recieving this command, datanode will transit to REGISTER state.
- */
-message SCMReregisterCmdResponseProto {}
-
-/**
-This command tells the data node to send in the container report when possible
+A set of container reports, max count is generally set to
+8192 since that keeps the size of the reports under 1 MB.
 */
-message SendContainerReportProto {
+message ContainerReportsProto {
+  repeated ContainerInfo reports = 2;
 }
 
-/**
-This command asks the datanode to close a specific container.
-*/
-message SCMCloseContainerCmdResponseProto {
-  required int64 containerID = 1;
-}
 
 /**
-Type of commands supported by SCM to datanode protocol.
+A container report contains the following information.
 */
-enum SCMCmdType {
-  versionCommand = 2;
-  registeredCommand = 3;
-  reregisterCommand = 4;
-  deleteBlocksCommand = 5;
-  closeContainerCommand = 6;
+message ContainerInfo {
+  optional string finalhash = 1;
+  optional int64 size = 2;
+  optional int64 used = 3;
+  optional int64 keyCount = 4;
+  // TODO: move the io count to separate message
+  optional int64 readCount = 5;
+  optional int64 writeCount = 6;
+  optional int64 readBytes = 7;
+  optional int64 writeBytes = 8;
+  required int64 containerID = 9;
+  optional hadoop.hdds.LifeCycleState state = 10;
 }
 
 /*
  * These are commands returned by SCM for to the datanode to execute.
  */
-message SCMCommandResponseProto {
-  required SCMCmdType cmdType = 2; // Type of the command
-  optional SCMRegisteredCmdResponseProto registeredProto = 3;
-  optional SCMVersionResponseProto versionProto = 4;
-  optional SCMReregisterCmdResponseProto reregisterProto = 5;
-  optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 6;
-  required string datanodeUUID = 7;
-  optional SCMCloseContainerCmdResponseProto closeContainerProto = 8;
+message SCMCommandProto {
+  enum Type {
+    reregisterCommand = 1;
+    deleteBlocksCommand = 2;
+    closeContainerCommand = 3;
+    deleteContainerCommand = 4;
+  }
+  // TODO: once we start using protoc 3.x, refactor this message using "oneof"
+  required Type commandType = 1;
+  optional ReregisterCommandProto reregisterCommandProto = 2;
+  optional DeleteBlocksCommandProto deleteBlocksCommandProto = 3;
+  optional CloseContainerCommandProto closeContainerCommandProto = 4;
+  optional DeleteContainerCommandProto deleteContainerCommandProto = 5;
 }
 
-
-/*
- * A group of commands for the datanode to execute
+/**
+ * SCM informs a datanode to register itself again.
+ * Upon receiving this command, datanode will transit to REGISTER state.
  */
-message SCMHeartbeatResponseProto {
-  repeated SCMCommandResponseProto commands = 1;
-}
+message ReregisterCommandProto {}
+
 
 // HB response from SCM, contains a list of block deletion transactions.
-message SCMDeleteBlocksCmdResponseProto {
+message DeleteBlocksCommandProto {
   repeated DeletedBlocksTransaction deletedBlocksTransactions = 1;
 }
 
-// SendACK response returned by datanode to SCM, currently empty.
-message ContainerBlocksDeletionACKResponseProto {
+// The deleted blocks which are stored in deletedBlock.db of scm.
+// We don't use BlockID because this only contains multiple localIDs
+// of the same containerID.
+message DeletedBlocksTransaction {
+  required int64 txID = 1;
+  required int64 containerID = 2;
+  repeated int64 localID = 3;
+  // the retry time of sending deleting command to datanode.
+  required int32 count = 4;
 }
 
 // ACK message datanode sent to SCM, contains the result of
@@ -231,6 +199,24 @@ message ContainerBlocksDeletionACKProto {
   repeated DeleteBlockTransactionResult results = 1;
 }
 
+// SendACK response returned by datanode to SCM, currently empty.
+message ContainerBlocksDeletionACKResponseProto {
+}
+
+/**
+This command asks the datanode to close a specific container.
+*/
+message CloseContainerCommandProto {
+  required int64 containerID = 1;
+}
+
+/**
+This command asks the datanode to delete a specific container.
+*/
+message DeleteContainerCommandProto {
+  required int64 containerID = 1;
+}
+
 /**
  * Protocol used from a datanode to StorageContainerManager.
  *
@@ -305,7 +291,7 @@ service StorageContainerDatanodeProtocolService {
   /**
   * Registers a data node with SCM.
   */
-  rpc register (SCMRegisterRequestProto) returns (SCMRegisteredCmdResponseProto);
+  rpc register (SCMRegisterRequestProto) returns (SCMRegisteredResponseProto);
 
   /**
    * Send heartbeat from datanode to SCM. HB's under SCM looks more
@@ -315,12 +301,6 @@ service StorageContainerDatanodeProtocolService {
   rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
 
   /**
-    send container reports sends the container report to SCM. This will
-    return a null command as response.
-  */
-  rpc sendContainerReport(ContainerReportsRequestProto) returns (ContainerReportsResponseProto);
-
-  /**
    * Sends the block deletion ACK to SCM.
    */
   rpc sendContainerBlocksDeletionACK (ContainerBlocksDeletionACKProto) returns (ContainerBlocksDeletionACKResponseProto);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index c57a366..0ee6321 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -16,12 +16,12 @@
  */
 package org.apache.hadoop.ozone.container.common;
 
-import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.VersionInfo;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -30,13 +30,13 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+    .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 
@@ -56,7 +56,7 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
   // Map of datanode to containers
   private Map<DatanodeDetails, Map<String, ContainerInfo>> nodeContainers =
       new HashMap();
-  private Map<DatanodeDetails, SCMNodeReport> nodeReports = new HashMap<>();
+  private Map<DatanodeDetails, NodeReportProto> nodeReports = new HashMap<>();
   /**
    * Returns the number of heartbeats made to this class.
    *
@@ -166,20 +166,17 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
   /**
    * Used by data node to send a Heartbeat.
    *
-   * @param datanodeDetailsProto - DatanodeDetailsProto.
-   * @param nodeReport - node report.
+   * @param heartbeat - node heartbeat.
    * @return - SCMHeartbeatResponseProto
    * @throws IOException
    */
   @Override
   public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
-      sendHeartbeat(DatanodeDetailsProto datanodeDetailsProto,
-                    SCMNodeReport nodeReport)
-      throws IOException {
+      sendHeartbeat(SCMHeartbeatRequestProto heartbeat) throws IOException {
     rpcCount.incrementAndGet();
     heartbeatCount.incrementAndGet();
     sleepIfNeeded();
-    List<SCMCommandResponseProto>
+    List<SCMCommandProto>
         cmdResponses = new LinkedList<>();
     return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
         .build();
@@ -193,21 +190,19 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
    */
   @Override
   public StorageContainerDatanodeProtocolProtos
-      .SCMRegisteredCmdResponseProto register(
-          DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport,
-          StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto
+      .SCMRegisteredResponseProto register(
+          DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport,
+          StorageContainerDatanodeProtocolProtos.ContainerReportsProto
               containerReportsRequestProto)
       throws IOException {
     rpcCount.incrementAndGet();
-    sendContainerReport(containerReportsRequestProto);
     updateNodeReport(datanodeDetailsProto, nodeReport);
     sleepIfNeeded();
-    return StorageContainerDatanodeProtocolProtos
-        .SCMRegisteredCmdResponseProto
+    return StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto
         .newBuilder().setClusterID(UUID.randomUUID().toString())
         .setDatanodeUUID(datanodeDetailsProto.getUuid()).setErrorCode(
             StorageContainerDatanodeProtocolProtos
-                .SCMRegisteredCmdResponseProto.ErrorCode.success).build();
+                .SCMRegisteredResponseProto.ErrorCode.success).build();
   }
 
   /**
@@ -216,19 +211,19 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
    * @param nodeReport
    */
   public void updateNodeReport(DatanodeDetailsProto datanodeDetailsProto,
-      SCMNodeReport nodeReport) {
+      NodeReportProto nodeReport) {
     DatanodeDetails datanode = DatanodeDetails.getFromProtoBuf(
         datanodeDetailsProto);
-    SCMNodeReport.Builder datanodeReport = SCMNodeReport.newBuilder();
+    NodeReportProto.Builder nodeReportProto = NodeReportProto.newBuilder();
 
-    List<SCMStorageReport> storageReports =
+    List<StorageReportProto> storageReports =
         nodeReport.getStorageReportList();
 
-    for(SCMStorageReport report : storageReports) {
-      datanodeReport.addStorageReport(report);
+    for(StorageReportProto report : storageReports) {
+      nodeReportProto.addStorageReport(report);
     }
 
-    nodeReports.put(datanode, datanodeReport.build());
+    nodeReports.put(datanode, nodeReportProto.build());
 
   }
 
@@ -254,39 +249,6 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
     return 0;
   }
 
-  /**
-   * Send a container report.
-   *
-   * @param reports -- Container report
-   * @return HeartbeatResponse.nullcommand.
-   * @throws IOException
-   */
-  @Override
-  public StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto
-      sendContainerReport(StorageContainerDatanodeProtocolProtos
-      .ContainerReportsRequestProto reports) throws IOException {
-    Preconditions.checkNotNull(reports);
-    containerReportsCount.incrementAndGet();
-
-    DatanodeDetails datanode = DatanodeDetails.getFromProtoBuf(
-        reports.getDatanodeDetails());
-    if (reports.getReportsCount() > 0) {
-      Map containers = nodeContainers.get(datanode);
-      if (containers == null) {
-        containers = new LinkedHashMap();
-        nodeContainers.put(datanode, containers);
-      }
-
-      for (StorageContainerDatanodeProtocolProtos.ContainerInfo report:
-          reports.getReportsList()) {
-        containers.put(report.getContainerID(), report);
-      }
-    }
-
-    return StorageContainerDatanodeProtocolProtos
-        .ContainerReportsResponseProto.newBuilder().build();
-  }
-
   @Override
   public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
       ContainerBlocksDeletionACKProto request) throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/3] hadoop git commit: HDDS-81. Moving ContainerReport inside Datanode heartbeat. Contributed by Nanda Kumar.

Posted by ae...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 2d88621..f5fe46a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
@@ -33,7 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.lease.Lease;
 import org.apache.hadoop.ozone.lease.LeaseException;
@@ -368,11 +369,12 @@ public class ContainerMapping implements Mapping {
    * @param reports Container report
    */
   @Override
-  public void processContainerReports(ContainerReportsRequestProto reports)
+  public void processContainerReports(DatanodeDetails datanodeDetails,
+                                      ContainerReportsProto reports)
       throws IOException {
     List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
         containerInfos = reports.getReportsList();
-    containerSupervisor.handleContainerReport(reports);
+    containerSupervisor.handleContainerReport(datanodeDetails, reports);
     for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
         containerInfos) {
       byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID());
@@ -402,7 +404,7 @@ public class ContainerMapping implements Mapping {
           // Container not found in our container db.
           LOG.error("Error while processing container report from datanode :" +
                   " {}, for container: {}, reason: container doesn't exist in" +
-                  "container database.", reports.getDatanodeDetails(),
+                  "container database.", datanodeDetails,
               datanodeState.getContainerID());
         }
       } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
index f560174..ee8e344 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
@@ -16,10 +16,11 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -98,7 +99,8 @@ public interface Mapping extends Closeable {
    *
    * @param reports Container report
    */
-  void processContainerReports(ContainerReportsRequestProto reports)
+  void processContainerReports(DatanodeDetails datanodeDetails,
+                               ContainerReportsProto reports)
       throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
index c14303f..5bd0574 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodePoolManager;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
@@ -295,24 +295,21 @@ public class ContainerSupervisor implements Closeable {
    * @param containerReport  -- Container report for a specific container from
    * a datanode.
    */
-  public void handleContainerReport(
-      ContainerReportsRequestProto containerReport) {
-    DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf(
-        containerReport.getDatanodeDetails());
+  public void handleContainerReport(DatanodeDetails datanodeDetails,
+      ContainerReportsProto containerReport) {
     inProgressPoolListLock.readLock().lock();
     try {
       String poolName = poolManager.getNodePool(datanodeDetails);
       for (InProgressPool ppool : inProgressPoolList) {
         if (ppool.getPoolName().equalsIgnoreCase(poolName)) {
-          ppool.handleContainerReport(containerReport);
+          ppool.handleContainerReport(datanodeDetails, containerReport);
           return;
         }
       }
       // TODO: Decide if we can do anything else with this report.
       LOG.debug("Discarding the container report for pool {}. " +
               "That pool is not currently in the pool reconciliation process." +
-              " Container Name: {}", poolName,
-          containerReport.getDatanodeDetails());
+              " Container Name: {}", poolName, datanodeDetails);
     } catch (SCMException e) {
       LOG.warn("Skipping processing container report from datanode {}, "
               + "cause: failed to get the corresponding node pool",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
index c444e90..4b54731 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -178,21 +178,20 @@ public final class InProgressPool {
    *
    * @param containerReport - ContainerReport
    */
-  public void handleContainerReport(
-      ContainerReportsRequestProto containerReport) {
+  public void handleContainerReport(DatanodeDetails datanodeDetails,
+      ContainerReportsProto containerReport) {
     if (status == ProgressStatus.InProgress) {
-      executorService.submit(processContainerReport(containerReport));
+      executorService.submit(processContainerReport(datanodeDetails,
+          containerReport));
     } else {
       LOG.debug("Cannot handle container report when the pool is in {} status.",
           status);
     }
   }
 
-  private Runnable processContainerReport(
-      ContainerReportsRequestProto reports) {
+  private Runnable processContainerReport(DatanodeDetails datanodeDetails,
+      ContainerReportsProto reports) {
     return () -> {
-      DatanodeDetails datanodeDetails =
-          DatanodeDetails.getFromProtoBuf(reports.getDatanodeDetails());
       if (processedNodeSet.computeIfAbsent(datanodeDetails.getUuid(),
           (k) -> true)) {
         nodeProcessed.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
index 05a9fc3..04658bd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hdds.scm.node;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 
 import static org.apache.hadoop.util.Time.monotonicNow;
 
@@ -31,7 +31,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 public class HeartbeatQueueItem {
   private DatanodeDetails datanodeDetails;
   private long recvTimestamp;
-  private SCMNodeReport nodeReport;
+  private NodeReportProto nodeReport;
 
   /**
    *
@@ -40,7 +40,7 @@ public class HeartbeatQueueItem {
    * @param nodeReport - node report associated with the heartbeat if any.
    */
   HeartbeatQueueItem(DatanodeDetails datanodeDetails, long recvTimestamp,
-      SCMNodeReport nodeReport) {
+      NodeReportProto nodeReport) {
     this.datanodeDetails = datanodeDetails;
     this.recvTimestamp = recvTimestamp;
     this.nodeReport = nodeReport;
@@ -56,7 +56,7 @@ public class HeartbeatQueueItem {
   /**
    * @return node report.
    */
-  public SCMNodeReport getNodeReport() {
+  public NodeReportProto getNodeReport() {
     return nodeReport;
   }
 
@@ -72,7 +72,7 @@ public class HeartbeatQueueItem {
    */
   public static class Builder {
     private DatanodeDetails datanodeDetails;
-    private SCMNodeReport nodeReport;
+    private NodeReportProto nodeReport;
     private long recvTimestamp = monotonicNow();
 
     public Builder setDatanodeDetails(DatanodeDetails dnDetails) {
@@ -80,8 +80,8 @@ public class HeartbeatQueueItem {
       return this;
     }
 
-    public Builder setNodeReport(SCMNodeReport scmNodeReport) {
-      this.nodeReport = scmNodeReport;
+    public Builder setNodeReport(NodeReportProto report) {
+      this.nodeReport = report;
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 353a069..b339fb7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -28,15 +28,14 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto
     .ErrorCode;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+    .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.ipc.Server;
@@ -592,7 +591,7 @@ public class SCMNodeManager
 
     DatanodeDetails datanodeDetails = hbItem.getDatanodeDetails();
     UUID datanodeUuid = datanodeDetails.getUuid();
-    SCMNodeReport nodeReport = hbItem.getNodeReport();
+    NodeReportProto nodeReport = hbItem.getNodeReport();
     long recvTimestamp = hbItem.getRecvTimestamp();
     long processTimestamp = Time.monotonicNow();
     if (LOG.isTraceEnabled()) {
@@ -637,7 +636,7 @@ public class SCMNodeManager
         new ReregisterCommand());
   }
 
-  private void updateNodeStat(UUID dnId, SCMNodeReport nodeReport) {
+  private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) {
     SCMNodeStat stat = nodeStats.get(dnId);
     if (stat == null) {
       LOG.debug("SCM updateNodeStat based on heartbeat from previous" +
@@ -649,8 +648,9 @@ public class SCMNodeManager
       long totalCapacity = 0;
       long totalRemaining = 0;
       long totalScmUsed = 0;
-      List<SCMStorageReport> storageReports = nodeReport.getStorageReportList();
-      for (SCMStorageReport report : storageReports) {
+      List<StorageReportProto> storageReports = nodeReport
+          .getStorageReportList();
+      for (StorageReportProto report : storageReports) {
         totalCapacity += report.getCapacity();
         totalRemaining +=  report.getRemaining();
         totalScmUsed+= report.getScmUsed();
@@ -710,7 +710,7 @@ public class SCMNodeManager
    * Register the node if the node finds that it is not registered with any
    * SCM.
    *
-   * @param datanodeDetailsProto - Send datanodeDetails with Node info.
+   * @param datanodeDetails - Send datanodeDetails with Node info.
    *                   This function generates and assigns new datanode ID
    *                   for the datanode. This allows SCM to be run independent
    *                   of Namenode if required.
@@ -719,13 +719,11 @@ public class SCMNodeManager
    * @return SCMHeartbeatResponseProto
    */
   @Override
-  public SCMCommand register(DatanodeDetailsProto datanodeDetailsProto,
-                             SCMNodeReport nodeReport) {
+  public RegisteredCommand register(
+      DatanodeDetails datanodeDetails, NodeReportProto nodeReport) {
 
     String hostname = null;
     String ip = null;
-    DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf(
-        datanodeDetailsProto);
     InetAddress dnAddress = Server.getRemoteIp();
     if (dnAddress != null) {
       // Mostly called inside an RPC, update ip and peer hostname
@@ -734,7 +732,7 @@ public class SCMNodeManager
       datanodeDetails.setHostName(hostname);
       datanodeDetails.setIpAddress(ip);
     }
-    SCMCommand responseCommand = verifyDatanodeUUID(datanodeDetails);
+    RegisteredCommand responseCommand = verifyDatanodeUUID(datanodeDetails);
     if (responseCommand != null) {
       return responseCommand;
     }
@@ -785,7 +783,8 @@ public class SCMNodeManager
    * @param datanodeDetails - Datanode Details.
    * @return SCMCommand
    */
-  private SCMCommand verifyDatanodeUUID(DatanodeDetails datanodeDetails) {
+  private RegisteredCommand verifyDatanodeUUID(
+      DatanodeDetails datanodeDetails) {
     if (datanodeDetails.getUuid() != null &&
         nodes.containsKey(datanodeDetails.getUuid())) {
       LOG.trace("Datanode is already registered. Datanode: {}",
@@ -802,34 +801,23 @@ public class SCMNodeManager
   /**
    * Send heartbeat to indicate the datanode is alive and doing well.
    *
-   * @param datanodeDetailsProto - DatanodeDetailsProto.
+   * @param datanodeDetails - DatanodeDetails.
    * @param nodeReport - node report.
    * @return SCMheartbeat response.
    * @throws IOException
    */
   @Override
   public List<SCMCommand> sendHeartbeat(
-      DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport) {
+      DatanodeDetails datanodeDetails, NodeReportProto nodeReport) {
 
-    Preconditions.checkNotNull(datanodeDetailsProto, "Heartbeat is missing " +
+    Preconditions.checkNotNull(datanodeDetails, "Heartbeat is missing " +
         "DatanodeDetails.");
-    DatanodeDetails datanodeDetails = DatanodeDetails
-        .getFromProtoBuf(datanodeDetailsProto);
-    // Checking for NULL to make sure that we don't get
-    // an exception from ConcurrentList.
-    // This could be a problem in tests, if this function is invoked via
-    // protobuf, transport layer will guarantee that this is not null.
-    if (datanodeDetails != null) {
-      heartbeatQueue.add(
-          new HeartbeatQueueItem.Builder()
-              .setDatanodeDetails(datanodeDetails)
-              .setNodeReport(nodeReport)
-              .build());
-      return commandQueue.getCommand(datanodeDetails.getUuid());
-    } else {
-      LOG.error("Datanode ID in heartbeat is null");
-    }
-    return null;
+    heartbeatQueue.add(
+        new HeartbeatQueueItem.Builder()
+            .setDatanodeDetails(datanodeDetails)
+            .setNodeReport(nodeReport)
+            .build());
+    return commandQueue.getCommand(datanodeDetails.getUuid());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
index fa423bb..6ea83df 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
@@ -23,7 +23,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+    StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -33,7 +33,11 @@ import org.slf4j.LoggerFactory;
 
 import javax.management.ObjectName;
 import java.io.IOException;
-import java.util.*;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -159,7 +163,7 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean {
   }
 
   public StorageReportResult processNodeReport(UUID datanodeID,
-      StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport)
+      StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReport)
       throws IOException {
     Preconditions.checkNotNull(datanodeID);
     Preconditions.checkNotNull(nodeReport);
@@ -170,9 +174,9 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean {
     Set<StorageLocationReport> storagReportSet = new HashSet<>();
     Set<StorageLocationReport> fullVolumeSet = new HashSet<>();
     Set<StorageLocationReport> failedVolumeSet = new HashSet<>();
-    List<SCMStorageReport>
+    List<StorageReportProto>
         storageReports = nodeReport.getStorageReportList();
-    for (SCMStorageReport report : storageReports) {
+    for (StorageReportProto report : storageReports) {
       StorageLocationReport storageReport =
           StorageLocationReport.getFromProtobuf(report);
       storagReportSet.add(storageReport);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 6e5b7de..1b1645d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -25,29 +25,47 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReregisterCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos
+    .ContainerBlocksDeletionACKResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos
+    .ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+
 
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
+    .Type.closeContainerCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
+    .Type.deleteBlocksCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
+    .Type.reregisterCommand;
 
-import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.versionCommand;
-import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.registeredCommand;
-import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.reregisterCommand;
-import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.deleteBlocksCommand;
-import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.closeContainerCommand;
 
 
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
@@ -150,96 +168,81 @@ public class SCMDatanodeProtocolServer implements
 
   @Override
   public SCMHeartbeatResponseProto sendHeartbeat(
-      HddsProtos.DatanodeDetailsProto datanodeDetails,
-      StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport)
+      SCMHeartbeatRequestProto heartbeat)
       throws IOException {
+    // TODO: Add a heartbeat dispatcher.
+    DatanodeDetails datanodeDetails = DatanodeDetails
+        .getFromProtoBuf(heartbeat.getDatanodeDetails());
+    NodeReportProto nodeReport = heartbeat.getNodeReport();
     List<SCMCommand> commands =
         scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport);
-    List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
+    List<SCMCommandProto> cmdResponses = new LinkedList<>();
     for (SCMCommand cmd : commands) {
-      cmdResponses.add(getCommandResponse(cmd, datanodeDetails.getUuid()));
+      cmdResponses.add(getCommandResponse(cmd));
     }
     return SCMHeartbeatResponseProto.newBuilder()
+        .setDatanodeUUID(datanodeDetails.getUuidString())
         .addAllCommands(cmdResponses).build();
   }
 
   @Override
-  public SCMRegisteredCmdResponseProto register(
-      HddsProtos.DatanodeDetailsProto datanodeDetails, SCMNodeReport nodeReport,
-      ContainerReportsRequestProto containerReportsRequestProto)
+  public SCMRegisteredResponseProto register(
+      HddsProtos.DatanodeDetailsProto datanodeDetailsProto,
+      NodeReportProto nodeReport,
+      ContainerReportsProto containerReportsProto)
       throws IOException {
+    DatanodeDetails datanodeDetails = DatanodeDetails
+        .getFromProtoBuf(datanodeDetailsProto);
     // TODO : Return the list of Nodes that forms the SCM HA.
-    RegisteredCommand registeredCommand = (RegisteredCommand) scm
-        .getScmNodeManager().register(datanodeDetails, nodeReport);
-    SCMCmdType type = registeredCommand.getType();
-    if (type == SCMCmdType.registeredCommand && registeredCommand.getError()
-        == SCMRegisteredCmdResponseProto.ErrorCode.success) {
-      scm.getScmContainerManager().processContainerReports(
-          containerReportsRequestProto);
+    RegisteredCommand registeredCommand = scm.getScmNodeManager()
+        .register(datanodeDetails, nodeReport);
+    if (registeredCommand.getError()
+        == SCMRegisteredResponseProto.ErrorCode.success) {
+      scm.getScmContainerManager().processContainerReports(datanodeDetails,
+          containerReportsProto);
     }
     return getRegisteredResponse(registeredCommand);
   }
 
   @VisibleForTesting
-  public static SCMRegisteredCmdResponseProto getRegisteredResponse(
-        SCMCommand cmd) {
-    Preconditions.checkState(cmd.getClass() == RegisteredCommand.class);
-    RegisteredCommand rCmd = (RegisteredCommand) cmd;
-    SCMCmdType type = cmd.getType();
-    if (type != SCMCmdType.registeredCommand) {
-      throw new IllegalArgumentException(
-          "Registered command is not well " + "formed. Internal Error.");
-    }
-    return SCMRegisteredCmdResponseProto.newBuilder()
+  public static SCMRegisteredResponseProto getRegisteredResponse(
+      RegisteredCommand cmd) {
+    return SCMRegisteredResponseProto.newBuilder()
         // TODO : Fix this later when we have multiple SCM support.
         // .setAddressList(addressList)
-        .setErrorCode(rCmd.getError())
-        .setClusterID(rCmd.getClusterID())
-        .setDatanodeUUID(rCmd.getDatanodeUUID())
+        .setErrorCode(cmd.getError())
+        .setClusterID(cmd.getClusterID())
+        .setDatanodeUUID(cmd.getDatanodeUUID())
         .build();
   }
 
-  @Override
-  public ContainerReportsResponseProto sendContainerReport(
-      ContainerReportsRequestProto reports)
+  public void processContainerReports(DatanodeDetails datanodeDetails,
+                                      ContainerReportsProto reports)
       throws IOException {
-    updateContainerReportMetrics(reports);
-
+    updateContainerReportMetrics(datanodeDetails, reports);
     // should we process container reports async?
-    scm.getScmContainerManager().processContainerReports(reports);
-    return ContainerReportsResponseProto.newBuilder().build();
+    scm.getScmContainerManager()
+        .processContainerReports(datanodeDetails, reports);
   }
 
-  private void updateContainerReportMetrics(
-      ContainerReportsRequestProto reports) {
-    ContainerStat newStat = null;
-    // TODO: We should update the logic once incremental container report
-    // type is supported.
-    if (reports
-        .getType() == StorageContainerDatanodeProtocolProtos
-        .ContainerReportsRequestProto.reportType.fullReport) {
-      newStat = new ContainerStat();
-      for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
-          .getReportsList()) {
-        newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
-            info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
-            info.getReadCount(), info.getWriteCount()));
-      }
-
-      // update container metrics
-      StorageContainerManager.getMetrics().setLastContainerStat(newStat);
+  private void updateContainerReportMetrics(DatanodeDetails datanodeDetails,
+                                            ContainerReportsProto reports) {
+    ContainerStat newStat = new ContainerStat();
+    for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
+        .getReportsList()) {
+      newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
+          info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
+          info.getReadCount(), info.getWriteCount()));
     }
+    // update container metrics
+    StorageContainerManager.getMetrics().setLastContainerStat(newStat);
 
     // Update container stat entry, this will trigger a removal operation if it
     // exists in cache.
-    synchronized (scm.getContainerReportCache()) {
-      String datanodeUuid = reports.getDatanodeDetails().getUuid();
-      if (datanodeUuid != null && newStat != null) {
-        scm.getContainerReportCache().put(datanodeUuid, newStat);
-        // update global view container metrics
-        StorageContainerManager.getMetrics().incrContainerStat(newStat);
-      }
-    }
+    String datanodeUuid = datanodeDetails.getUuidString();
+    scm.getContainerReportCache().put(datanodeUuid, newStat);
+    // update global view container metrics
+    StorageContainerManager.getMetrics().incrContainerStat(newStat);
   }
 
 
@@ -298,28 +301,15 @@ public class SCMDatanodeProtocolServer implements
    * @throws IOException
    */
   @VisibleForTesting
-  public StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
-      getCommandResponse(
-      SCMCommand cmd, final String datanodeID) throws IOException {
-    SCMCmdType type = cmd.getType();
-    SCMCommandResponseProto.Builder builder =
-        SCMCommandResponseProto.newBuilder().setDatanodeUUID(datanodeID);
-    switch (type) {
-    case registeredCommand:
-      return builder
-          .setCmdType(registeredCommand)
-          .setRegisteredProto(SCMRegisteredCmdResponseProto
-              .getDefaultInstance())
-          .build();
-    case versionCommand:
-      return builder
-          .setCmdType(versionCommand)
-          .setVersionProto(SCMVersionResponseProto.getDefaultInstance())
-          .build();
+  public SCMCommandProto getCommandResponse(SCMCommand cmd)
+      throws IOException {
+    SCMCommandProto.Builder builder =
+        SCMCommandProto.newBuilder();
+    switch (cmd.getType()) {
     case reregisterCommand:
       return builder
-          .setCmdType(reregisterCommand)
-          .setReregisterProto(SCMReregisterCmdResponseProto
+          .setCommandType(reregisterCommand)
+          .setReregisterCommandProto(ReregisterCommandProto
               .getDefaultInstance())
           .build();
     case deleteBlocksCommand:
@@ -335,13 +325,14 @@ public class SCMDatanodeProtocolServer implements
               .collect(Collectors.toList());
       scm.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
       return builder
-          .setCmdType(deleteBlocksCommand)
-          .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto())
+          .setCommandType(deleteBlocksCommand)
+          .setDeleteBlocksCommandProto(((DeleteBlocksCommand) cmd).getProto())
           .build();
     case closeContainerCommand:
       return builder
-          .setCmdType(closeContainerCommand)
-          .setCloseContainerProto(((CloseContainerCommand) cmd).getProto())
+          .setCommandType(closeContainerCommand)
+          .setCloseContainerCommandProto(
+              ((CloseContainerCommand) cmd).getProto())
           .build();
     default:
       throw new IllegalArgumentException("Not implemented");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 5cf0a92..b8036d7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -18,9 +18,9 @@ package org.apache.hadoop.hdds.scm;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol
-    .proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+        .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.StorageTypeProto;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
@@ -53,16 +53,17 @@ public final class TestUtils {
   public static DatanodeDetails getDatanodeDetails(SCMNodeManager nodeManager,
       String uuid) {
     DatanodeDetails datanodeDetails = getDatanodeDetails(uuid);
-    nodeManager.register(datanodeDetails.getProtoBufMessage(), null);
+    nodeManager.register(datanodeDetails, null);
     return datanodeDetails;
   }
 
   /**
    * Create Node Report object.
-   * @return SCMNodeReport
+   * @return NodeReportProto
    */
-  public static SCMNodeReport createNodeReport(List<SCMStorageReport> reports) {
-    SCMNodeReport.Builder nodeReport = SCMNodeReport.newBuilder();
+  public static NodeReportProto createNodeReport(
+      List<StorageReportProto> reports) {
+    NodeReportProto.Builder nodeReport = NodeReportProto.newBuilder();
     nodeReport.addAllStorageReport(reports);
     return nodeReport.build();
   }
@@ -71,14 +72,14 @@ public final class TestUtils {
    * Create SCM Storage Report object.
    * @return list of SCMStorageReport
    */
-  public static List<SCMStorageReport> createStorageReport(long capacity,
+  public static List<StorageReportProto> createStorageReport(long capacity,
       long used, long remaining, String path, StorageTypeProto type, String id,
       int count) {
-    List<SCMStorageReport> reportList = new ArrayList<>();
+    List<StorageReportProto> reportList = new ArrayList<>();
     for (int i = 0; i < count; i++) {
       Preconditions.checkNotNull(path);
       Preconditions.checkNotNull(id);
-      SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+      StorageReportProto.Builder srb = StorageReportProto.newBuilder();
       srb.setStorageUuid(id).setStorageLocation(path).setCapacity(capacity)
           .setScmUsed(used).setRemaining(remaining);
       StorageTypeProto storageTypeProto =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index a46d7ba..8c59462 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -24,13 +24,14 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+    .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.assertj.core.util.Preconditions;
 import org.mockito.Mockito;
@@ -370,13 +371,13 @@ public class MockNodeManager implements NodeManager {
    * Register the node if the node finds that it is not registered with any
    * SCM.
    *
-   * @param datanodeDetails DatanodeDetailsProto
-   * @param nodeReport SCMNodeReport
+   * @param datanodeDetails DatanodeDetails
+   * @param nodeReport NodeReportProto
    * @return SCMHeartbeatResponseProto
    */
   @Override
-  public SCMCommand register(HddsProtos.DatanodeDetailsProto datanodeDetails,
-                             SCMNodeReport nodeReport) {
+  public RegisteredCommand register(DatanodeDetails datanodeDetails,
+      NodeReportProto nodeReport) {
     return null;
   }
 
@@ -388,9 +389,8 @@ public class MockNodeManager implements NodeManager {
    * @return SCMheartbeat response list
    */
   @Override
-  public List<SCMCommand> sendHeartbeat(
-      HddsProtos.DatanodeDetailsProto datanodeDetails,
-      SCMNodeReport nodeReport) {
+  public List<SCMCommand> sendHeartbeat(DatanodeDetails datanodeDetails,
+      NodeReportProto nodeReport) {
     if ((datanodeDetails != null) && (nodeReport != null) && (nodeReport
         .getStorageReportCount() > 0)) {
       SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid());
@@ -398,8 +398,9 @@ public class MockNodeManager implements NodeManager {
       long totalCapacity = 0L;
       long totalRemaining = 0L;
       long totalScmUsed = 0L;
-      List<SCMStorageReport> storageReports = nodeReport.getStorageReportList();
-      for (SCMStorageReport report : storageReports) {
+      List<StorageReportProto> storageReports = nodeReport
+          .getStorageReportList();
+      for (StorageReportProto report : storageReports) {
         totalCapacity += report.getCapacity();
         totalRemaining += report.getRemaining();
         totalScmUsed += report.getScmUsed();
@@ -407,8 +408,7 @@ public class MockNodeManager implements NodeManager {
       aggregateStat.subtract(stat);
       stat.set(totalCapacity, totalScmUsed, totalRemaining);
       aggregateStat.add(stat);
-      nodeMetricMap.put(DatanodeDetails
-          .getFromProtoBuf(datanodeDetails).getUuid(), stat);
+      nodeMetricMap.put(datanodeDetails.getUuid(), stat);
 
     }
     return null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
index f318316..ba2ab64 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -191,8 +191,6 @@ public class TestContainerMapping {
   public void testFullContainerReport() throws IOException {
     ContainerInfo info = createContainer();
     DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
-    ContainerReportsRequestProto.reportType reportType =
-        ContainerReportsRequestProto.reportType.fullReport;
     List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports =
         new ArrayList<>();
     StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
@@ -209,12 +207,11 @@ public class TestContainerMapping {
 
     reports.add(ciBuilder.build());
 
-    ContainerReportsRequestProto.Builder crBuilder =
-        ContainerReportsRequestProto.newBuilder();
-    crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
-        .setType(reportType).addAllReports(reports);
+    ContainerReportsProto.Builder crBuilder = ContainerReportsProto
+        .newBuilder();
+    crBuilder.addAllReports(reports);
 
-    mapping.processContainerReports(crBuilder.build());
+    mapping.processContainerReports(datanodeDetails, crBuilder.build());
 
     ContainerInfo updatedContainer =
         mapping.getContainer(info.getContainerID());
@@ -227,8 +224,6 @@ public class TestContainerMapping {
   public void testContainerCloseWithContainerReport() throws IOException {
     ContainerInfo info = createContainer();
     DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
-    ContainerReportsRequestProto.reportType reportType =
-        ContainerReportsRequestProto.reportType.fullReport;
     List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports =
         new ArrayList<>();
 
@@ -246,12 +241,11 @@ public class TestContainerMapping {
 
     reports.add(ciBuilder.build());
 
-    ContainerReportsRequestProto.Builder crBuilder =
-        ContainerReportsRequestProto.newBuilder();
-    crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
-        .setType(reportType).addAllReports(reports);
+    ContainerReportsProto.Builder crBuilder =
+        ContainerReportsProto.newBuilder();
+    crBuilder.addAllReports(reports);
 
-    mapping.processContainerReports(crBuilder.build());
+    mapping.processContainerReports(datanodeDetails, crBuilder.build());
 
     ContainerInfo updatedContainer =
         mapping.getContainer(info.getContainerID());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
index 15ecbad..0a3efda 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -199,9 +199,8 @@ public class TestContainerCloser {
 
   private void sendContainerReport(ContainerInfo info, long used) throws
       IOException {
-    ContainerReportsRequestProto.Builder
-        reports =  ContainerReportsRequestProto.newBuilder();
-    reports.setType(ContainerReportsRequestProto.reportType.fullReport);
+    ContainerReportsProto.Builder
+        reports =  ContainerReportsProto.newBuilder();
 
     StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
         StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
@@ -214,9 +213,8 @@ public class TestContainerCloser {
         .setWriteCount(100000000L)
         .setReadBytes(2000000000L)
         .setWriteBytes(2000000000L);
-    reports.setDatanodeDetails(
-        TestUtils.getDatanodeDetails().getProtoBufMessage());
     reports.addReports(ciBuilder);
-    mapping.processContainerReports(reports.build());
+    mapping.processContainerReports(TestUtils.getDatanodeDetails(),
+        reports.build());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 09b6cd1..5ad28f6 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+    .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -133,9 +133,9 @@ public class TestContainerPlacement {
       for (DatanodeDetails datanodeDetails : datanodes) {
         String id = UUID.randomUUID().toString();
         String path = testDir.getAbsolutePath() + "/" + id;
-        List<SCMStorageReport> reports = TestUtils
+        List<StorageReportProto> reports = TestUtils
             .createStorageReport(capacity, used, remaining, path, null, id, 1);
-        nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+        nodeManager.sendHeartbeat(datanodeDetails,
             TestUtils.createNodeReport(reports));
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
index de87e50..2b04d6b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.node;
 
 import com.google.common.base.Supplier;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
@@ -26,7 +28,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+    .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -63,8 +65,6 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
     .HEALTHY;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.core.StringStartsWith.startsWith;
 import static org.junit.Assert.assertEquals;
@@ -144,7 +144,7 @@ public class TestNodeManager {
       for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) {
         DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
             nodeManager);
-        nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+        nodeManager.sendHeartbeat(datanodeDetails,
             null);
       }
 
@@ -191,8 +191,8 @@ public class TestNodeManager {
 
       // Need 100 nodes to come out of chill mode, only one node is sending HB.
       nodeManager.setMinimumChillModeNodes(100);
-      nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager)
-          .getProtoBufMessage(), null);
+      nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager),
+          null);
       GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
           100, 4 * 1000);
       assertFalse("Not enough heartbeat, Node manager should have" +
@@ -219,7 +219,7 @@ public class TestNodeManager {
 
       // Send 10 heartbeat from same node, and assert we never leave chill mode.
       for (int x = 0; x < 10; x++) {
-        nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+        nodeManager.sendHeartbeat(datanodeDetails,
             null);
       }
 
@@ -250,7 +250,7 @@ public class TestNodeManager {
     nodeManager.close();
 
     // These should never be processed.
-    nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+    nodeManager.sendHeartbeat(datanodeDetails,
         null);
 
     // Let us just wait for 2 seconds to prove that HBs are not processed.
@@ -274,13 +274,13 @@ public class TestNodeManager {
     DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
     String dnId = datanodeDetails.getUuidString();
     String storagePath = testDir.getAbsolutePath() + "/" + dnId;
-    List<SCMStorageReport> reports =
+    List<StorageReportProto> reports =
         TestUtils.createStorageReport(100, 10, 90, storagePath, null, dnId, 1);
     try (SCMNodeManager nodemanager = createNodeManager(conf)) {
-      nodemanager.register(datanodeDetails.getProtoBufMessage(),
+      nodemanager.register(datanodeDetails,
           TestUtils.createNodeReport(reports));
       List<SCMCommand> command = nodemanager.sendHeartbeat(
-          datanodeDetails.getProtoBufMessage(), null);
+          datanodeDetails, null);
       Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails));
       Assert.assertTrue("On regular HB calls, SCM responses a "
           + "datanode with an empty command list", command.isEmpty());
@@ -298,10 +298,10 @@ public class TestNodeManager {
         GenericTestUtils.waitFor(new Supplier<Boolean>() {
           @Override public Boolean get() {
             List<SCMCommand> command =
-                nodemanager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+                nodemanager.sendHeartbeat(datanodeDetails,
                     null);
             return command.size() == 1 && command.get(0).getType()
-                .equals(SCMCmdType.reregisterCommand);
+                .equals(SCMCommandProto.Type.reregisterCommand);
           }
         }, 100, 3 * 1000);
       } catch (TimeoutException e) {
@@ -330,7 +330,7 @@ public class TestNodeManager {
       for (int x = 0; x < count; x++) {
         DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
             nodeManager);
-        nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+        nodeManager.sendHeartbeat(datanodeDetails,
             null);
       }
       GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
@@ -422,19 +422,19 @@ public class TestNodeManager {
       DatanodeDetails staleNode = TestUtils.getDatanodeDetails(nodeManager);
 
       // Heartbeat once
-      nodeManager.sendHeartbeat(staleNode.getProtoBufMessage(),
+      nodeManager.sendHeartbeat(staleNode,
           null);
 
       // Heartbeat all other nodes.
       for (DatanodeDetails dn : nodeList) {
-        nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
+        nodeManager.sendHeartbeat(dn, null);
       }
 
       // Wait for 2 seconds .. and heartbeat good nodes again.
       Thread.sleep(2 * 1000);
 
       for (DatanodeDetails dn : nodeList) {
-        nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
+        nodeManager.sendHeartbeat(dn, null);
       }
 
       // Wait for 2 seconds, wait a total of 4 seconds to make sure that the
@@ -451,7 +451,7 @@ public class TestNodeManager {
 
       // heartbeat good nodes again.
       for (DatanodeDetails dn : nodeList) {
-        nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
+        nodeManager.sendHeartbeat(dn, null);
       }
 
       //  6 seconds is the dead window for this test , so we wait a total of
@@ -565,11 +565,11 @@ public class TestNodeManager {
       DatanodeDetails deadNode =
           TestUtils.getDatanodeDetails(nodeManager);
       nodeManager.sendHeartbeat(
-          healthyNode.getProtoBufMessage(), null);
+          healthyNode, null);
       nodeManager.sendHeartbeat(
-          staleNode.getProtoBufMessage(), null);
+          staleNode, null);
       nodeManager.sendHeartbeat(
-          deadNode.getProtoBufMessage(), null);
+          deadNode, null);
 
       // Sleep so that heartbeat processing thread gets to run.
       Thread.sleep(500);
@@ -596,15 +596,15 @@ public class TestNodeManager {
        */
 
       nodeManager.sendHeartbeat(
-          healthyNode.getProtoBufMessage(), null);
+          healthyNode, null);
       nodeManager.sendHeartbeat(
-          staleNode.getProtoBufMessage(), null);
+          staleNode, null);
       nodeManager.sendHeartbeat(
-          deadNode.getProtoBufMessage(), null);
+          deadNode, null);
 
       Thread.sleep(1500);
       nodeManager.sendHeartbeat(
-          healthyNode.getProtoBufMessage(), null);
+          healthyNode, null);
       Thread.sleep(2 * 1000);
       assertEquals(1, nodeManager.getNodeCount(HEALTHY));
 
@@ -625,12 +625,12 @@ public class TestNodeManager {
        */
 
       nodeManager.sendHeartbeat(
-          healthyNode.getProtoBufMessage(), null);
+          healthyNode, null);
       nodeManager.sendHeartbeat(
-          staleNode.getProtoBufMessage(), null);
+          staleNode, null);
       Thread.sleep(1500);
       nodeManager.sendHeartbeat(
-          healthyNode.getProtoBufMessage(), null);
+          healthyNode, null);
       Thread.sleep(2 * 1000);
 
       // 3.5 seconds have elapsed for stale node, so it moves into Stale.
@@ -664,11 +664,11 @@ public class TestNodeManager {
        * back all the nodes in healthy state.
        */
       nodeManager.sendHeartbeat(
-          healthyNode.getProtoBufMessage(), null);
+          healthyNode, null);
       nodeManager.sendHeartbeat(
-          staleNode.getProtoBufMessage(), null);
+          staleNode, null);
       nodeManager.sendHeartbeat(
-          deadNode.getProtoBufMessage(), null);
+          deadNode, null);
       Thread.sleep(500);
       //Assert all nodes are healthy.
       assertEquals(3, nodeManager.getAllNodes().size());
@@ -689,7 +689,7 @@ public class TestNodeManager {
                                 int sleepDuration) throws InterruptedException {
     while (!Thread.currentThread().isInterrupted()) {
       for (DatanodeDetails dn : list) {
-        manager.sendHeartbeat(dn.getProtoBufMessage(), null);
+        manager.sendHeartbeat(dn, null);
       }
       Thread.sleep(sleepDuration);
     }
@@ -775,7 +775,7 @@ public class TestNodeManager {
       // No Thread just one time HBs the node manager, so that these will be
       // marked as dead nodes eventually.
       for (DatanodeDetails dn : deadNodeList) {
-        nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
+        nodeManager.sendHeartbeat(dn, null);
       }
 
 
@@ -940,7 +940,7 @@ public class TestNodeManager {
       DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
           nodeManager);
       nodeManager.sendHeartbeat(
-          datanodeDetails.getProtoBufMessage(), null);
+          datanodeDetails, null);
       String status = nodeManager.getChillModeStatus();
       Assert.assertThat(status, containsString("Still in chill " +
           "mode, waiting on nodes to report in."));
@@ -967,8 +967,7 @@ public class TestNodeManager {
       // Assert that node manager force enter cannot be overridden by nodes HBs.
       for (int x = 0; x < 20; x++) {
         DatanodeDetails datanode = TestUtils.getDatanodeDetails(nodeManager);
-        nodeManager.sendHeartbeat(datanode.getProtoBufMessage(),
-            null);
+        nodeManager.sendHeartbeat(datanode, null);
       }
 
       Thread.sleep(500);
@@ -1009,10 +1008,10 @@ public class TestNodeManager {
         String dnId = datanodeDetails.getUuidString();
         long free = capacity - used;
         String storagePath = testDir.getAbsolutePath() + "/" + dnId;
-        List<SCMStorageReport> reports = TestUtils
+        List<StorageReportProto> reports = TestUtils
             .createStorageReport(capacity, used, free, storagePath,
                 null, dnId, 1);
-        nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+        nodeManager.sendHeartbeat(datanodeDetails,
             TestUtils.createNodeReport(reports));
       }
       GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
@@ -1058,11 +1057,11 @@ public class TestNodeManager {
         long scmUsed = x * usedPerHeartbeat;
         long remaining = capacity - scmUsed;
         String storagePath = testDir.getAbsolutePath() + "/" + dnId;
-        List<SCMStorageReport> reports = TestUtils
+        List<StorageReportProto> reports = TestUtils
             .createStorageReport(capacity, scmUsed, remaining, storagePath,
                 null, dnId, 1);
 
-        nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+        nodeManager.sendHeartbeat(datanodeDetails,
             TestUtils.createNodeReport(reports));
         Thread.sleep(100);
       }
@@ -1140,10 +1139,10 @@ public class TestNodeManager {
 
       // Send a new report to bring the dead node back to healthy
       String storagePath = testDir.getAbsolutePath() + "/" + dnId;
-      List<SCMStorageReport> reports = TestUtils
+      List<StorageReportProto> reports = TestUtils
           .createStorageReport(capacity, expectedScmUsed, expectedRemaining,
               storagePath, null, dnId, 1);
-      nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+      nodeManager.sendHeartbeat(datanodeDetails,
           TestUtils.createNodeReport(reports));
 
       // Wait up to 5 seconds so that the dead node becomes healthy

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java
index b824412..072dee7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java
@@ -21,9 +21,9 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+    StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+    StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -134,7 +134,7 @@ public class TestSCMNodeStorageStatMap {
   @Test
   public void testProcessNodeReportCheckOneNode() throws IOException {
     UUID key = getFirstKey();
-    List<SCMStorageReport> reportList = new ArrayList<>();
+    List<StorageReportProto> reportList = new ArrayList<>();
     Set<StorageLocationReport> reportSet = testData.get(key);
     SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf);
     map.insertNewDatanode(key, reportSet);
@@ -146,16 +146,16 @@ public class TestSCMNodeStorageStatMap {
     long reportCapacity = report.getCapacity();
     long reportScmUsed = report.getScmUsed();
     long reportRemaining = report.getRemaining();
-    List<SCMStorageReport> reports = TestUtils
+    List<StorageReportProto> reports = TestUtils
         .createStorageReport(reportCapacity, reportScmUsed, reportRemaining,
             path, null, storageId, 1);
     StorageReportResult result =
         map.processNodeReport(key, TestUtils.createNodeReport(reports));
     Assert.assertEquals(result.getStatus(),
         SCMNodeStorageStatMap.ReportStatus.ALL_IS_WELL);
-    StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb =
-        SCMNodeReport.newBuilder();
-    SCMStorageReport srb = reportSet.iterator().next().getProtoBufMessage();
+    StorageContainerDatanodeProtocolProtos.NodeReportProto.Builder nrb =
+        NodeReportProto.newBuilder();
+    StorageReportProto srb = reportSet.iterator().next().getProtoBufMessage();
     reportList.add(srb);
     result = map.processNodeReport(key, TestUtils.createNodeReport(reportList));
     Assert.assertEquals(result.getStatus(),
@@ -168,7 +168,7 @@ public class TestSCMNodeStorageStatMap {
     Assert.assertEquals(result.getStatus(),
         SCMNodeStorageStatMap.ReportStatus.STORAGE_OUT_OF_SPACE);
     // Mark a disk failed 
-    SCMStorageReport srb2 = SCMStorageReport.newBuilder()
+    StorageReportProto srb2 = StorageReportProto.newBuilder()
         .setStorageUuid(UUID.randomUUID().toString())
         .setStorageLocation(srb.getStorageLocation()).setScmUsed(reportCapacity)
         .setCapacity(reportCapacity).setRemaining(0).setFailed(true).build();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 1d92cdc..34779da 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -20,22 +20,21 @@ import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.VersionInfo;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+    .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.ipc.RPC;
@@ -200,7 +199,7 @@ public class TestEndPoint {
     DatanodeDetails nodeToRegister = getDatanodeDetails();
     try (EndpointStateMachine rpcEndPoint = createEndpoint(
         SCMTestUtils.getConf(), serverAddress, 1000)) {
-      SCMRegisteredCmdResponseProto responseProto = rpcEndPoint.getEndPoint()
+      SCMRegisteredResponseProto responseProto = rpcEndPoint.getEndPoint()
           .register(nodeToRegister.getProtoBufMessage(), TestUtils
                   .createNodeReport(
                       getStorageReports(nodeToRegister.getUuidString())),
@@ -215,7 +214,7 @@ public class TestEndPoint {
     }
   }
 
-  private List<SCMStorageReport> getStorageReports(String id) {
+  private List<StorageReportProto> getStorageReports(String id) {
     String storagePath = testDir.getAbsolutePath() + "/" + id;
     return TestUtils.createStorageReport(100, 10, 90, storagePath, null, id, 1);
   }
@@ -293,9 +292,14 @@ public class TestEndPoint {
              createEndpoint(SCMTestUtils.getConf(),
                  serverAddress, 1000)) {
       String storageId = UUID.randomUUID().toString();
+      SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder()
+          .setDatanodeDetails(dataNode.getProtoBufMessage())
+          .setNodeReport(TestUtils.createNodeReport(
+              getStorageReports(storageId)))
+          .build();
+
       SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
-          .sendHeartbeat(dataNode.getProtoBufMessage(),
-              TestUtils.createNodeReport(getStorageReports(storageId)));
+          .sendHeartbeat(request);
       Assert.assertNotNull(responseProto);
       Assert.assertEquals(0, responseProto.getCommandsCount());
     }
@@ -361,86 +365,11 @@ public class TestEndPoint {
         lessThanOrEqualTo(rpcTimeout + tolerance));
   }
 
-  /**
-   * Returns a new container report.
-   * @return
-   */
-  ContainerReport getRandomContainerReport() {
-    return new ContainerReport(RandomUtils.nextLong(),
-        DigestUtils.sha256Hex("Random"));
-  }
-
-  /**
-   * Creates dummy container reports.
-   * @param count - The number of closed containers to create.
-   * @return ContainerReportsProto
-   */
-  StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto
-      createDummyContainerReports(int count) {
-    StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder
-        reportsBuilder = StorageContainerDatanodeProtocolProtos
-        .ContainerReportsRequestProto.newBuilder();
-    for (int x = 0; x < count; x++) {
-      reportsBuilder.addReports(getRandomContainerReport()
-          .getProtoBufMessage());
-    }
-    reportsBuilder.setDatanodeDetails(getDatanodeDetails()
-        .getProtoBufMessage());
-    reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
-        .ContainerReportsRequestProto.reportType.fullReport);
-    return reportsBuilder.build();
-  }
-
-  /**
-   * Tests that rpcEndpoint sendContainerReport works as expected.
-   * @throws Exception
-   */
-  @Test
-  public void testContainerReportSend() throws Exception {
-    final int count = 1000;
-    scmServerImpl.reset();
-    try (EndpointStateMachine rpcEndPoint =
-             createEndpoint(SCMTestUtils.getConf(),
-                 serverAddress, 1000)) {
-      ContainerReportsResponseProto responseProto = rpcEndPoint
-          .getEndPoint().sendContainerReport(createDummyContainerReports(
-              count));
-      Assert.assertNotNull(responseProto);
-    }
-    Assert.assertEquals(1, scmServerImpl.getContainerReportsCount());
-    Assert.assertEquals(count, scmServerImpl.getContainerCount());
-  }
-
-
-  /**
-   * Tests that rpcEndpoint sendContainerReport works as expected.
-   * @throws Exception
-   */
-  @Test
-  public void testContainerReport() throws Exception {
-    final int count = 1000;
-    scmServerImpl.reset();
-    try (EndpointStateMachine rpcEndPoint =
-             createEndpoint(SCMTestUtils.getConf(),
-                 serverAddress, 1000)) {
-      ContainerReportsResponseProto responseProto = rpcEndPoint
-          .getEndPoint().sendContainerReport(createContainerReport(count,
-              null));
-      Assert.assertNotNull(responseProto);
-    }
-    Assert.assertEquals(1, scmServerImpl.getContainerReportsCount());
-    Assert.assertEquals(count, scmServerImpl.getContainerCount());
-    final long expectedKeyCount = count * 1000;
-    Assert.assertEquals(expectedKeyCount, scmServerImpl.getKeyCount());
-    final long expectedBytesUsed = count * OzoneConsts.GB * 2;
-    Assert.assertEquals(expectedBytesUsed, scmServerImpl.getBytesUsed());
-  }
-
-  private ContainerReportsRequestProto createContainerReport(
+  private ContainerReportsProto createContainerReport(
       int count, DatanodeDetails datanodeDetails) {
-    StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder
+    StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder
         reportsBuilder = StorageContainerDatanodeProtocolProtos
-        .ContainerReportsRequestProto.newBuilder();
+        .ContainerReportsProto.newBuilder();
     for (int x = 0; x < count; x++) {
       long containerID = RandomUtils.nextLong();
       ContainerReport report = new ContainerReport(containerID,
@@ -455,14 +384,6 @@ public class TestEndPoint {
 
       reportsBuilder.addReports(report.getProtoBufMessage());
     }
-    if(datanodeDetails == null) {
-      reportsBuilder.setDatanodeDetails(getDatanodeDetails()
-          .getProtoBufMessage());
-    } else {
-      reportsBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage());
-    }
-    reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
-        .ContainerReportsRequestProto.reportType.fullReport);
     return reportsBuilder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java
deleted file mode 100644
index e197886..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.container.replication.ContainerSupervisor;
-import org.apache.hadoop.hdds.scm.container.replication.InProgressPool;
-import org.apache.hadoop.hdds.scm.node.CommandQueue;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.NodePoolManager;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.ozone.container.common.SCMTestUtils;
-import org.apache.hadoop.ozone.container.testutils
-    .ReplicationDatanodeStateManager;
-import org.apache.hadoop.ozone.container.testutils.ReplicationNodeManagerMock;
-import org.apache.hadoop.ozone.container.testutils
-    .ReplicationNodePoolManagerMock;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.event.Level;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
-    .HEALTHY;
-import static org.apache.ratis.shaded.com.google.common.util.concurrent
-    .Uninterruptibles.sleepUninterruptibly;
-
-/**
- * Tests for the container manager.
- */
-public class TestContainerSupervisor {
-  final static String POOL_NAME_TEMPLATE = "Pool%d";
-  static final int MAX_DATANODES = 72;
-  static final int POOL_SIZE = 24;
-  static final int POOL_COUNT = 3;
-  private LogCapturer logCapturer = LogCapturer.captureLogs(
-      LogFactory.getLog(ContainerSupervisor.class));
-  private List<DatanodeDetails> datanodes = new LinkedList<>();
-  private NodeManager nodeManager;
-  private NodePoolManager poolManager;
-  private CommandQueue commandQueue;
-  private ContainerSupervisor containerSupervisor;
-  private ReplicationDatanodeStateManager datanodeStateManager;
-
-  @After
-  public void tearDown() throws Exception {
-    logCapturer.stopCapturing();
-    GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.INFO);
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.DEBUG);
-    Map<DatanodeDetails, NodeState> nodeStateMap = new HashMap<>();
-    // We are setting up 3 pools with 24 nodes each in this cluster.
-    // First we create 72 Datanodes.
-    for (int x = 0; x < MAX_DATANODES; x++) {
-      DatanodeDetails datanode = TestUtils.getDatanodeDetails();
-      datanodes.add(datanode);
-      nodeStateMap.put(datanode, HEALTHY);
-    }
-
-    commandQueue = new CommandQueue();
-
-    // All nodes in this cluster are healthy for time being.
-    nodeManager = new ReplicationNodeManagerMock(nodeStateMap, commandQueue);
-    poolManager = new ReplicationNodePoolManagerMock();
-
-
-    Assert.assertEquals("Max datanodes should be equal to POOL_SIZE * " +
-        "POOL_COUNT", POOL_COUNT * POOL_SIZE, MAX_DATANODES);
-
-    // Start from 1 instead of zero so we can multiply and get the node index.
-    for (int y = 1; y <= POOL_COUNT; y++) {
-      String poolName = String.format(POOL_NAME_TEMPLATE, y);
-      for (int z = 0; z < POOL_SIZE; z++) {
-        DatanodeDetails id = datanodes.get(y * z);
-        poolManager.addNode(poolName, id);
-      }
-    }
-    OzoneConfiguration config = SCMTestUtils.getOzoneConf();
-    config.setTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, 2,
-        TimeUnit.SECONDS);
-    config.setTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL, 1,
-        TimeUnit.SECONDS);
-    containerSupervisor = new ContainerSupervisor(config,
-        nodeManager, poolManager);
-    datanodeStateManager = new ReplicationDatanodeStateManager(nodeManager,
-        poolManager);
-    // Sleep for one second to make sure all threads get time to run.
-    sleepUninterruptibly(1, TimeUnit.SECONDS);
-  }
-
-  @Test
-  /**
-   * Asserts that at least one pool is picked up for processing.
-   */
-  public void testAssertPoolsAreProcessed() {
-    // This asserts that replication manager has started processing at least
-    // one pool.
-    Assert.assertTrue(containerSupervisor.getInProgressPoolCount() > 0);
-
-    // Since all datanodes are flagged as healthy in this test, for each
-    // datanode we must have queued a command.
-    Assert.assertEquals("Commands are in queue :",
-        POOL_SIZE * containerSupervisor.getInProgressPoolCount(),
-        commandQueue.getCommandsInQueue());
-  }
-
-  @Test
-  /**
-   * This test sends container reports for 2 containers to a pool in progress.
-   * Asserts that we are able to find a container with single replica and do
-   * not find container with 3 replicas.
-   */
-  public void testDetectSingleContainerReplica() throws TimeoutException,
-      InterruptedException {
-    long singleNodeContainerID = 9001;
-    long threeNodeContainerID = 9003;
-    InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0);
-    // Only single datanode reporting that "SingleNodeContainer" exists.
-    List<ContainerReportsRequestProto> clist =
-        datanodeStateManager.getContainerReport(singleNodeContainerID,
-            ppool.getPool().getPoolName(), 1);
-    ppool.handleContainerReport(clist.get(0));
-
-    // Three nodes are going to report that ThreeNodeContainer  exists.
-    clist = datanodeStateManager.getContainerReport(threeNodeContainerID,
-        ppool.getPool().getPoolName(), 3);
-
-    for (ContainerReportsRequestProto reportsProto : clist) {
-      ppool.handleContainerReport(reportsProto);
-    }
-    GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() == 4,
-        200, 1000);
-    ppool.setDoneProcessing();
-
-    List<Map.Entry<Long, Integer>> containers = ppool.filterContainer(p -> p
-        .getValue() == 1);
-    Assert.assertEquals(singleNodeContainerID,
-        containers.get(0).getKey().longValue());
-    int count = containers.get(0).getValue();
-    Assert.assertEquals(1L, count);
-  }
-
-  @Test
-  /**
-   * We create three containers, Normal,OveReplicated and WayOverReplicated
-   * containers. This test asserts that we are able to find the
-   * over replicated containers.
-   */
-  public void testDetectOverReplica() throws TimeoutException,
-      InterruptedException {
-    long normalContainerID = 9000;
-    long overReplicatedContainerID = 9001;
-    long wayOverReplicatedContainerID = 9002;
-    InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0);
-
-    List<ContainerReportsRequestProto> clist =
-        datanodeStateManager.getContainerReport(normalContainerID,
-            ppool.getPool().getPoolName(), 3);
-    ppool.handleContainerReport(clist.get(0));
-
-    clist = datanodeStateManager.getContainerReport(overReplicatedContainerID,
-        ppool.getPool().getPoolName(), 4);
-
-    for (ContainerReportsRequestProto reportsProto : clist) {
-      ppool.handleContainerReport(reportsProto);
-    }
-
-    clist = datanodeStateManager.getContainerReport(
-        wayOverReplicatedContainerID, ppool.getPool().getPoolName(), 7);
-
-    for (ContainerReportsRequestProto reportsProto : clist) {
-      ppool.handleContainerReport(reportsProto);
-    }
-
-    // We ignore container reports from the same datanodes.
-    // it is possible that these each of these containers get placed
-    // on same datanodes, so allowing for 4 duplicates in the set of 14.
-    GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() > 10,
-        200, 1000);
-    ppool.setDoneProcessing();
-
-    List<Map.Entry<Long, Integer>> containers = ppool.filterContainer(p -> p
-        .getValue() > 3);
-    Assert.assertEquals(2, containers.size());
-  }
-
-  @Test
-  /**
-   * This test verifies that all pools are picked up for replica processing.
-   *
-   */
-  public void testAllPoolsAreProcessed() throws TimeoutException,
-      InterruptedException {
-    // Verify that we saw all three pools being picked up for processing.
-    GenericTestUtils.waitFor(() -> containerSupervisor.getPoolProcessCount()
-        >= 3, 200, 15 * 1000);
-    Assert.assertTrue(logCapturer.getOutput().contains("Pool1") &&
-        logCapturer.getOutput().contains("Pool2") &&
-        logCapturer.getOutput().contains("Pool3"));
-  }
-
-  @Test
-  /**
-   * Adds a new pool and tests that we are able to pick up that new pool for
-   * processing as well as handle container reports for datanodes in that pool.
-   * @throws TimeoutException
-   * @throws InterruptedException
-   */
-  public void testAddingNewPoolWorks()
-      throws TimeoutException, InterruptedException, IOException {
-    LogCapturer inProgressLog = LogCapturer.captureLogs(
-        LogFactory.getLog(InProgressPool.class));
-    GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.DEBUG);
-    try {
-      DatanodeDetails id = TestUtils.getDatanodeDetails();
-      ((ReplicationNodeManagerMock) (nodeManager)).addNode(id, HEALTHY);
-      poolManager.addNode("PoolNew", id);
-      GenericTestUtils.waitFor(() ->
-              logCapturer.getOutput().contains("PoolNew"),
-          200, 15 * 1000);
-
-      long newContainerID = 7001;
-      // Assert that we are able to send a container report to this new
-      // pool and datanode.
-      List<ContainerReportsRequestProto> clist =
-          datanodeStateManager.getContainerReport(newContainerID,
-              "PoolNew", 1);
-      containerSupervisor.handleContainerReport(clist.get(0));
-      GenericTestUtils.waitFor(() ->
-          inProgressLog.getOutput()
-              .contains(Long.toString(newContainerID)) && inProgressLog
-              .getOutput().contains(id.getUuidString()),
-          200, 10 * 1000);
-    } finally {
-      inProgressLog.stopCapturing();
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org