Posted to commits@ozone.apache.org by um...@apache.org on 2022/06/24 23:31:31 UTC

[ozone] branch master updated: HDDS-6944. EC: Handle reconstructECContainersCommand in heartbeat (#3548)

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 6354406b4a HDDS-6944. EC: Handle reconstructECContainersCommand in heartbeat (#3548)
6354406b4a is described below

commit 6354406b4a06a7d13c1bde19c95c268dd6debfbd
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Sat Jun 25 01:31:25 2022 +0200

    HDDS-6944. EC: Handle reconstructECContainersCommand in heartbeat (#3548)
---
 .../states/endpoint/HeartbeatEndpointTask.java     | 110 +++++++++++----------
 .../states/endpoint/TestHeartbeatEndpointTask.java |  42 ++++++++
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |   8 ++
 3 files changed, 106 insertions(+), 54 deletions(-)
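
In short, the patch consolidates the per-command term and token handling into a single helper and adds a heartbeat case for the new EC reconstruction command. A condensed sketch of that pattern follows (all names are taken from the diff below; this is an excerpt of HeartbeatEndpointTask, not a standalone class):

    // Condensed from the diff below: the term/token handling repeated per
    // command type now lives in one place. 'context' is the StateContext
    // field of the enclosing HeartbeatEndpointTask.
    private void processCommonCommand(
        SCMCommandProto response, SCMCommand<?> cmd) {
      if (response.hasTerm()) {
        cmd.setTerm(response.getTerm());
      }
      if (response.hasEncodedToken()) {
        cmd.setEncodedToken(response.getEncodedToken());
      }
      context.addCommand(cmd);
    }

    // The new heartbeat switch arm: decode the EC reconstruction command and
    // route it through the same helper as every other command type.
    case reconstructECContainersCommand:
      ReconstructECContainersCommand reccc =
          ReconstructECContainersCommand.getFromProtobuf(
              commandResponseProto.getReconstructECContainersCommandProto());
      processCommonCommand(commandResponseProto, reccc);
      break;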

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index d0bc18c4b4..45838b5fd8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -55,9 +55,11 @@ import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -309,141 +311,109 @@ public class HeartbeatEndpointTask
     for (SCMCommandProto commandResponseProto : response.getCommandsList()) {
       switch (commandResponseProto.getCommandType()) {
       case reregisterCommand:
-        if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Received SCM notification to register."
-                + " Interrupt HEARTBEAT and transit to REGISTER state.");
-          }
-          rpcEndpoint.setState(EndPointStates.REGISTER);
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Illegal state {} found, expecting {}.",
-                rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT);
-          }
-        }
+        processReregisterCommand();
         break;
       case deleteBlocksCommand:
         DeleteBlocksCommand deleteBlocksCommand = DeleteBlocksCommand
             .getFromProtobuf(
                 commandResponseProto.getDeleteBlocksCommandProto());
-        if (commandResponseProto.hasTerm()) {
-          deleteBlocksCommand.setTerm(commandResponseProto.getTerm());
-        }
         if (!deleteBlocksCommand.blocksTobeDeleted().isEmpty()) {
           if (LOG.isDebugEnabled()) {
             LOG.debug(DeletedContainerBlocksSummary
                 .getFrom(deleteBlocksCommand.blocksTobeDeleted())
                 .toString());
           }
-          this.context.addCommand(deleteBlocksCommand);
+          processCommonCommand(commandResponseProto, deleteBlocksCommand);
         }
         break;
       case closeContainerCommand:
         CloseContainerCommand closeContainer =
             CloseContainerCommand.getFromProtobuf(
                 commandResponseProto.getCloseContainerCommandProto());
-        if (commandResponseProto.hasTerm()) {
-          closeContainer.setTerm(commandResponseProto.getTerm());
-        }
-        if (commandResponseProto.hasEncodedToken()) {
-          closeContainer.setEncodedToken(
-              commandResponseProto.getEncodedToken());
-        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM container close request for container {}",
               closeContainer.getContainerID());
         }
-        this.context.addCommand(closeContainer);
+        processCommonCommand(commandResponseProto, closeContainer);
         break;
       case replicateContainerCommand:
         ReplicateContainerCommand replicateContainerCommand =
             ReplicateContainerCommand.getFromProtobuf(
                 commandResponseProto.getReplicateContainerCommandProto());
-        if (commandResponseProto.hasTerm()) {
-          replicateContainerCommand.setTerm(commandResponseProto.getTerm());
-        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM container replicate request for container {}",
               replicateContainerCommand.getContainerID());
         }
-        this.context.addCommand(replicateContainerCommand);
+        processCommonCommand(commandResponseProto, replicateContainerCommand);
+        break;
+      case reconstructECContainersCommand:
+        ReconstructECContainersCommand reccc =
+            ReconstructECContainersCommand.getFromProtobuf(
+                commandResponseProto.getReconstructECContainersCommandProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM reconstruct request for container {}",
+              reccc.getContainerID());
+        }
+        processCommonCommand(commandResponseProto, reccc);
         break;
       case deleteContainerCommand:
         DeleteContainerCommand deleteContainerCommand =
             DeleteContainerCommand.getFromProtobuf(
                 commandResponseProto.getDeleteContainerCommandProto());
-        if (commandResponseProto.hasTerm()) {
-          deleteContainerCommand.setTerm(commandResponseProto.getTerm());
-        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM delete container request for container {}",
               deleteContainerCommand.getContainerID());
         }
-        this.context.addCommand(deleteContainerCommand);
+        processCommonCommand(commandResponseProto, deleteContainerCommand);
         break;
       case createPipelineCommand:
         CreatePipelineCommand createPipelineCommand =
             CreatePipelineCommand.getFromProtobuf(
                 commandResponseProto.getCreatePipelineCommandProto());
-        if (commandResponseProto.hasTerm()) {
-          createPipelineCommand.setTerm(commandResponseProto.getTerm());
-        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM create pipeline request {}",
               createPipelineCommand.getPipelineID());
         }
-        this.context.addCommand(createPipelineCommand);
+        processCommonCommand(commandResponseProto, createPipelineCommand);
         break;
       case closePipelineCommand:
         ClosePipelineCommand closePipelineCommand =
             ClosePipelineCommand.getFromProtobuf(
                 commandResponseProto.getClosePipelineCommandProto());
-        if (commandResponseProto.hasTerm()) {
-          closePipelineCommand.setTerm(commandResponseProto.getTerm());
-        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM close pipeline request {}",
               closePipelineCommand.getPipelineID());
         }
-        this.context.addCommand(closePipelineCommand);
+        processCommonCommand(commandResponseProto, closePipelineCommand);
         break;
       case setNodeOperationalStateCommand:
         SetNodeOperationalStateCommand setNodeOperationalStateCommand =
             SetNodeOperationalStateCommand.getFromProtobuf(
                 commandResponseProto.getSetNodeOperationalStateCommandProto());
-        if (commandResponseProto.hasTerm()) {
-          setNodeOperationalStateCommand.setTerm(
-              commandResponseProto.getTerm());
-        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM set operational state command. State: {} " +
               "Expiry: {}", setNodeOperationalStateCommand.getOpState(),
               setNodeOperationalStateCommand.getStateExpiryEpochSeconds());
         }
-        this.context.addCommand(setNodeOperationalStateCommand);
+        processCommonCommand(commandResponseProto,
+            setNodeOperationalStateCommand);
         break;
       case finalizeNewLayoutVersionCommand:
         FinalizeNewLayoutVersionCommand finalizeNewLayoutVersionCommand =
             FinalizeNewLayoutVersionCommand.getFromProtobuf(
                 commandResponseProto.getFinalizeNewLayoutVersionCommandProto());
-        if (commandResponseProto.hasTerm()) {
-          finalizeNewLayoutVersionCommand.setTerm(
-              commandResponseProto.getTerm());
-        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM finalize command {}",
               finalizeNewLayoutVersionCommand.getId());
         }
-        this.context.addCommand(finalizeNewLayoutVersionCommand);
+        processCommonCommand(commandResponseProto,
+            finalizeNewLayoutVersionCommand);
         break;
       case refreshVolumeUsageInfo:
         RefreshVolumeUsageCommand refreshVolumeUsageCommand =
             RefreshVolumeUsageCommand.getFromProtobuf(
             commandResponseProto.getRefreshVolumeUsageCommandProto());
-        if (commandResponseProto.hasTerm()) {
-          refreshVolumeUsageCommand.setTerm(commandResponseProto.getTerm());
-        }
-        this.context.addCommand(refreshVolumeUsageCommand);
+        processCommonCommand(commandResponseProto, refreshVolumeUsageCommand);
         break;
       default:
         throw new IllegalArgumentException("Unknown response : "
@@ -452,6 +422,38 @@ public class HeartbeatEndpointTask
     }
   }
 
+  /**
+   * Common processing for SCM commands.
+   *  - set term
+   *  - set encoded token
+   *  - add to context's queue
+   */
+  private void processCommonCommand(
+      SCMCommandProto response, SCMCommand<?> cmd) {
+    if (response.hasTerm()) {
+      cmd.setTerm(response.getTerm());
+    }
+    if (response.hasEncodedToken()) {
+      cmd.setEncodedToken(response.getEncodedToken());
+    }
+    context.addCommand(cmd);
+  }
+
+  private void processReregisterCommand() {
+    if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received SCM notification to register."
+            + " Interrupt HEARTBEAT and transit to REGISTER state.");
+      }
+      rpcEndpoint.setState(EndPointStates.REGISTER);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Illegal state {} found, expecting {}.",
+            rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT);
+      }
+    }
+  }
+
   /**
    * Builder class for HeartbeatEndpointTask.
    */
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 2057f58a48..2daa985635 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -18,13 +18,17 @@
 
 package org.apache.hadoop.ozone.container.common.states.endpoint;
 
+import static java.util.Collections.emptyList;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconstructECContainersCommand;
 import static org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
+import static org.mockito.ArgumentMatchers.any;
 
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -42,6 +46,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachin
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
 import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
 
 import org.junit.Assert;
@@ -57,6 +62,43 @@ public class TestHeartbeatEndpointTask {
   private static final InetSocketAddress TEST_SCM_ENDPOINT =
       new InetSocketAddress("test-scm-1", 9861);
 
+  @Test
+  public void handlesReconstructContainerCommand() throws Exception {
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+
+    ReconstructECContainersCommand cmd = new ReconstructECContainersCommand(
+        1, emptyList(), emptyList(), new byte[]{2, 5},
+        new ECReplicationConfig(3, 2));
+
+    Mockito.when(scm.sendHeartbeat(any()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .addCommands(SCMCommandProto.newBuilder()
+                    .setCommandType(reconstructECContainersCommand)
+                    .setReconstructECContainersCommandProto(cmd.getProto())
+                    .build())
+                .build());
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    DatanodeStateMachine datanodeStateMachine =
+        Mockito.mock(DatanodeStateMachine.class);
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        datanodeStateMachine);
+
+    // WHEN
+    HeartbeatEndpointTask task = getHeartbeatEndpointTask(conf, context, scm);
+    task.call();
+
+    // THEN
+    Assert.assertEquals(1, context.getCommandQueueSummary()
+        .get(reconstructECContainersCommand).intValue());
+  }
+
   @Test
   public void testheartbeatWithoutReports() throws Exception {
     StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 0b5bee8929..625a59e0ec 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReconstructECContainersCommandProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReregisterCommandProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
@@ -90,6 +91,7 @@ import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteBlocksCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.finalizeNewLayoutVersionCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconstructECContainersCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.refreshVolumeUsageInfo;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reregisterCommand;
@@ -352,6 +354,12 @@ public class SCMDatanodeProtocolServer implements
           .setReplicateContainerCommandProto(
               ((ReplicateContainerCommand)cmd).getProto())
           .build();
+    case reconstructECContainersCommand:
+      return builder
+          .setCommandType(reconstructECContainersCommand)
+          .setReconstructECContainersCommandProto(
+              (ReconstructECContainersCommandProto) cmd.getProto())
+          .build();
     case createPipelineCommand:
       return builder
           .setCommandType(createPipelineCommand)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org