You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2019/02/18 14:36:31 UTC

[hadoop] branch trunk updated: HDDS-1106. Introduce queryMap in PipelineManager. Contributed by Lokesh Jain.

This is an automated email from the ASF dual-hosted git repository.

yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f2fb653  HDDS-1106. Introduce queryMap in PipelineManager. Contributed by Lokesh Jain.
f2fb653 is described below

commit f2fb6536dcbe6320f69273bf9e11d4701248172c
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Mon Feb 18 22:35:23 2019 +0800

    HDDS-1106. Introduce queryMap in PipelineManager. Contributed by Lokesh Jain.
---
 .../hadoop/hdds/scm/pipeline/PipelineStateMap.java | 72 +++++++++++++++++++++-
 .../scm/pipeline/TestPipelineStateManager.java     | 42 +++++++++++++
 2 files changed, 111 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index dea2115..2b6c61b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
 /**
@@ -42,15 +44,27 @@ class PipelineStateMap {
 
   private final Map<PipelineID, Pipeline> pipelineMap;
   private final Map<PipelineID, NavigableSet<ContainerID>> pipeline2container;
+  private final Map<PipelineQuery, List<Pipeline>> query2OpenPipelines;
 
   PipelineStateMap() {
 
     // TODO: Use TreeMap for range operations?
-    this.pipelineMap = new HashMap<>();
-    this.pipeline2container = new HashMap<>();
+    pipelineMap = new HashMap<>();
+    pipeline2container = new HashMap<>();
+    query2OpenPipelines = new HashMap<>();
+    initializeQueryMap();
 
   }
 
+  private void initializeQueryMap() {
+    for (ReplicationType type : ReplicationType.values()) {
+      for (ReplicationFactor factor : ReplicationFactor.values()) {
+        query2OpenPipelines
+            .put(new PipelineQuery(type, factor), new CopyOnWriteArrayList<>());
+      }
+    }
+  }
+
   /**
    * Adds provided pipeline in the data structures.
    *
@@ -70,6 +84,9 @@ class PipelineStateMap {
           .format("Duplicate pipeline ID %s detected.", pipeline.getId()));
     }
     pipeline2container.put(pipeline.getId(), new TreeSet<>());
+    if (pipeline.getPipelineState() == PipelineState.OPEN) {
+      query2OpenPipelines.get(new PipelineQuery(pipeline)).add(pipeline);
+    }
   }
 
   /**
@@ -188,6 +205,10 @@ class PipelineStateMap {
     Preconditions.checkNotNull(factor, "Replication factor cannot be null");
     Preconditions.checkNotNull(state, "Pipeline state cannot be null");
 
+    if (state == PipelineState.OPEN) {
+      return Collections.unmodifiableList(
+          query2OpenPipelines.get(new PipelineQuery(type, factor)));
+    }
     return pipelineMap.values().stream().filter(
         pipeline -> pipeline.getType() == type
             && pipeline.getPipelineState() == state
@@ -293,7 +314,52 @@ class PipelineStateMap {
     Preconditions.checkNotNull(state, "Pipeline LifeCycleState cannot be null");
 
     final Pipeline pipeline = getPipeline(pipelineID);
-    return pipelineMap.compute(pipelineID,
+    Pipeline updatedPipeline = pipelineMap.compute(pipelineID,
         (id, p) -> Pipeline.newBuilder(pipeline).setState(state).build());
+    PipelineQuery query = new PipelineQuery(pipeline);
+    if (updatedPipeline.getPipelineState() == PipelineState.OPEN) {
+      // for transition to OPEN state add pipeline to query2OpenPipelines
+      query2OpenPipelines.get(query).add(updatedPipeline);
+    } else if (updatedPipeline.getPipelineState() == PipelineState.CLOSED) {
+      // for transition from OPEN to CLOSED state remove pipeline from
+      // query2OpenPipelines
+      query2OpenPipelines.get(query).remove(pipeline);
+    }
+    return updatedPipeline;
+  }
+
+  private class PipelineQuery {
+    private ReplicationType type;
+    private ReplicationFactor factor;
+
+    PipelineQuery(ReplicationType type, ReplicationFactor factor) {
+      this.type = type;
+      this.factor = factor;
+    }
+
+    PipelineQuery(Pipeline pipeline) {
+      type = pipeline.getType();
+      factor = pipeline.getFactor();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+      if (!this.getClass().equals(other.getClass())) {
+        return false;
+      }
+      PipelineQuery otherQuery = (PipelineQuery) other;
+      return type == otherQuery.type && factor == otherQuery.factor;
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder()
+          .append(type)
+          .append(factor)
+          .toHashCode();
+    }
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
index 823cd7d..33dd7df 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
@@ -419,6 +419,48 @@ public class TestPipelineStateManager {
     removePipeline(pipeline);
   }
 
+  @Test
+  public void testQueryPipeline() throws IOException {
+    Pipeline pipeline = createDummyPipeline(HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.THREE, 3);
+    // pipeline in allocated state should not be reported
+    stateManager.addPipeline(pipeline);
+    Assert.assertEquals(0, stateManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
+        .size());
+
+    // pipeline in open state should be reported
+    stateManager.openPipeline(pipeline.getId());
+    Assert.assertEquals(1, stateManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
+        .size());
+
+    Pipeline pipeline2 = createDummyPipeline(HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.THREE, 3);
+    pipeline2 = Pipeline.newBuilder(pipeline2)
+        .setState(Pipeline.PipelineState.OPEN)
+        .build();
+    // pipeline in open state should be reported
+    stateManager.addPipeline(pipeline2);
+    Assert.assertEquals(2, stateManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
+        .size());
+
+    // pipeline in closed state should not be reported
+    stateManager.finalizePipeline(pipeline2.getId());
+    Assert.assertEquals(1, stateManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
+        .size());
+
+    // clean up
+    removePipeline(pipeline);
+    removePipeline(pipeline2);
+  }
+
   private void removePipeline(Pipeline pipeline) throws IOException {
     stateManager.finalizePipeline(pipeline.getId());
     stateManager.removePipeline(pipeline.getId());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org