You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by si...@apache.org on 2021/02/10 17:21:47 UTC

[ozone] branch master updated: HDDS-3974. StateContext#addPipelineActionIfAbsent does not work as expected (#1898)

This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b67ae0  HDDS-3974. StateContext#addPipelineActionIfAbsent does not work as expected (#1898)
6b67ae0 is described below

commit 6b67ae0c40110af195b897fb70a87dc6e82d467e
Author: Siyao Meng <50...@users.noreply.github.com>
AuthorDate: Wed Feb 10 09:21:33 2021 -0800

    HDDS-3974. StateContext#addPipelineActionIfAbsent does not work as expected (#1898)
---
 .../common/statemachine/StateContext.java          | 34 +++++++++++++++-------
 .../common/statemachine/TestStateContext.java      | 24 +++++++++++++--
 2 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 28b7331..77260e2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -412,6 +412,24 @@ public class StateContext {
   }
 
   /**
+   * Helper function for addPipelineActionIfAbsent that check if inputs are the
+   * same close pipeline action.
+   *
+   * Important Note: Make sure to double check for correctness before using this
+   * helper function for other purposes!
+   *
+   * @return true if a1 and a2 are the same close pipeline action,
+   *         false otherwise
+   */
+  boolean isSameClosePipelineAction(PipelineAction a1, PipelineAction a2) {
+    return a1.getAction() == a2.getAction()
+        && a1.hasClosePipeline()
+        && a2.hasClosePipeline()
+        && a1.getClosePipeline().getPipelineID()
+        .equals(a2.getClosePipeline().getPipelineID());
+  }
+
+  /**
    * Add PipelineAction to PipelineAction queue if it's not present.
    *
    * @param pipelineAction PipelineAction to be added
@@ -427,18 +445,12 @@ public class StateContext {
        * multiple times here.
        */
       for (InetSocketAddress endpoint : endpoints) {
-        Queue<PipelineAction> actionsForEndpoint =
-            this.pipelineActions.get(endpoint);
-        for (PipelineAction pipelineActionIter : actionsForEndpoint) {
-          if (pipelineActionIter.getAction() == pipelineAction.getAction()
-              && pipelineActionIter.hasClosePipeline() && pipelineAction
-              .hasClosePipeline()
-              && pipelineActionIter.getClosePipeline().getPipelineID()
-              .equals(pipelineAction.getClosePipeline().getPipelineID())) {
-            break;
-          }
+        final Queue<PipelineAction> actionsForEndpoint =
+            pipelineActions.get(endpoint);
+        if (actionsForEndpoint.stream().noneMatch(
+            action -> isSameClosePipelineAction(action, pipelineAction))) {
+          actionsForEndpoint.add(pipelineAction);
         }
-        actionsForEndpoint.add(pipelineAction);
       }
     }
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index e9c39d3..67f5073 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.container.common.statemachine;
 
 import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction.Action.CLOSE;
 import static org.apache.hadoop.test.GenericTestUtils.waitFor;
 import static org.junit.Assert.assertEquals;
@@ -49,6 +50,7 @@ import com.google.protobuf.Descriptors.Descriptor;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import org.apache.hadoop.test.LambdaTestUtils;
@@ -392,13 +394,29 @@ public class TestStateContext {
     stateContext.addEndpoint(scm1);
     stateContext.addEndpoint(scm2);
 
+    final ClosePipelineInfo closePipelineInfo = ClosePipelineInfo.newBuilder()
+        .setPipelineID(PipelineID.randomId().getProtobuf())
+        .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
+        .setDetailedReason("Test").build();
+    final PipelineAction pipelineAction = PipelineAction.newBuilder()
+        .setClosePipeline(closePipelineInfo)
+        .setAction(PipelineAction.Action.CLOSE)
+        .build();
+
     // Add PipelineAction. Should be added to all endpoints.
-    stateContext.addPipelineActionIfAbsent(
-        PipelineAction.newBuilder().setAction(
-            PipelineAction.Action.CLOSE).build());
+    stateContext.addPipelineActionIfAbsent(pipelineAction);
+
+    pipelineActions = stateContext.getPendingPipelineAction(scm2, 10);
+    assertEquals(1, pipelineActions.size());
+    // The pipeline action is dequeued from scm2 now, but still in scm1
 
+    // The same pipeline action will not be added if it already exists
+    stateContext.addPipelineActionIfAbsent(pipelineAction);
     pipelineActions = stateContext.getPendingPipelineAction(scm1, 10);
     assertEquals(1, pipelineActions.size());
+    // The pipeline action should have been be added back to the scm2
+    pipelineActions = stateContext.getPendingPipelineAction(scm2, 10);
+    assertEquals(1, pipelineActions.size());
 
     // Add ContainerAction. Should be added to all endpoints.
     stateContext.addContainerAction(ContainerAction.newBuilder()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org