Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2018/07/20 10:32:47 UTC

[15/50] hadoop git commit: HDDS-187. Command status publisher for datanode. Contributed by Ajay Kumar.

HDDS-187. Command status publisher for datanode.
Contributed by Ajay Kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f89e2659
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f89e2659
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f89e2659

Branch: refs/heads/HDFS-13572
Commit: f89e265905f39c8e51263a3946a8b8e6ab4ebad9
Parents: 87eeb26
Author: Anu Engineer <ae...@apache.org>
Authored: Thu Jul 12 21:34:32 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Thu Jul 12 21:35:12 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdds/HddsConfigKeys.java  |   8 +
 .../org/apache/hadoop/hdds/HddsIdFactory.java   |  53 ++++++
 .../common/src/main/resources/ozone-default.xml |   9 +
 .../apache/hadoop/utils/TestHddsIdFactory.java  |  77 +++++++++
 .../report/CommandStatusReportPublisher.java    |  71 ++++++++
 .../common/report/ReportPublisher.java          |   9 +
 .../common/report/ReportPublisherFactory.java   |   4 +
 .../statemachine/DatanodeStateMachine.java      |   2 +
 .../common/statemachine/StateContext.java       |  70 ++++++++
 .../CloseContainerCommandHandler.java           |   5 +-
 .../commandhandler/CommandHandler.java          |  11 ++
 .../DeleteBlocksCommandHandler.java             | 166 ++++++++++---------
 .../ReplicateContainerCommandHandler.java       |   7 +-
 .../commands/CloseContainerCommand.java         |  36 ++--
 .../ozone/protocol/commands/CommandStatus.java  | 141 ++++++++++++++++
 .../protocol/commands/DeleteBlocksCommand.java  |  13 +-
 .../commands/ReplicateContainerCommand.java     |  20 ++-
 .../protocol/commands/ReregisterCommand.java    |  10 ++
 .../ozone/protocol/commands/SCMCommand.java     |  19 +++
 .../StorageContainerDatanodeProtocol.proto      |  21 +++
 .../ozone/container/common/ScmTestMock.java     |  33 +++-
 .../common/report/TestReportPublisher.java      |  75 ++++++++-
 .../hadoop/hdds/scm/events/SCMEvents.java       |  57 ++++---
 .../server/SCMDatanodeHeartbeatDispatcher.java  |  23 ++-
 .../TestSCMDatanodeHeartbeatDispatcher.java     |  25 ++-
 .../ozone/container/common/TestEndPoint.java    | 111 ++++++++++++-
 26 files changed, 935 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index dec2c1c..8b449fb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -17,7 +17,15 @@
  */
 package org.apache.hadoop.hdds;
 
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+/**
+ * Config class for HDDS.
+ */
 public final class HddsConfigKeys {
   private HddsConfigKeys() {
   }
+  public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL =
+      "hdds.command.status.report.interval";
+  public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
+      ScmConfigKeys.OZONE_SCM_HEARBEAT_INTERVAL_DEFAULT;
 }
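
For reference, the new key resolves like any other Hadoop time-duration
setting. A minimal sketch of reading it (this mirrors the publisher code
later in this commit; the example class name is hypothetical):

  import java.util.concurrent.TimeUnit;
  import org.apache.hadoop.hdds.HddsConfigKeys;
  import org.apache.hadoop.hdds.conf.OzoneConfiguration;

  public final class CommandStatusIntervalExample {
    public static void main(String[] args) {
      OzoneConfiguration conf = new OzoneConfiguration();
      // Defaults to "30s"; getTimeDuration parses the ns/ms/s/m/h/d suffix.
      long intervalMs = conf.getTimeDuration(
          HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL,
          HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT,
          TimeUnit.MILLISECONDS);
      System.out.println("command status report interval: "
          + intervalMs + " ms");
    }
  }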

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsIdFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsIdFactory.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsIdFactory.java
new file mode 100644
index 0000000..b244b8c
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsIdFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * HDDS Id generator.
+ */
+public final class HddsIdFactory {
+  private HddsIdFactory() {
+  }
+
+  private static final AtomicLong LONG_COUNTER = new AtomicLong(
+      System.currentTimeMillis());
+
+  /**
+   * Returns an incrementing long. This class does not persist the
+   * counter's initial value, so ids generated after a restart may collide
+   * with previously generated ids.
+   *
+   * @return long
+   */
+  public static long getLongId() {
+    return LONG_COUNTER.incrementAndGet();
+  }
+
+  /**
+   * Returns a uuid.
+   *
+   * @return UUID.
+   */
+  public static UUID getUUId() {
+    return UUID.randomUUID();
+  }
+
+}
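
Illustrative usage (a sketch; note the caveat above: the counter is seeded
from wall-clock time and is not persisted, so long ids are only unique
within a single process lifetime, while the UUIDs are random):

  import org.apache.hadoop.hdds.HddsIdFactory;

  public final class IdFactoryExample {
    public static void main(String[] args) {
      long first = HddsIdFactory.getLongId();
      long second = HddsIdFactory.getLongId();
      // The counter is atomic, so ids increase monotonically in-process.
      assert second == first + 1;
      // UUIDs remain collision-safe across restarts, unlike the long ids.
      System.out.println(HddsIdFactory.getUUId());
    }
  }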

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index d5ce9e6..1b6fb33 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1061,4 +1061,13 @@
     </description>
   </property>
 
+  <property>
+    <name>hdds.command.status.report.interval</name>
+    <value>30s</value>
+    <tag>OZONE, DATANODE, MANAGEMENT</tag>
+    <description>Time interval at which the datanode sends the status of
+      commands executed since the last report. The unit can be specified
+      with a suffix (ns, ms, s, m, h, d).</description>
+  </property>
+
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestHddsIdFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestHddsIdFactory.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestHddsIdFactory.java
new file mode 100644
index 0000000..a341ccc
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestHddsIdFactory.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.hadoop.hdds.HddsIdFactory;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link HddsIdFactory}: generated ids must be unique.
+ */
+public class TestHddsIdFactory {
+
+  private static final Set<Long> ID_SET = ConcurrentHashMap.newKeySet();
+  private static final int IDS_PER_THREAD = 10000;
+  private static final int NUM_OF_THREADS = 5;
+
+  @After
+  public void cleanup() {
+    ID_SET.clear();
+  }
+
+  @Test
+  public void testGetLongId() throws Exception {
+
+    ExecutorService executor =
+        Executors.newFixedThreadPool(NUM_OF_THREADS);
+    List<Callable<Integer>> tasks = new ArrayList<>(NUM_OF_THREADS);
+    addTasks(tasks);
+    List<Future<Integer>> result = executor.invokeAll(tasks);
+    assertEquals(IDS_PER_THREAD * NUM_OF_THREADS, ID_SET.size());
+    for (Future<Integer> r : result) {
+      assertEquals(IDS_PER_THREAD, r.get().intValue());
+    }
+  }
+
+  private void addTasks(List<Callable<Integer>> tasks) {
+    for (int i = 0; i < NUM_OF_THREADS; i++) {
+      Callable<Integer> task = () -> {
+        for (int idNum = 0; idNum < IDS_PER_THREAD; idNum++) {
+          long var = HddsIdFactory.getLongId();
+          // Set.add is atomic, unlike a separate contains-then-add check,
+          // so duplicates cannot slip through between concurrent threads.
+          if (!ID_SET.add(var)) {
+            Assert.fail("Duplicate id found: " + var);
+          }
+        }
+        return IDS_PER_THREAD;
+      };
+      tasks.add(task);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
new file mode 100644
index 0000000..ca5174a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.report;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
+
+/**
+ * Publishes CommandStatusReport which will be sent to SCM as part of the
+ * heartbeat. A CommandStatusReport consists of the following information:
+ * - type       : type of command.
+ * - status     : status of command execution (PENDING, EXECUTED, FAILED).
+ * - cmdId      : command id.
+ * - msg        : optional message.
+ */
+public class CommandStatusReportPublisher extends
+    ReportPublisher<CommandStatusReportsProto> {
+
+  private long cmdStatusReportInterval = -1;
+
+  @Override
+  protected long getReportFrequency() {
+    if (cmdStatusReportInterval == -1) {
+      cmdStatusReportInterval = getConf().getTimeDuration(
+          HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL,
+          HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+    }
+    return cmdStatusReportInterval;
+  }
+
+  @Override
+  protected CommandStatusReportsProto getReport() {
+    Map<Long, CommandStatus> map = this.getContext()
+        .getCommandStatusMap();
+    Iterator<Long> iterator = map.keySet().iterator();
+    CommandStatusReportsProto.Builder builder = CommandStatusReportsProto
+        .newBuilder();
+
+    iterator.forEachRemaining(key -> {
+      CommandStatus cmdStatus = map.get(key);
+      builder.addCmdStatus(cmdStatus.getProtoBufMessage());
+      // If status is still pending then don't remove it from map as
+      // CommandHandler will change its status when it works on this command.
+      if (!cmdStatus.getStatus().equals(Status.PENDING)) {
+        map.remove(key);
+      }
+    });
+    return builder.build();
+  }
+}
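
A hedged sketch of how a consumer of the published report might walk it
(only the accessors generated from this patch's .proto additions are
assumed; the wrapper class is hypothetical):

  import org.apache.hadoop.hdds.protocol.proto
      .StorageContainerDatanodeProtocolProtos.CommandStatus;
  import org.apache.hadoop.hdds.protocol.proto
      .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;

  public final class CommandStatusReportLogger {
    static void logReport(CommandStatusReportsProto report) {
      // One entry per command tracked by the datanode's StateContext.
      for (CommandStatus s : report.getCmdStatusList()) {
        System.out.println(s.getType() + " cmdId=" + s.getCmdId()
            + " status=" + s.getStatus()
            + (s.hasMsg() ? " msg=" + s.getMsg() : ""));
      }
    }
  }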

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
index 4ff47a0..105f073 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
@@ -93,4 +93,13 @@ public abstract class ReportPublisher<T extends GeneratedMessage>
    */
   protected abstract T getReport();
 
+  /**
+   * Returns the {@link StateContext} of this report publisher.
+   *
+   * @return the StateContext.
+   */
+  protected StateContext getContext() {
+    return context;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
index dc246d9..ea89280 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.container.common.report;
 
 import com.google.protobuf.GeneratedMessage;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -49,6 +51,8 @@ public class ReportPublisherFactory {
     report2publisher.put(NodeReportProto.class, NodeReportPublisher.class);
     report2publisher.put(ContainerReportsProto.class,
         ContainerReportPublisher.class);
+    report2publisher.put(CommandStatusReportsProto.class,
+        CommandStatusReportPublisher.class);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 245d76f..69a243e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -107,6 +108,7 @@ public class DatanodeStateMachine implements Closeable {
         .setStateContext(context)
         .addPublisherFor(NodeReportProto.class)
         .addPublisherFor(ContainerReportsProto.class)
+        .addPublisherFor(CommandStatusReportsProto.class)
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 98eb7a0..7ed30f8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -17,12 +17,17 @@
 package org.apache.hadoop.ozone.container.common.statemachine;
 
 import com.google.protobuf.GeneratedMessage;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode
     .InitDatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode
     .RunningDatanodeState;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +53,7 @@ public class StateContext {
   static final Logger LOG =
       LoggerFactory.getLogger(StateContext.class);
   private final Queue<SCMCommand> commandQueue;
+  private final Map<Long, CommandStatus> cmdStatusMap;
   private final Lock lock;
   private final DatanodeStateMachine parent;
   private final AtomicLong stateExecutionCount;
@@ -68,6 +74,7 @@ public class StateContext {
     this.state = state;
     this.parent = parent;
     commandQueue = new LinkedList<>();
+    cmdStatusMap = new ConcurrentHashMap<>();
     reports = new LinkedList<>();
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
@@ -269,6 +276,7 @@ public class StateContext {
     } finally {
       lock.unlock();
     }
+    this.addCmdStatus(command);
   }
 
   /**
@@ -279,4 +287,66 @@ public class StateContext {
     return stateExecutionCount.get();
   }
 
+  /**
+   * Returns the {@link CommandStatus} for the given command id, or null
+   * if no entry exists for that id.
+   *
+   * @param key - command id.
+   * @return {@link CommandStatus} or null.
+   */
+  public CommandStatus getCmdStatus(Long key) {
+    return cmdStatusMap.get(key);
+  }
+
+  /**
+   * Adds a {@link CommandStatus} to the State Machine.
+   *
+   * @param key    - command id.
+   * @param status - {@link CommandStatus}.
+   */
+  public void addCmdStatus(Long key, CommandStatus status) {
+    cmdStatusMap.put(key, status);
+  }
+
+  /**
+   * Adds a {@link CommandStatus} to the State Machine for given SCMCommand.
+   *
+   * @param cmd - {@link SCMCommand}.
+   */
+  public void addCmdStatus(SCMCommand cmd) {
+    this.addCmdStatus(cmd.getCmdId(),
+        CommandStatusBuilder.newBuilder()
+            .setCmdId(cmd.getCmdId())
+            .setStatus(Status.PENDING)
+            .setType(cmd.getType())
+            .build());
+  }
+
+  /**
+   * Returns the map holding all {@link CommandStatus} objects.
+   *
+   * @return map from command id to {@link CommandStatus}.
+   */
+  public Map<Long, CommandStatus> getCommandStatusMap() {
+    return cmdStatusMap;
+  }
+
+  /**
+   * Removes the {@link CommandStatus} for the given command id from
+   * StateContext#cmdStatusMap.
+   *
+   * @param cmdId - command id.
+   */
+  public void removeCommandStatus(Long cmdId) {
+    cmdStatusMap.remove(cmdId);
+  }
+
+  /**
+   * Updates the status of a pending command.
+   * @param cmdId       command id
+   * @param cmdExecuted true if the command executed successfully
+   * @return true if command status updated successfully else false.
+   */
+  public boolean updateCommandStatus(Long cmdId, boolean cmdExecuted) {
+    if (cmdStatusMap.containsKey(cmdId)) {
+      cmdStatusMap.get(cmdId)
+          .setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED);
+      return true;
+    }
+    return false;
+  }
 }
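
Pulling the new StateContext pieces together, the life of one command
status looks like this (a sketch; it assumes the method patched above to
call addCmdStatus is StateContext#addCommand, and the wrapper class is
hypothetical):

  import java.util.Collections;
  import org.apache.hadoop.hdds.protocol.proto
      .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
  import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
  import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;

  public final class CommandStatusLifecycle {
    static void demo(StateContext context) {
      // 1. Queuing the command registers a PENDING CommandStatus keyed by
      //    the command id (addCmdStatus is invoked internally).
      DeleteBlocksCommand cmd =
          new DeleteBlocksCommand(Collections.emptyList());
      context.addCommand(cmd);
      assert context.getCmdStatus(cmd.getCmdId()).getStatus()
          == Status.PENDING;

      // 2. A CommandHandler finishes and flips the entry to EXECUTED
      //    (or FAILED on error).
      context.updateCommandStatus(cmd.getCmdId(), true);

      // 3. CommandStatusReportPublisher#getReport serializes the entry
      //    and, since it is no longer PENDING, removes it from the map.
    }
  }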

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 45f2bbd..f58cbae 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -41,6 +41,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
       LoggerFactory.getLogger(CloseContainerCommandHandler.class);
   private int invocationCount;
   private long totalTime;
+  private boolean cmdExecuted;
 
   /**
    * Constructs a ContainerReport handler.
@@ -61,6 +62,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
       StateContext context, SCMConnectionManager connectionManager) {
     LOG.debug("Processing Close Container command.");
     invocationCount++;
+    cmdExecuted = false;
     long startTime = Time.monotonicNow();
     // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA)
     long containerID = -1;
@@ -88,10 +90,11 @@ public class CloseContainerCommandHandler implements CommandHandler {
       // submit the close container request for the XceiverServer to handle
       container.submitContainerRequest(
           request.build(), replicationType);
-
+      cmdExecuted = true;
     } catch (Exception e) {
       LOG.error("Can't close container " + containerID, e);
     } finally {
+      updateCommandStatus(context, command, cmdExecuted, LOG);
       long endTime = Time.monotonicNow();
       totalTime += endTime - startTime;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 60e2dc4..2016419 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.container.common.statemachine
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
 
 /**
  * Generic interface for handlers.
@@ -58,4 +59,14 @@ public interface CommandHandler {
    */
   long getAverageRunTime();
 
+  /**
+   * Default implementation for updating command status.
+   */
+  default void updateCommandStatus(StateContext context, SCMCommand command,
+      boolean cmdExecuted, Logger log) {
+    if (!context.updateCommandStatus(command.getCmdId(), cmdExecuted)) {
+      log.debug("{} with cmdId:{} not found.", command.getType(),
+          command.getCmdId());
+    }
+  }
 }
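
Every concrete handler in this patch follows the same shape around the new
default method: flip a flag only after the work succeeds, then report from
a finally block. A minimal hypothetical handler illustrating the pattern
(the class itself is not part of this commit; the placeholder command type
is arbitrary):

  import org.apache.hadoop.hdds.protocol.proto
      .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
  import org.apache.hadoop.ozone.container.common.statemachine
      .SCMConnectionManager;
  import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
  import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
  import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  public class NoOpCommandHandler implements CommandHandler {
    private static final Logger LOG =
        LoggerFactory.getLogger(NoOpCommandHandler.class);
    private int invocationCount;

    @Override
    public void handle(SCMCommand command, OzoneContainer container,
        StateContext context, SCMConnectionManager connectionManager) {
      invocationCount++;
      boolean cmdExecuted = false;
      try {
        // ... real work would go here ...
        cmdExecuted = true;
      } finally {
        // Marks the PENDING entry in StateContext EXECUTED or FAILED.
        updateCommandStatus(context, command, cmdExecuted, LOG);
      }
    }

    @Override
    public SCMCommandProto.Type getCommandType() {
      return SCMCommandProto.Type.reregisterCommand; // placeholder type
    }

    @Override
    public int getInvocationCount() {
      return invocationCount;
    }

    @Override
    public long getAverageRunTime() {
      return 0;
    }
  }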

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index c3d1596..9640f93 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -21,7 +21,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
@@ -54,7 +55,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.List;
 
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.CONTAINER_NOT_FOUND;
 
 /**
  * Handle block deletion commands.
@@ -68,6 +70,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
   private final Configuration conf;
   private int invocationCount;
   private long totalTime;
+  private boolean cmdExecuted;
 
   public DeleteBlocksCommandHandler(ContainerSet cset,
       Configuration conf) {
@@ -78,93 +81,98 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
   @Override
   public void handle(SCMCommand command, OzoneContainer container,
       StateContext context, SCMConnectionManager connectionManager) {
-    if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
-      LOG.warn("Skipping handling command, expected command "
-              + "type {} but found {}",
-          SCMCommandProto.Type.deleteBlocksCommand, command.getType());
-      return;
-    }
-    LOG.debug("Processing block deletion command.");
-    invocationCount++;
+    cmdExecuted = false;
     long startTime = Time.monotonicNow();
-
-    // move blocks to deleting state.
-    // this is a metadata update, the actual deletion happens in another
-    // recycling thread.
-    DeleteBlocksCommand cmd = (DeleteBlocksCommand) command;
-    List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted();
-
-
-    DeletedContainerBlocksSummary summary =
-        DeletedContainerBlocksSummary.getFrom(containerBlocks);
-    LOG.info("Start to delete container blocks, TXIDs={}, "
-            + "numOfContainers={}, numOfBlocks={}",
-        summary.getTxIDSummary(),
-        summary.getNumOfContainers(),
-        summary.getNumOfBlocks());
-
-    ContainerBlocksDeletionACKProto.Builder resultBuilder =
-        ContainerBlocksDeletionACKProto.newBuilder();
-    containerBlocks.forEach(entry -> {
-      DeleteBlockTransactionResult.Builder txResultBuilder =
-          DeleteBlockTransactionResult.newBuilder();
-      txResultBuilder.setTxID(entry.getTxID());
-      try {
-        long containerId = entry.getContainerID();
-        Container cont = containerSet.getContainer(containerId);
-        if(cont == null) {
-          throw new StorageContainerException("Unable to find the container "
-              + containerId, CONTAINER_NOT_FOUND);
-        }
-        ContainerProtos.ContainerType containerType = cont.getContainerType();
-        switch (containerType) {
-        case KeyValueContainer:
-          KeyValueContainerData containerData = (KeyValueContainerData)
-              cont.getContainerData();
-          deleteKeyValueContainerBlocks(containerData, entry);
-          txResultBuilder.setSuccess(true);
-          break;
-        default:
-          LOG.error("Delete Blocks Command Handler is not implemented for " +
-              "containerType {}", containerType);
-        }
-      } catch (IOException e) {
-        LOG.warn("Failed to delete blocks for container={}, TXID={}",
-            entry.getContainerID(), entry.getTxID(), e);
-        txResultBuilder.setSuccess(false);
+    try {
+      if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
+        LOG.warn("Skipping handling command, expected command "
+                + "type {} but found {}",
+            SCMCommandProto.Type.deleteBlocksCommand, command.getType());
+        return;
       }
-      resultBuilder.addResults(txResultBuilder.build());
-    });
-    ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
-
-    // Send ACK back to SCM as long as meta updated
-    // TODO Or we should wait until the blocks are actually deleted?
-    if (!containerBlocks.isEmpty()) {
-      for (EndpointStateMachine endPoint : connectionManager.getValues()) {
+      LOG.debug("Processing block deletion command.");
+      invocationCount++;
+
+      // move blocks to deleting state.
+      // this is a metadata update, the actual deletion happens in another
+      // recycling thread.
+      DeleteBlocksCommand cmd = (DeleteBlocksCommand) command;
+      List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted();
+
+      DeletedContainerBlocksSummary summary =
+          DeletedContainerBlocksSummary.getFrom(containerBlocks);
+      LOG.info("Start to delete container blocks, TXIDs={}, "
+              + "numOfContainers={}, numOfBlocks={}",
+          summary.getTxIDSummary(),
+          summary.getNumOfContainers(),
+          summary.getNumOfBlocks());
+
+      ContainerBlocksDeletionACKProto.Builder resultBuilder =
+          ContainerBlocksDeletionACKProto.newBuilder();
+      containerBlocks.forEach(entry -> {
+        DeleteBlockTransactionResult.Builder txResultBuilder =
+            DeleteBlockTransactionResult.newBuilder();
+        txResultBuilder.setTxID(entry.getTxID());
         try {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sending following block deletion ACK to SCM");
-            for (DeleteBlockTransactionResult result :
-                blockDeletionACK.getResultsList()) {
-              LOG.debug(result.getTxID() + " : " + result.getSuccess());
-            }
+          long containerId = entry.getContainerID();
+          Container cont = containerSet.getContainer(containerId);
+          if (cont == null) {
+            throw new StorageContainerException("Unable to find the container "
+                + containerId, CONTAINER_NOT_FOUND);
+          }
+          ContainerProtos.ContainerType containerType = cont.getContainerType();
+          switch (containerType) {
+          case KeyValueContainer:
+            KeyValueContainerData containerData = (KeyValueContainerData)
+                cont.getContainerData();
+            deleteKeyValueContainerBlocks(containerData, entry);
+            txResultBuilder.setSuccess(true);
+            break;
+          default:
+            LOG.error(
+                "Delete Blocks Command Handler is not implemented for " +
+                    "containerType {}", containerType);
           }
-          endPoint.getEndPoint()
-              .sendContainerBlocksDeletionACK(blockDeletionACK);
         } catch (IOException e) {
-          LOG.error("Unable to send block deletion ACK to SCM {}",
-              endPoint.getAddress().toString(), e);
+          LOG.warn("Failed to delete blocks for container={}, TXID={}",
+              entry.getContainerID(), entry.getTxID(), e);
+          txResultBuilder.setSuccess(false);
+        }
+        resultBuilder.addResults(txResultBuilder.build());
+      });
+      ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
+
+      // Send ACK back to SCM as long as meta updated
+      // TODO Or we should wait until the blocks are actually deleted?
+      if (!containerBlocks.isEmpty()) {
+        for (EndpointStateMachine endPoint : connectionManager.getValues()) {
+          try {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Sending following block deletion ACK to SCM");
+              for (DeleteBlockTransactionResult result :
+                  blockDeletionACK.getResultsList()) {
+                LOG.debug(result.getTxID() + " : " + result.getSuccess());
+              }
+            }
+            endPoint.getEndPoint()
+                .sendContainerBlocksDeletionACK(blockDeletionACK);
+          } catch (IOException e) {
+            LOG.error("Unable to send block deletion ACK to SCM {}",
+                endPoint.getAddress().toString(), e);
+          }
         }
       }
+      cmdExecuted = true;
+    } finally {
+      updateCommandStatus(context, command, cmdExecuted, LOG);
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
     }
-
-    long endTime = Time.monotonicNow();
-    totalTime += endTime - startTime;
   }
 
   /**
-   * Move a bunch of blocks from a container to deleting state.
-   * This is a meta update, the actual deletes happen in async mode.
+   * Move a bunch of blocks from a container to deleting state. This is a meta
+   * update, the actual deletes happen in async mode.
    *
    * @param containerData - KeyValueContainerData
    * @param delTX a block deletion transaction.
@@ -222,7 +230,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
         }
       } else {
         LOG.debug("Block {} not found or already under deletion in"
-                + " container {}, skip deleting it.", blk, containerId);
+            + " container {}, skip deleting it.", blk, containerId);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index b4e83b7..fe1d4e8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -39,12 +39,17 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
   private int invocationCount;
 
   private long totalTime;
+  private boolean cmdExecuted;
 
   @Override
   public void handle(SCMCommand command, OzoneContainer container,
       StateContext context, SCMConnectionManager connectionManager) {
     LOG.warn("Replicate command is not yet handled");
-
+    try {
+      cmdExecuted = true;
+    } finally {
+      updateCommandStatus(context, command, cmdExecuted, LOG);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
index c7d8df5..6b7c22c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
@@ -1,19 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  */
 package org.apache.hadoop.ozone.protocol.commands;
 
@@ -24,7 +23,6 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
 
-
 /**
  * Asks datanode to close a container.
  */
@@ -36,6 +34,15 @@ public class CloseContainerCommand
 
   public CloseContainerCommand(long containerID,
       HddsProtos.ReplicationType replicationType) {
+    super();
+    this.containerID = containerID;
+    this.replicationType = replicationType;
+  }
+
+  // Should be called only for protobuf conversion
+  private CloseContainerCommand(long containerID,
+      HddsProtos.ReplicationType replicationType, long cmdId) {
+    super(cmdId);
     this.containerID = containerID;
     this.replicationType = replicationType;
   }
@@ -63,6 +70,7 @@ public class CloseContainerCommand
   public CloseContainerCommandProto getProto() {
     return CloseContainerCommandProto.newBuilder()
         .setContainerID(containerID)
+        .setCmdId(getCmdId())
         .setReplicationType(replicationType).build();
   }
 
@@ -70,8 +78,8 @@ public class CloseContainerCommand
       CloseContainerCommandProto closeContainerProto) {
     Preconditions.checkNotNull(closeContainerProto);
     return new CloseContainerCommand(closeContainerProto.getContainerID(),
-        closeContainerProto.getReplicationType());
-
+        closeContainerProto.getReplicationType(), closeContainerProto
+        .getCmdId());
   }
 
   public long getContainerID() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
new file mode 100644
index 0000000..bf99700
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+
+/**
+ * A class that is used to communicate status of datanode commands.
+ */
+public class CommandStatus {
+
+  private SCMCommandProto.Type type;
+  private Long cmdId;
+  private Status status;
+  private String msg;
+
+  public Type getType() {
+    return type;
+  }
+
+  public Long getCmdId() {
+    return cmdId;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  public String getMsg() {
+    return msg;
+  }
+
+  /**
+   * Allows the status to be changed after the CommandStatus has been
+   * initialized.
+   *
+   * @param status - new {@link Status}.
+   */
+  public void setStatus(Status status) {
+    this.status = status;
+  }
+
+  /**
+   * Returns a CommandStatus from the protocol buffers.
+   *
+   * @param cmdStatusProto - protoBuf Message
+   * @return CommandStatus
+   */
+  public CommandStatus getFromProtoBuf(
+      StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatusProto) {
+    return CommandStatusBuilder.newBuilder()
+        .setCmdId(cmdStatusProto.getCmdId())
+        .setStatus(cmdStatusProto.getStatus())
+        .setType(cmdStatusProto.getType())
+        .setMsg(cmdStatusProto.getMsg()).build();
+  }
+  /**
+   * Returns the protobuf message for this CommandStatus.
+   *
+   * @return StorageContainerDatanodeProtocolProtos.CommandStatus
+   */
+  public StorageContainerDatanodeProtocolProtos.CommandStatus
+      getProtoBufMessage() {
+    StorageContainerDatanodeProtocolProtos.CommandStatus.Builder builder =
+        StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder()
+            .setCmdId(this.getCmdId())
+            .setStatus(this.getStatus())
+            .setType(this.getType());
+    if (this.getMsg() != null) {
+      builder.setMsg(this.getMsg());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Builder class for CommandStatus.
+   */
+  public static final class CommandStatusBuilder {
+
+    private SCMCommandProto.Type type;
+    private Long cmdId;
+    private StorageContainerDatanodeProtocolProtos.CommandStatus.Status status;
+    private String msg;
+
+    private CommandStatusBuilder() {
+    }
+
+    public static CommandStatusBuilder newBuilder() {
+      return new CommandStatusBuilder();
+    }
+
+    public CommandStatusBuilder setType(Type type) {
+      this.type = type;
+      return this;
+    }
+
+    public CommandStatusBuilder setCmdId(Long cmdId) {
+      this.cmdId = cmdId;
+      return this;
+    }
+
+    public CommandStatusBuilder setStatus(Status status) {
+      this.status = status;
+      return this;
+    }
+
+    public CommandStatusBuilder setMsg(String msg) {
+      this.msg = msg;
+      return this;
+    }
+
+    public CommandStatus build() {
+      CommandStatus commandStatus = new CommandStatus();
+      commandStatus.type = this.type;
+      commandStatus.msg = this.msg;
+      commandStatus.status = this.status;
+      commandStatus.cmdId = this.cmdId;
+      return commandStatus;
+    }
+  }
+}
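
A small sketch of the builder and the protobuf round trip (values are
illustrative; deleteBlocksCommand is one of the SCMCommandProto.Type
values referenced elsewhere in this patch, and the wrapper class is
hypothetical):

  import org.apache.hadoop.hdds.protocol.proto
      .StorageContainerDatanodeProtocolProtos;
  import org.apache.hadoop.hdds.protocol.proto
      .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
  import org.apache.hadoop.hdds.protocol.proto
      .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
  import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
  import org.apache.hadoop.ozone.protocol.commands.CommandStatus
      .CommandStatusBuilder;

  public final class CommandStatusExample {
    public static void main(String[] args) {
      CommandStatus status = CommandStatusBuilder.newBuilder()
          .setCmdId(1L)
          .setType(Type.deleteBlocksCommand)
          .setStatus(Status.PENDING)
          .build();
      status.setStatus(Status.EXECUTED); // mutable by design, see above
      StorageContainerDatanodeProtocolProtos.CommandStatus proto =
          status.getProtoBufMessage();
      assert proto.getStatus() == Status.EXECUTED;
    }
  }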

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
index 4fa33f6..46af794 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -36,6 +36,14 @@ public class DeleteBlocksCommand extends
 
 
   public DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks) {
+    super();
+    this.blocksTobeDeleted = blocks;
+  }
+
+  // Should be called only for protobuf conversion
+  private DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks,
+      long cmdId) {
+    super(cmdId);
     this.blocksTobeDeleted = blocks;
   }
 
@@ -56,11 +64,12 @@ public class DeleteBlocksCommand extends
   public static DeleteBlocksCommand getFromProtobuf(
       DeleteBlocksCommandProto deleteBlocksProto) {
     return new DeleteBlocksCommand(deleteBlocksProto
-        .getDeletedBlocksTransactionsList());
+        .getDeletedBlocksTransactionsList(), deleteBlocksProto.getCmdId());
   }
 
   public DeleteBlocksCommandProto getProto() {
     return DeleteBlocksCommandProto.newBuilder()
+        .setCmdId(getCmdId())
         .addAllDeletedBlocksTransactions(blocksTobeDeleted).build();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
index 834318b..e860c93 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
 
 import com.google.common.base.Preconditions;
 
@@ -41,11 +40,19 @@ public class ReplicateContainerCommand
     extends SCMCommand<ReplicateContainerCommandProto> {
 
   private final long containerID;
-
   private final List<DatanodeDetails> sourceDatanodes;
 
   public ReplicateContainerCommand(long containerID,
       List<DatanodeDetails> sourceDatanodes) {
+    super();
+    this.containerID = containerID;
+    this.sourceDatanodes = sourceDatanodes;
+  }
+
+  // Should be called only for protobuf conversion
+  public ReplicateContainerCommand(long containerID,
+      List<DatanodeDetails> sourceDatanodes, long cmdId) {
+    super(cmdId);
     this.containerID = containerID;
     this.sourceDatanodes = sourceDatanodes;
   }
@@ -62,6 +69,7 @@ public class ReplicateContainerCommand
 
   public ReplicateContainerCommandProto getProto() {
     Builder builder = ReplicateContainerCommandProto.newBuilder()
+        .setCmdId(getCmdId())
         .setContainerID(containerID);
     for (DatanodeDetails dd : sourceDatanodes) {
       builder.addSources(dd.getProtoBufMessage());
@@ -75,12 +83,12 @@ public class ReplicateContainerCommand
 
     List<DatanodeDetails> datanodeDetails =
         protoMessage.getSourcesList()
-        .stream()
-        .map(DatanodeDetails::getFromProtoBuf)
-        .collect(Collectors.toList());
+            .stream()
+            .map(DatanodeDetails::getFromProtoBuf)
+            .collect(Collectors.toList());
 
     return new ReplicateContainerCommand(protoMessage.getContainerID(),
-        datanodeDetails);
+        datanodeDetails, protoMessage.getCmdId());
 
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
index 953e31a..d557104 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
@@ -49,6 +49,16 @@ public class ReregisterCommand extends
     return getProto().toByteArray();
   }
 
+  /**
+   * Command ids are not tracked for ReregisterCommand.
+   *
+   * @return 0, always.
+   */
+  @Override
+  public long getCmdId() {
+    return 0;
+  }
+
   public ReregisterCommandProto getProto() {
     return ReregisterCommandProto
         .newBuilder()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
index 35ca802..6cda591 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.protocol.commands;
 
 import com.google.protobuf.GeneratedMessage;
+import org.apache.hadoop.hdds.HddsIdFactory;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 
@@ -27,6 +28,15 @@ import org.apache.hadoop.hdds.protocol.proto
  * @param <T>
  */
 public abstract class SCMCommand<T extends GeneratedMessage> {
+  private long cmdId;
+
+  SCMCommand() {
+    this.cmdId = HddsIdFactory.getLongId();
+  }
+
+  SCMCommand(long cmdId) {
+    this.cmdId = cmdId;
+  }
   /**
    * Returns the type of this command.
    * @return Type
@@ -38,4 +48,13 @@ public abstract class SCMCommand<T extends GeneratedMessage> {
    * @return A protobuf message.
    */
   public abstract byte[] getProtoBufMessage();
+
+  /**
+   * Gets the commandId of this object.
+   * @return long - the command id.
+   */
+  public long getCmdId() {
+    return cmdId;
+  }
+
 }
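
The two constructors encode the id-propagation rule: commands built on the
SCM side mint a fresh id from HddsIdFactory, while protobuf deserialization
on the datanode must carry the sender's id through unchanged. A sketch
using DeleteBlocksCommand from this patch (the wrapper class is
hypothetical):

  import java.util.Collections;
  import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;

  public final class CmdIdRoundTrip {
    public static void main(String[] args) {
      DeleteBlocksCommand original =
          new DeleteBlocksCommand(Collections.emptyList()); // fresh cmdId
      DeleteBlocksCommand decoded =
          DeleteBlocksCommand.getFromProtobuf(original.getProto());
      // The id survives serialization, so the datanode's status report
      // can be correlated with the command SCM originally sent.
      assert decoded.getCmdId() == original.getCmdId();
    }
  }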

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 54230c1..4238389 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -80,6 +80,7 @@ message SCMHeartbeatRequestProto {
   optional NodeReportProto nodeReport = 2;
   optional ContainerReportsProto containerReport = 3;
   optional ContainerActionsProto containerActions = 4;
+  optional CommandStatusReportsProto commandStatusReport = 5;
 }
 
 /*
@@ -127,6 +128,22 @@ message ContainerReportsProto {
   repeated ContainerInfo reports = 1;
 }
 
+message CommandStatusReportsProto {
+  repeated CommandStatus cmdStatus = 1;
+}
+
+message CommandStatus {
+  enum Status {
+    PENDING = 1;
+    EXECUTED = 2;
+    FAILED = 3;
+  }
+  required int64 cmdId = 1;
+  required Status status = 2 [default = PENDING];
+  required SCMCommandProto.Type type = 3;
+  optional string msg = 4;
+}
+
 message ContainerActionsProto {
   repeated ContainerAction containerActions = 1;
 }
@@ -193,6 +210,7 @@ message ReregisterCommandProto {}
 // HB response from SCM, contains a list of block deletion transactions.
 message DeleteBlocksCommandProto {
   repeated DeletedBlocksTransaction deletedBlocksTransactions = 1;
+  required int64 cmdId = 3;
 }
 
 // The deleted blocks which are stored in deletedBlock.db of scm.
@@ -226,6 +244,7 @@ This command asks the datanode to close a specific container.
 message CloseContainerCommandProto {
   required int64 containerID = 1;
   required hadoop.hdds.ReplicationType replicationType = 2;
+  required int64 cmdId = 3;
 }
 
 /**
@@ -233,6 +252,7 @@ This command asks the datanode to delete a specific container.
 */
 message DeleteContainerCommandProto {
   required int64 containerID = 1;
+  required int64 cmdId = 2;
 }
 
 /**
@@ -241,6 +261,7 @@ This command asks the datanode to replicate a container from specific sources.
 message ReplicateContainerCommandProto {
   required int64 containerID = 1;
   repeated DatanodeDetailsProto sources = 2;
+  required int64 cmdId = 3;
 }
 
 /**
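
With the proto additions above, a heartbeat carrying a command status
report can be assembled like this (a sketch with illustrative values; the
classes come from the generated StorageContainerDatanodeProtocolProtos,
the datanode details are assumed to be built elsewhere, and the wrapper
class is hypothetical):

  import org.apache.hadoop.hdds.protocol.proto
      .StorageContainerDatanodeProtocolProtos.CommandStatus;
  import org.apache.hadoop.hdds.protocol.proto
      .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
  import org.apache.hadoop.hdds.protocol.proto
      .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
  import org.apache.hadoop.hdds.protocol.proto
      .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
  import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;

  public final class HeartbeatExample {
    static SCMHeartbeatRequestProto build(DatanodeDetailsProto dn) {
      return SCMHeartbeatRequestProto.newBuilder()
          .setDatanodeDetails(dn)
          .setCommandStatusReport(CommandStatusReportsProto.newBuilder()
              .addCmdStatus(CommandStatus.newBuilder()
                  .setCmdId(1L)
                  .setStatus(CommandStatus.Status.EXECUTED)
                  .setType(SCMCommandProto.Type.deleteBlocksCommand)
                  .setMsg("deleted 12 blocks")))
          .build();
    }
  }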

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 8f4b0e3..fb8e7c1 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.container.common;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CommandStatus;
 import org.apache.hadoop.hdds.scm.VersionInfo;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
@@ -59,6 +61,9 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
   private Map<DatanodeDetails, Map<String, ContainerInfo>> nodeContainers =
       new HashMap();
   private Map<DatanodeDetails, NodeReportProto> nodeReports = new HashMap<>();
+  private AtomicInteger commandStatusReport = new AtomicInteger(0);
+  private List<CommandStatus> cmdStatusList = new LinkedList<>();
+  private List<SCMCommandProto> scmCommandRequests = new LinkedList<>();
   /**
    * Returns the number of heartbeats made to this class.
    *
@@ -180,10 +185,12 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
       sendHeartbeat(SCMHeartbeatRequestProto heartbeat) throws IOException {
     rpcCount.incrementAndGet();
     heartbeatCount.incrementAndGet();
+    if (heartbeat.hasCommandStatusReport()) {
+      cmdStatusList.addAll(heartbeat.getCommandStatusReport().getCmdStatusList());
+      commandStatusReport.incrementAndGet();
+    }
     sleepIfNeeded();
-    List<SCMCommandProto>
-        cmdResponses = new LinkedList<>();
-    return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
+    return SCMHeartbeatResponseProto.newBuilder().addAllCommands(scmCommandRequests)
         .setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
         .build();
   }
@@ -302,4 +309,24 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
     nodeContainers.clear();
 
   }
+
+  public int getCommandStatusReportCount() {
+    return commandStatusReport.get();
+  }
+
+  public List<CommandStatus> getCmdStatusList() {
+    return cmdStatusList;
+  }
+
+  public List<SCMCommandProto> getScmCommandRequests() {
+    return scmCommandRequests;
+  }
+
+  public void clearScmCommandRequests() {
+    scmCommandRequests.clear();
+  }
+
+  public void addScmCommandRequest(SCMCommandProto scmCmd) {
+    scmCommandRequests.add(scmCmd);
+  }
 }
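
With these accessors a test can drive both directions of the exchange
through the mock, roughly as TestEndPoint does later in this patch (the
closeCommand value is assumed to be built as shown there):

  scmServerImpl.addScmCommandRequest(closeCommand); // queued for next HB response
  // ... run one or more heartbeats against the mock server ...
  Assert.assertTrue(scmServerImpl.getCommandStatusReportCount() > 0);
  Assert.assertFalse(scmServerImpl.getCmdStatusList().isEmpty());
  scmServerImpl.clearScmCommandRequests();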

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index 5fd9cf6..026e7aa 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -20,18 +20,27 @@ package org.apache.hadoop.ozone.container.common.report;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.GeneratedMessage;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsIdFactory;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -42,12 +51,20 @@ import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Test cases to test {@link ReportPublisher}.
  */
 public class TestReportPublisher {
 
+  private static Configuration config;
+
+  @BeforeClass
+  public static void setup() {
+    config = new OzoneConfiguration();
+  }
+
   /**
    * Dummy report publisher for testing.
    */
@@ -93,9 +110,9 @@ public class TestReportPublisher {
                 .setNameFormat("Unit test ReportManager Thread - %d").build());
     publisher.init(dummyContext, executorService);
     Thread.sleep(150);
-    Assert.assertEquals(1, ((DummyReportPublisher)publisher).getReportCount);
+    Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
     Thread.sleep(150);
-    Assert.assertEquals(2, ((DummyReportPublisher)publisher).getReportCount);
+    Assert.assertEquals(2, ((DummyReportPublisher) publisher).getReportCount);
     executorService.shutdown();
   }
 
@@ -110,12 +127,58 @@ public class TestReportPublisher {
     publisher.init(dummyContext, executorService);
     Thread.sleep(150);
     executorService.shutdown();
-    Assert.assertEquals(1, ((DummyReportPublisher)publisher).getReportCount);
+    Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
     verify(dummyContext, times(1)).addReport(null);
 
   }
 
   @Test
+  public void testCommandStatusPublisher() throws InterruptedException {
+    StateContext dummyContext = Mockito.mock(StateContext.class);
+    ReportPublisher publisher = new CommandStatusReportPublisher();
+    final Map<Long, CommandStatus> cmdStatusMap = new ConcurrentHashMap<>();
+    when(dummyContext.getCommandStatusMap()).thenReturn(cmdStatusMap);
+    publisher.setConf(config);
+
+    ScheduledExecutorService executorService = HadoopExecutors
+        .newScheduledThreadPool(1,
+            new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat("Unit test ReportManager Thread - %d").build());
+    publisher.init(dummyContext, executorService);
+    Assert.assertEquals(0,
+        ((CommandStatusReportPublisher) publisher).getReport()
+            .getCmdStatusCount());
+
+    // Insert two status objects into the state context map, then get the report.
+    CommandStatus obj1 = CommandStatus.CommandStatusBuilder.newBuilder()
+        .setCmdId(HddsIdFactory.getLongId())
+        .setType(Type.deleteBlocksCommand)
+        .setStatus(Status.PENDING)
+        .build();
+    CommandStatus obj2 = CommandStatus.CommandStatusBuilder.newBuilder()
+        .setCmdId(HddsIdFactory.getLongId())
+        .setType(Type.closeContainerCommand)
+        .setStatus(Status.EXECUTED)
+        .build();
+    cmdStatusMap.put(obj1.getCmdId(), obj1);
+    cmdStatusMap.put(obj2.getCmdId(), obj2);
+    Assert.assertEquals("Should publish report with 2 status objects", 2,
+        ((CommandStatusReportPublisher) publisher).getReport()
+            .getCmdStatusCount());
+    Assert.assertEquals(
+        "Next report should have 1 status object as command status "
+            + "objects are still in Pending state",
+        1, ((CommandStatusReportPublisher) publisher).getReport()
+            .getCmdStatusCount());
+    Assert.assertTrue(
+        "Next report should have 1 status objects as command status "
+            + "objects are still in Pending state",
+        ((CommandStatusReportPublisher) publisher).getReport()
+            .getCmdStatusList().get(0).getStatus().equals(Status.PENDING));
+    executorService.shutdown();
+  }
+
+  @Test
   public void testAddingReportToHeartbeat() {
     Configuration conf = new OzoneConfiguration();
     ReportPublisherFactory factory = new ReportPublisherFactory(conf);
@@ -168,10 +231,10 @@ public class TestReportPublisher {
    * Adds the report to heartbeat.
    *
    * @param requestBuilder builder to which the report has to be added.
-   * @param report the report to be added.
+   * @param report         the report to be added.
    */
-  private static void addReport(SCMHeartbeatRequestProto.Builder requestBuilder,
-                          GeneratedMessage report) {
+  private static void addReport(SCMHeartbeatRequestProto.Builder
+      requestBuilder, GeneratedMessage report) {
     String reportName = report.getDescriptorForType().getFullName();
     for (Descriptors.FieldDescriptor descriptor :
         SCMHeartbeatRequestProto.getDescriptor().getFields()) {
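
The paired assertions in testCommandStatusPublisher pin down the publisher's
contract as exercised here: every status currently in the map goes into a
report, and entries that have moved past PENDING appear to be pruned once
reported, so only the PENDING entry survives to the next getReport() call.
Condensed (assuming a variable typed as CommandStatusReportPublisher,
populated as above):

  // First report: both statuses (one PENDING, one EXECUTED) are included.
  Assert.assertEquals(2, publisher.getReport().getCmdStatusCount());
  // Second report: the EXECUTED entry was dropped after being reported;
  // only the PENDING one is carried forward.
  Assert.assertEquals(1, publisher.getReport().getCmdStatusCount());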

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 0afd675..485b3f5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -21,8 +21,12 @@ package org.apache.hadoop.hdds.scm.events;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .CommandStatusReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .NodeReportFromDatanode;
 
 import org.apache.hadoop.hdds.server.events.Event;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
@@ -34,47 +38,54 @@ import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 public final class SCMEvents {
 
   /**
-   * NodeReports are  sent out by Datanodes. This report is
-   * received by SCMDatanodeHeartbeatDispatcher and NodeReport Event is
-   * generated.
+   * NodeReports are sent out by Datanodes. This report is received by
+   * SCMDatanodeHeartbeatDispatcher and NodeReport Event is generated.
    */
   public static final TypedEvent<NodeReportFromDatanode> NODE_REPORT =
       new TypedEvent<>(NodeReportFromDatanode.class, "Node_Report");
   /**
-   * ContainerReports are send out by Datanodes. This report
-   * is received by SCMDatanodeHeartbeatDispatcher and Container_Report Event
-   * i generated.
+   * ContainerReports are sent out by Datanodes. This report is received by
+   * SCMDatanodeHeartbeatDispatcher and Container_Report Event
+   * is generated.
    */
   public static final TypedEvent<ContainerReportFromDatanode> CONTAINER_REPORT =
       new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report");
 
   /**
+   * A Command status report will be sent by datanodes. This report is received
+   * by SCMDatanodeHeartbeatDispatcher and a CMD_STATUS_REPORT event is generated.
+   */
+  public static final TypedEvent<CommandStatusReportFromDatanode>
+      CMD_STATUS_REPORT =
+      new TypedEvent<>(CommandStatusReportFromDatanode.class,
+          "Cmd_Status_Report");
+
+  /**
    * When ever a command for the Datanode needs to be issued by any component
-   * inside SCM, a Datanode_Command event is generated. NodeManager listens
-   * to these events and dispatches them to Datanode for further processing.
+   * inside SCM, a Datanode_Command event is generated. NodeManager listens to
+   * these events and dispatches them to Datanode for further processing.
    */
   public static final Event<CommandForDatanode> DATANODE_COMMAND =
       new TypedEvent<>(CommandForDatanode.class, "Datanode_Command");
 
   /**
-   * A Close Container Event can be triggered under many condition.
-   * Some of them are:
-   *    1. A Container is full, then we stop writing further information to
-   *    that container. DN's let SCM know that current state and sends a
-   *    informational message that allows SCM to close the container.
-   *
-   *    2. If a pipeline is open; for example Ratis; if a single node fails,
-   *    we will proactively close these containers.
-   *
-   *  Once a command is dispatched to DN, we will also listen to updates from
-   *  the datanode which lets us know that this command completed or timed out.
+   * A Close Container Event can be triggered under many conditions. Some of
+   * them are: 1. A Container is full, so we stop writing further information
+   * to that container. DNs let SCM know the current state and send an
+   * informational message that allows SCM to close the container.
+   * <p>
+   * 2. If a pipeline is open; for example Ratis; if a single node fails, we
+   * will proactively close these containers.
+   * <p>
+   * Once a command is dispatched to DN, we will also listen to updates from the
+   * datanode which lets us know that this command completed or timed out.
    */
   public static final TypedEvent<ContainerID> CLOSE_CONTAINER =
       new TypedEvent<>(ContainerID.class, "Close_Container");
 
   /**
-   * This event will be triggered whenever a new datanode is
-   * registered with SCM.
+   * This event will be triggered whenever a new datanode is registered with
+   * SCM.
    */
   public static final TypedEvent<DatanodeDetails> NEW_NODE =
       new TypedEvent<>(DatanodeDetails.class, "New_Node");
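
On the SCM side, any component interested in these datanode acks can now
subscribe to the new event. A minimal sketch, assuming the EventQueue /
EventHandler API that SCM's event framework already uses for the other
events in this class (the handler body is illustrative):

  EventQueue eventQueue = new EventQueue();
  eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT,
      (commandStatusReport, publisher) -> {
        // Payload is a CommandStatusReportFromDatanode; walk the
        // per-command statuses the datanode sent back.
        commandStatusReport.getReport().getCmdStatusList().forEach(
            status -> System.out.println(
                status.getCmdId() + " -> " + status.getStatus()));
      });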

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 4cfa98f..2461d37 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.server;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -37,7 +39,7 @@ import java.util.List;
 
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
-
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
 /**
  * This class is responsible for dispatching heartbeat from datanode to
  * appropriate EventHandler at SCM.
@@ -86,6 +88,13 @@ public final class SCMDatanodeHeartbeatDispatcher {
               heartbeat.getContainerReport()));
 
     }
+
+    if (heartbeat.hasCommandStatusReport()) {
+      eventPublisher.fireEvent(CMD_STATUS_REPORT,
+          new CommandStatusReportFromDatanode(datanodeDetails,
+              heartbeat.getCommandStatusReport()));
+    }
+
     return commands;
   }
 
@@ -136,4 +145,16 @@ public final class SCMDatanodeHeartbeatDispatcher {
     }
   }
 
+  /**
+   * Command status report event payload with origin.
+   */
+  public static class CommandStatusReportFromDatanode
+      extends ReportFromDatanode<CommandStatusReportsProto> {
+
+    public CommandStatusReportFromDatanode(DatanodeDetails datanodeDetails,
+        CommandStatusReportsProto report) {
+      super(datanodeDetails, report);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java
index 042e3cc..1b79ebf 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java
@@ -21,6 +21,10 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
+import org.apache.hadoop.hdds.scm.server.
+    SCMDatanodeHeartbeatDispatcher.CommandStatusReportFromDatanode;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -42,6 +46,7 @@ import org.mockito.Mockito;
 
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
 
 /**
  * This class tests the behavior of SCMDatanodeHeartbeatDispatcher.
@@ -91,6 +96,8 @@ public class TestSCMDatanodeHeartbeatDispatcher {
 
     ContainerReportsProto containerReport =
         ContainerReportsProto.getDefaultInstance();
+    CommandStatusReportsProto commandStatusReport =
+        CommandStatusReportsProto.getDefaultInstance();
 
     SCMDatanodeHeartbeatDispatcher dispatcher =
         new SCMDatanodeHeartbeatDispatcher(Mockito.mock(NodeManager.class),
@@ -98,9 +105,18 @@ public class TestSCMDatanodeHeartbeatDispatcher {
           @Override
           public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
               EVENT_TYPE event, PAYLOAD payload) {
-            Assert.assertEquals(event, CONTAINER_REPORT);
-            Assert.assertEquals(containerReport,
-                ((ContainerReportFromDatanode)payload).getReport());
+            Assert.assertTrue(
+                event.equals(CONTAINER_REPORT)
+                    || event.equals(CMD_STATUS_REPORT));
+
+            if (payload instanceof ContainerReportFromDatanode) {
+              Assert.assertEquals(containerReport,
+                  ((ContainerReportFromDatanode) payload).getReport());
+            }
+            if (payload instanceof CommandStatusReportFromDatanode) {
+              Assert.assertEquals(commandStatusReport,
+                  ((CommandStatusReportFromDatanode) payload).getReport());
+            }
             eventReceived.incrementAndGet();
           }
         });
@@ -111,9 +127,10 @@ public class TestSCMDatanodeHeartbeatDispatcher {
         SCMHeartbeatRequestProto.newBuilder()
             .setDatanodeDetails(datanodeDetails.getProtoBufMessage())
             .setContainerReport(containerReport)
+            .setCommandStatusReport(commandStatusReport)
             .build();
     dispatcher.dispatch(heartbeat);
-    Assert.assertEquals(1, eventReceived.get());
+    Assert.assertEquals(2, eventReceived.get());
 
 
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89e2659/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 9db9e80..be8bd87 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -16,12 +16,29 @@
  */
 package org.apache.hadoop.ozone.container.common;
 
+import java.util.Map;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.DeleteBlocksCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.VersionInfo;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -54,6 +71,7 @@ import org.apache.hadoop.ozone.container.common.states.endpoint
 import org.apache.hadoop.ozone.container.common.states.endpoint
     .VersionEndpointTask;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.AfterClass;
@@ -74,6 +92,9 @@ import static org.apache.hadoop.ozone.container.common.ContainerTestUtils
     .createEndpoint;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.mockito.Mockito.when;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 /**
  * Tests the endpoints.
@@ -83,6 +104,7 @@ public class TestEndPoint {
   private static RPC.Server scmServer;
   private static ScmTestMock scmServerImpl;
   private static File testDir;
+  private static Configuration config;
 
   @AfterClass
   public static void tearDown() throws Exception {
@@ -99,6 +121,12 @@ public class TestEndPoint {
     scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
         scmServerImpl, serverAddress, 10);
     testDir = PathUtils.getTestDir(TestEndPoint.class);
+    config = SCMTestUtils.getConf();
+    config.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
+    config.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    config
+        .setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true);
+    config.set(HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL, "1s");
   }
 
   @Test
@@ -312,7 +340,87 @@ public class TestEndPoint {
     }
   }
 
-  private void heartbeatTaskHelper(InetSocketAddress scmAddress,
+  @Test
+  public void testHeartbeatWithCommandStatusReport() throws Exception {
+    DatanodeDetails dataNode = getDatanodeDetails();
+    try (EndpointStateMachine rpcEndPoint =
+        createEndpoint(SCMTestUtils.getConf(),
+            serverAddress, 1000)) {
+      String storageId = UUID.randomUUID().toString();
+      // Add some scmCommands for heartbeat response
+      addScmCommands();
+
+      SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder()
+          .setDatanodeDetails(dataNode.getProtoBufMessage())
+          .setNodeReport(TestUtils.createNodeReport(
+              getStorageReports(storageId)))
+          .build();
+
+      SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
+          .sendHeartbeat(request);
+      assertNotNull(responseProto);
+      assertEquals(3, responseProto.getCommandsCount());
+      assertEquals(0, scmServerImpl.getCommandStatusReportCount());
+
+      // Send heartbeat again from heartbeat endpoint task
+      final StateContext stateContext = heartbeatTaskHelper(serverAddress, 3000);
+      Map<Long, CommandStatus> map = stateContext.getCommandStatusMap();
+      assertNotNull(map);
+      assertEquals("Should have 3 objects", 3, map.size());
+      assertTrue(map.containsKey(Long.valueOf(1)));
+      assertTrue(map.containsKey(Long.valueOf(2)));
+      assertTrue(map.containsKey(Long.valueOf(3)));
+      assertTrue(map.get(Long.valueOf(1)).getType()
+          .equals(Type.closeContainerCommand));
+      assertTrue(map.get(Long.valueOf(2)).getType()
+          .equals(Type.replicateContainerCommand));
+      assertTrue(
+          map.get(Long.valueOf(3)).getType().equals(Type.deleteBlocksCommand));
+      assertTrue(map.get(Long.valueOf(1)).getStatus().equals(Status.PENDING));
+      assertTrue(map.get(Long.valueOf(2)).getStatus().equals(Status.PENDING));
+      assertTrue(map.get(Long.valueOf(3)).getStatus().equals(Status.PENDING));
+
+      scmServerImpl.clearScmCommandRequests();
+    }
+  }
+
+  private void addScmCommands() {
+    SCMCommandProto closeCommand = SCMCommandProto.newBuilder()
+        .setCloseContainerCommandProto(
+            CloseContainerCommandProto.newBuilder().setCmdId(1)
+        .setContainerID(1)
+        .setReplicationType(ReplicationType.RATIS)
+        .build())
+        .setCommandType(Type.closeContainerCommand)
+        .build();
+    SCMCommandProto replicationCommand = SCMCommandProto.newBuilder()
+        .setReplicateContainerCommandProto(
+            ReplicateContainerCommandProto.newBuilder()
+        .setCmdId(2)
+        .setContainerID(2)
+        .build())
+        .setCommandType(Type.replicateContainerCommand)
+        .build();
+    SCMCommandProto deleteBlockCommand = SCMCommandProto.newBuilder()
+        .setDeleteBlocksCommandProto(
+            DeleteBlocksCommandProto.newBuilder()
+                .setCmdId(3)
+                .addDeletedBlocksTransactions(
+                    DeletedBlocksTransaction.newBuilder()
+                        .setContainerID(45)
+                        .setCount(1)
+                        .setTxID(23)
+                        .build())
+                .build())
+        .setCommandType(Type.deleteBlocksCommand)
+        .build();
+    scmServerImpl.addScmCommandRequest(closeCommand);
+    scmServerImpl.addScmCommandRequest(deleteBlockCommand);
+    scmServerImpl.addScmCommandRequest(replicationCommand);
+  }
+
+  private StateContext heartbeatTaskHelper(InetSocketAddress scmAddress,
       int rpcTimeout) throws Exception {
     Configuration conf = SCMTestUtils.getConf();
     conf.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
@@ -344,6 +452,7 @@ public class TestEndPoint {
 
       Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
           rpcEndPoint.getState());
+      return stateContext;
     }
   }
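
Taken together, the patch closes the command acknowledgement loop: SCM
attaches a cmdId to every command it issues, the datanode's StateContext
records a PENDING CommandStatus for each id as commands arrive, the command
handlers are then expected to move that status to EXECUTED or FAILED, the
CommandStatusReportPublisher ships the map back in the heartbeat at the
configured hdds.command.status.report.interval, and
SCMDatanodeHeartbeatDispatcher turns the report into a CMD_STATUS_REPORT
event for interested SCM components.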
 

