You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/07/20 21:38:14 UTC

[3/3] hadoop git commit: HDDS-260. Support in Datanode for sending ContainerActions to SCM. Contributed by Nanda kumar.

HDDS-260. Support in Datanode for sending ContainerActions to SCM. Contributed by Nanda kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/347c9550
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/347c9550
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/347c9550

Branch: refs/heads/trunk
Commit: 347c9550135ea10fd84d5007124452bf5f2d6619
Parents: 9be25e3
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Fri Jul 20 14:37:13 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Fri Jul 20 14:37:13 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdds/HddsConfigKeys.java  |   6 +
 .../common/src/main/resources/ozone-default.xml |  10 +
 .../common/statemachine/StateContext.java       |  55 +++-
 .../states/endpoint/HeartbeatEndpointTask.java  |  33 +-
 .../StorageContainerDatanodeProtocol.proto      |   4 +-
 .../common/report/TestReportPublisher.java      |  41 ---
 .../endpoint/TestHeartbeatEndpointTask.java     | 300 +++++++++++++++++++
 .../common/states/endpoint/package-info.java    |  18 ++
 8 files changed, 414 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 0283615..fd4bf08 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -48,4 +48,10 @@ public final class HddsConfigKeys {
       "hdds.command.status.report.interval";
   public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
       "60s";
+
+  public static final String HDDS_CONTAINER_ACTION_MAX_LIMIT =
+      "hdds.container.action.max.limit";
+  public static final int HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT =
+      20;
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 69a382a..84a3e0c 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1098,4 +1098,14 @@
     </description>
   </property>
 
+  <property>
+    <name>hdds.container.action.max.limit</name>
+    <value>20</value>
+    <tag>DATANODE</tag>
+    <description>
+      Maximum number of Container Actions sent by the datanode to SCM in a
+      single heartbeat.
+    </description>
+  </property>
+
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index faaff69..7862cc6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -20,14 +20,18 @@ import com.google.protobuf.GeneratedMessage;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode
     .InitDatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode
     .RunningDatanodeState;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
-import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus
+    .CommandStatusBuilder;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +47,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
 
@@ -59,6 +64,7 @@ public class StateContext {
   private final AtomicLong stateExecutionCount;
   private final Configuration conf;
   private final Queue<GeneratedMessage> reports;
+  private final Queue<ContainerAction> containerActions;
   private DatanodeStateMachine.DatanodeStates state;
 
   /**
@@ -76,6 +82,7 @@ public class StateContext {
     commandQueue = new LinkedList<>();
     cmdStatusMap = new ConcurrentHashMap<>();
     reports = new LinkedList<>();
+    containerActions = new LinkedList<>();
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
   }
@@ -187,15 +194,45 @@ public class StateContext {
    * @return List<reports>
    */
   public List<GeneratedMessage> getReports(int maxLimit) {
-    List<GeneratedMessage> results = new ArrayList<>();
     synchronized (reports) {
-      GeneratedMessage report = reports.poll();
-      while(results.size() < maxLimit && report != null) {
-        results.add(report);
-        report = reports.poll();
-      }
+      return reports.parallelStream().limit(maxLimit)
+          .collect(Collectors.toList());
+    }
+  }
+
+
+  /**
+   * Adds the ContainerAction to ContainerAction queue.
+   *
+   * @param containerAction ContainerAction to be added
+   */
+  public void addContainerAction(ContainerAction containerAction) {
+    synchronized (containerActions) {
+      containerActions.add(containerAction);
+    }
+  }
+
+  /**
+   * Returns all the pending ContainerActions from the ContainerAction queue,
+   * or empty list if the queue is empty.
+   *
+   * @return List<ContainerAction>
+   */
+  public List<ContainerAction> getAllPendingContainerActions() {
+    return getPendingContainerAction(Integer.MAX_VALUE);
+  }
+
+  /**
+   * Returns up to maxLimit pending ContainerActions from the ContainerAction
+   * queue without removing them, or empty list if the queue is empty.
+   *
+   * @return List<ContainerAction>
+   */
+  public List<ContainerAction> getPendingContainerAction(int maxLimit) {
+    synchronized (containerActions) {
+      return containerActions.parallelStream().limit(maxLimit)
+          .collect(Collectors.toList());
     }
-    return results;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 260a245..020fb71 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -25,6 +25,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
@@ -46,8 +50,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.ZonedDateTime;
+import java.util.List;
 import java.util.concurrent.Callable;
 
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_CONTAINER_ACTION_MAX_LIMIT;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT;
+
 /**
  * Heartbeat class for SCMs.
  */
@@ -59,6 +69,7 @@ public class HeartbeatEndpointTask
   private final Configuration conf;
   private DatanodeDetailsProto datanodeDetailsProto;
   private StateContext context;
+  private int maxContainerActionsPerHB;
 
   /**
    * Constructs a SCM heart beat.
@@ -70,6 +81,8 @@ public class HeartbeatEndpointTask
     this.rpcEndpoint = rpcEndpoint;
     this.conf = conf;
     this.context = context;
+    this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT,
+        HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT);
   }
 
   /**
@@ -107,7 +120,7 @@ public class HeartbeatEndpointTask
           SCMHeartbeatRequestProto.newBuilder()
               .setDatanodeDetails(datanodeDetailsProto);
       addReports(requestBuilder);
-
+      addContainerActions(requestBuilder);
       SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
           .sendHeartbeat(requestBuilder.build());
       processResponse(reponse, datanodeDetailsProto);
@@ -140,6 +153,24 @@ public class HeartbeatEndpointTask
   }
 
   /**
+   * Adds pending ContainerActions to the heartbeat, up to the configured limit.
+   *
+   * @param requestBuilder builder to which the actions have to be added.
+   */
+  private void addContainerActions(
+      SCMHeartbeatRequestProto.Builder requestBuilder) {
+    List<ContainerAction> actions = context.getPendingContainerAction(
+        maxContainerActionsPerHB);
+    if (!actions.isEmpty()) {
+      ContainerActionsProto cap = ContainerActionsProto.newBuilder()
+          .addAllContainerActions(actions)
+          .build();
+      requestBuilder.setContainerActions(cap);
+    }
+  }
+
+
+  /**
    * Returns a builder class for HeartbeatEndpointTask task.
    * @return   Builder.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 4238389..d89567b 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -79,8 +79,8 @@ message SCMHeartbeatRequestProto {
   required DatanodeDetailsProto datanodeDetails = 1;
   optional NodeReportProto nodeReport = 2;
   optional ContainerReportsProto containerReport = 3;
-  optional ContainerActionsProto containerActions = 4;
-  optional CommandStatusReportsProto commandStatusReport = 5;
+  optional CommandStatusReportsProto commandStatusReport = 4;
+  optional ContainerActionsProto containerActions = 5;
 }
 
 /*

http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index a0db2e8..811599f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.ozone.container.common.report;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.Descriptors;
 import com.google.protobuf.GeneratedMessage;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -28,14 +27,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -178,22 +171,6 @@ public class TestReportPublisher {
     executorService.shutdown();
   }
 
-  @Test
-  public void testAddingReportToHeartbeat() {
-    GeneratedMessage nodeReport = NodeReportProto.getDefaultInstance();
-    GeneratedMessage containerReport = ContainerReportsProto
-        .getDefaultInstance();
-    SCMHeartbeatRequestProto.Builder heartbeatBuilder =
-        SCMHeartbeatRequestProto.newBuilder();
-    heartbeatBuilder.setDatanodeDetails(
-        getDatanodeDetails().getProtoBufMessage());
-    addReport(heartbeatBuilder, nodeReport);
-    addReport(heartbeatBuilder, containerReport);
-    SCMHeartbeatRequestProto heartbeat = heartbeatBuilder.build();
-    Assert.assertTrue(heartbeat.hasNodeReport());
-    Assert.assertTrue(heartbeat.hasContainerReport());
-  }
-
   /**
    * Get a datanode details.
    *
@@ -222,22 +199,4 @@ public class TestReportPublisher {
     return builder.build();
   }
 
-  /**
-   * Adds the report to heartbeat.
-   *
-   * @param requestBuilder builder to which the report has to be added.
-   * @param report         the report to be added.
-   */
-  private static void addReport(SCMHeartbeatRequestProto.Builder
-      requestBuilder, GeneratedMessage report) {
-    String reportName = report.getDescriptorForType().getFullName();
-    for (Descriptors.FieldDescriptor descriptor :
-        SCMHeartbeatRequestProto.getDescriptor().getFields()) {
-      String heartbeatFieldName = descriptor.getMessageType().getFullName();
-      if (heartbeatFieldName.equals(reportName)) {
-        requestBuilder.setField(descriptor, report);
-      }
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
new file mode 100644
index 0000000..b4d718d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .DatanodeStateMachine.DatanodeStates;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.protocolPB
+    .StorageContainerDatanodeProtocolClientSideTranslatorPB;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.UUID;
+
+/**
+ * This class tests the functionality of HeartbeatEndpointTask.
+ */
+public class TestHeartbeatEndpointTask {
+
+
+  @Test
+  public void testheartbeatWithoutReports() throws Exception {
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(scm);
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertFalse(heartbeat.hasNodeReport());
+    Assert.assertFalse(heartbeat.hasContainerReport());
+    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertFalse(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testheartbeatWithNodeReports() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addReport(NodeReportProto.getDefaultInstance());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertTrue(heartbeat.hasNodeReport());
+    Assert.assertFalse(heartbeat.hasContainerReport());
+    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertFalse(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testheartbeatWithContainerReports() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addReport(ContainerReportsProto.getDefaultInstance());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertFalse(heartbeat.hasNodeReport());
+    Assert.assertTrue(heartbeat.hasContainerReport());
+    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertFalse(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testheartbeatWithCommandStatusReports() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addReport(CommandStatusReportsProto.getDefaultInstance());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertFalse(heartbeat.hasNodeReport());
+    Assert.assertFalse(heartbeat.hasContainerReport());
+    Assert.assertTrue(heartbeat.hasCommandStatusReport());
+    Assert.assertFalse(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testheartbeatWithContainerActions() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addContainerAction(getContainerAction());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertFalse(heartbeat.hasNodeReport());
+    Assert.assertFalse(heartbeat.hasContainerReport());
+    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testheartbeatWithAllReports() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addReport(NodeReportProto.getDefaultInstance());
+    context.addReport(ContainerReportsProto.getDefaultInstance());
+    context.addReport(CommandStatusReportsProto.getDefaultInstance());
+    context.addContainerAction(getContainerAction());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertTrue(heartbeat.hasNodeReport());
+    Assert.assertTrue(heartbeat.hasContainerReport());
+    Assert.assertTrue(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.hasContainerActions());
+  }
+
+  /**
+   * Creates HeartbeatEndpointTask for the given StorageContainerManager proxy.
+   *
+   * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
+   *
+   * @return HeartbeatEndpointTask
+   */
+  private HeartbeatEndpointTask getHeartbeatEndpointTask(
+      StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+    return getHeartbeatEndpointTask(conf, context, proxy);
+
+  }
+
+  /**
+   * Creates HeartbeatEndpointTask with the given conf, context and
+   * StorageContainerManager client side proxy.
+   *
+   * @param conf Configuration
+   * @param context StateContext
+   * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
+   *
+   * @return HeartbeatEndpointTask
+   */
+  private HeartbeatEndpointTask getHeartbeatEndpointTask(
+      Configuration conf,
+      StateContext context,
+      StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
+    DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
+        .setUuid(UUID.randomUUID().toString())
+        .setHostName("localhost")
+        .setIpAddress("127.0.0.1")
+        .build();
+    EndpointStateMachine endpointStateMachine = Mockito
+        .mock(EndpointStateMachine.class);
+    Mockito.when(endpointStateMachine.getEndPoint()).thenReturn(proxy);
+    return HeartbeatEndpointTask.newBuilder()
+        .setConfig(conf)
+        .setDatanodeDetails(datanodeDetails)
+        .setContext(context)
+        .setEndpointStateMachine(endpointStateMachine)
+        .build();
+  }
+
+  private ContainerAction getContainerAction() {
+    ContainerAction.Builder builder = ContainerAction.newBuilder();
+    ContainerInfo containerInfo = ContainerInfo.newBuilder()
+        .setContainerID(1L)
+        .build();
+    builder.setContainer(containerInfo)
+        .setAction(ContainerAction.Action.CLOSE)
+        .setReason(ContainerAction.Reason.CONTAINER_FULL);
+    return builder.build();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
new file mode 100644
index 0000000..d120a5c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org