Posted to common-commits@hadoop.apache.org by na...@apache.org on 2019/03/25 20:11:09 UTC

[hadoop] branch trunk updated: HDDS-1217. Refactor ChillMode rules and chillmode manager. (#558)

This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8739693  HDDS-1217. Refactor ChillMode rules and chillmode manager. (#558)
8739693 is described below

commit 8739693514ac92c33b38e472c37b7dcf4febe73f
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Mon Mar 25 13:11:03 2019 -0700

    HDDS-1217. Refactor ChillMode rules and chillmode manager. (#558)
---
 .../hdds/scm/chillmode/ChillModeExitRule.java      |  88 +++++++++-
 .../hdds/scm/chillmode/ContainerChillModeRule.java |  70 ++++----
 .../hdds/scm/chillmode/DataNodeChillModeRule.java  |  60 +++----
 .../chillmode/HealthyPipelineChillModeRule.java    | 114 +++++++------
 .../chillmode/OneReplicaPipelineChillModeRule.java |  83 +++++----
 .../hdds/scm/chillmode/SCMChillModeManager.java    |  92 +++++++---
 .../scm/chillmode/TestSCMChillModeManager.java     | 190 ++++++++++++++++++++-
 7 files changed, 497 insertions(+), 200 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeExitRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeExitRule.java
index d283dfe..0c9b823 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeExitRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeExitRule.java
@@ -17,16 +17,94 @@
  */
 package org.apache.hadoop.hdds.scm.chillmode;
 
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.TypedEvent;
+
 /**
- * Interface for defining chill mode exit rules.
+ * Abstract class for ChillModeExitRules. When a new rule is added, the new
+ * rule should extend this abstract class.
+ *
+ * Each rule should:
+ * 1. Add a handler for the event it is interested in during the
+ * initialization of the rule.
+ * 2. Add the rule to the list of rules in SCMChillModeManager.
+ *
  *
  * @param <T>
  */
-public interface ChillModeExitRule<T> {
+public abstract class ChillModeExitRule<T> implements EventHandler<T> {
+
+  private final SCMChillModeManager chillModeManager;
+  private final String ruleName;
+
+  public ChillModeExitRule(SCMChillModeManager chillModeManager,
+      String ruleName, EventQueue eventQueue) {
+    this.chillModeManager = chillModeManager;
+    this.ruleName = ruleName;
+    eventQueue.addHandler(getEventType(), this);
+  }
+
+  /**
+   * Returns the name of this ChillModeExitRule.
+   * @return ruleName
+   */
+  public String getRuleName() {
+    return ruleName;
+  }
+
+  /**
+   * Returns the event type this chill mode exit rule handles.
+   * @return TypedEvent
+   */
+  protected abstract TypedEvent<T> getEventType();
+
+  /**
+   * Validates this rule. Returns true if the rule condition is met, false
+   * otherwise.
+   * @return boolean
+   */
+  protected abstract boolean validate();
+
+  /**
+   * Actual processing logic for this rule.
+   * @param report
+   */
+  protected abstract void process(T report);
+
+  /**
+   * Cleanup actions to be done once this rule is satisfied.
+   */
+  protected abstract void cleanup();
+
+  @Override
+  public final void onMessage(T report, EventPublisher publisher) {
+
+    // TODO: once handlers can be removed, drop the getInChillMode check
+
+    if (scmInChillMode()) {
+      if (validate()) {
+        chillModeManager.validateChillModeExitRules(ruleName, publisher);
+        cleanup();
+        return;
+      }
+
+      process(report);
 
-  boolean validate();
+      if (validate()) {
+        chillModeManager.validateChillModeExitRules(ruleName, publisher);
+        cleanup();
+      }
+    }
+  }
 
-  void process(T report);
+  /**
+   * Returns true if SCM is in chill mode, else false.
+   * @return boolean
+   */
+  protected boolean scmInChillMode() {
+    return chillModeManager.getInChillMode();
+  }
 
-  void cleanup();
 }
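
Editor's note on the new base class: onMessage() above is now a template method, so a concrete rule only supplies the event type it listens to, the per-report bookkeeping, the validation predicate, and cleanup. Below is a minimal sketch of a hypothetical rule built on that contract; the class name and the simple "N datanodes registered" criterion are illustrative only and are not part of this commit.

package org.apache.hadoop.hdds.scm.chillmode;

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.TypedEvent;

/** Hypothetical example rule, for illustration only. */
public class ExampleChillModeRule
    extends ChillModeExitRule<NodeRegistrationContainerReport> {

  private final Set<UUID> seenDatanodes = new HashSet<>();
  private final int requiredDatanodes;

  public ExampleChillModeRule(String ruleName, EventQueue eventQueue,
      SCMChillModeManager manager, int requiredDatanodes) {
    // The super constructor registers this rule as a handler for getEventType().
    super(manager, ruleName, eventQueue);
    this.requiredDatanodes = requiredDatanodes;
  }

  @Override
  protected TypedEvent<NodeRegistrationContainerReport> getEventType() {
    return SCMEvents.NODE_REGISTRATION_CONT_REPORT;
  }

  @Override
  protected boolean validate() {
    return seenDatanodes.size() >= requiredDatanodes;
  }

  @Override
  protected void process(NodeRegistrationContainerReport report) {
    // Track each registering datanode exactly once.
    seenDatanodes.add(report.getDatanodeDetails().getUuid());
  }

  @Override
  protected void cleanup() {
    seenDatanodes.clear();
  }
}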
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ContainerChillModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ContainerChillModeRule.java
index 17dd496..cd08786 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ContainerChillModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ContainerChillModeRule.java
@@ -22,22 +22,24 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
+    .NodeRegistrationContainerReport;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.TypedEvent;
 
 /**
  * Class defining Chill mode exit criteria for Containers.
  */
-public class ContainerChillModeRule implements
-    ChillModeExitRule<NodeRegistrationContainerReport>,
-    EventHandler<NodeRegistrationContainerReport> {
+public class ContainerChillModeRule extends
+    ChillModeExitRule<NodeRegistrationContainerReport>{
 
   // Required cutoff % for containers with at least 1 reported replica.
   private double chillModeCutoff;
@@ -46,14 +48,20 @@ public class ContainerChillModeRule implements
   private double maxContainer;
 
   private AtomicLong containerWithMinReplicas = new AtomicLong(0);
-  private final SCMChillModeManager chillModeManager;
 
-  public ContainerChillModeRule(Configuration conf,
+  public ContainerChillModeRule(String ruleName, EventQueue eventQueue,
+      Configuration conf,
       List<ContainerInfo> containers, SCMChillModeManager manager) {
+    super(manager, ruleName, eventQueue);
     chillModeCutoff = conf.getDouble(
         HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,
         HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT);
-    chillModeManager = manager;
+
+    Preconditions.checkArgument(
+        (chillModeCutoff >= 0.0 && chillModeCutoff <= 1.0),
+        HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT  +
+            " value should be >= 0.0 and <= 1.0");
+
     containerMap = new ConcurrentHashMap<>();
     if(containers != null) {
       containers.forEach(c -> {
@@ -67,10 +75,18 @@ public class ContainerChillModeRule implements
       });
       maxContainer = containerMap.size();
     }
+
+  }
+
+
+  @Override
+  protected TypedEvent<NodeRegistrationContainerReport> getEventType() {
+    return SCMEvents.NODE_REGISTRATION_CONT_REPORT;
   }
 
+
   @Override
-  public boolean validate() {
+  protected boolean validate() {
     return getCurrentContainerThreshold() >= chillModeCutoff;
   }
 
@@ -83,7 +99,7 @@ public class ContainerChillModeRule implements
   }
 
   @Override
-  public void process(NodeRegistrationContainerReport reportsProto) {
+  protected void process(NodeRegistrationContainerReport reportsProto) {
 
     reportsProto.getReport().getReportsList().forEach(c -> {
       if (containerMap.containsKey(c.getContainerID())) {
@@ -92,37 +108,17 @@ public class ContainerChillModeRule implements
         }
       }
     });
-  }
-
-  @Override
-  public void onMessage(NodeRegistrationContainerReport
-      nodeRegistrationContainerReport, EventPublisher publisher) {
-
-    // TODO: when we have remove handlers, we can remove getInChillmode check
-
-    if (chillModeManager.getInChillMode()) {
-      if (validate()) {
-        return;
-      }
-
-      process(nodeRegistrationContainerReport);
-      if (chillModeManager.getInChillMode()) {
-        SCMChillModeManager.getLogger().info(
-            "SCM in chill mode. {} % containers have at least one"
-                + " reported replica.",
-            (containerWithMinReplicas.get() / maxContainer) * 100);
-      }
-
-      if (validate()) {
-        chillModeManager.validateChillModeExitRules(publisher);
-      }
 
+    if (scmInChillMode()) {
+      SCMChillModeManager.getLogger().info(
+          "SCM in chill mode. {} % containers have at least one"
+              + " reported replica.",
+          (containerWithMinReplicas.doubleValue() / maxContainer) * 100);
     }
-
   }
 
   @Override
-  public void cleanup() {
+  protected void cleanup() {
     containerMap.clear();
   }
 }
\ No newline at end of file
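
A quick numeric illustration of the cutoff check this rule performs once replica counting is done; the container counts and the 0.99 cutoff below are assumptions for the example (the cutoff is read from HDDS_SCM_CHILLMODE_THRESHOLD_PCT), not values taken from this commit.

/** Standalone illustration of ContainerChillModeRule's cutoff arithmetic. */
public class ContainerCutoffExample {
  public static void main(String[] args) {
    double containersWithOneReportedReplica = 99;  // containerWithMinReplicas
    double maxContainer = 100;                     // closed containers known at startup
    double chillModeCutoff = 0.99;                 // assumed HDDS_SCM_CHILLMODE_THRESHOLD_PCT value

    double currentThreshold = containersWithOneReportedReplica / maxContainer;
    // validate() passes once currentThreshold >= chillModeCutoff.
    System.out.println("validate() would return "
        + (currentThreshold >= chillModeCutoff));  // prints true
  }
}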
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/DataNodeChillModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/DataNodeChillModeRule.java
index be99962..aae2160 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/DataNodeChillModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/DataNodeChillModeRule.java
@@ -22,19 +22,18 @@ import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport;
 
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.TypedEvent;
 
 /**
  * Class defining Chill mode exit criteria according to number of DataNodes
  * registered with SCM.
  */
-public class DataNodeChillModeRule implements
-    ChillModeExitRule<NodeRegistrationContainerReport>,
-    EventHandler<NodeRegistrationContainerReport> {
+public class DataNodeChillModeRule extends
+    ChillModeExitRule<NodeRegistrationContainerReport>{
 
   // Min DataNodes required to exit chill mode.
   private int requiredDns;
@@ -42,61 +41,42 @@ public class DataNodeChillModeRule implements
   // Set to track registered DataNodes.
   private HashSet<UUID> registeredDnSet;
 
-  private final SCMChillModeManager chillModeManager;
-
-  public DataNodeChillModeRule(Configuration conf,
+  public DataNodeChillModeRule(String ruleName, EventQueue eventQueue,
+      Configuration conf,
       SCMChillModeManager manager) {
+    super(manager, ruleName, eventQueue);
     requiredDns = conf.getInt(
         HddsConfigKeys.HDDS_SCM_CHILLMODE_MIN_DATANODE,
         HddsConfigKeys.HDDS_SCM_CHILLMODE_MIN_DATANODE_DEFAULT);
     registeredDnSet = new HashSet<>(requiredDns * 2);
-    chillModeManager = manager;
   }
 
   @Override
-  public boolean validate() {
-    return registeredDns >= requiredDns;
+  protected TypedEvent<NodeRegistrationContainerReport> getEventType() {
+    return SCMEvents.NODE_REGISTRATION_CONT_REPORT;
   }
 
-  @VisibleForTesting
-  public double getRegisteredDataNodes() {
-    return registeredDns;
+  @Override
+  protected boolean validate() {
+    return registeredDns >= requiredDns;
   }
 
   @Override
-  public void process(NodeRegistrationContainerReport reportsProto) {
+  protected void process(NodeRegistrationContainerReport reportsProto) {
 
     registeredDnSet.add(reportsProto.getDatanodeDetails().getUuid());
     registeredDns = registeredDnSet.size();
 
-  }
-
-  @Override
-  public void onMessage(NodeRegistrationContainerReport
-      nodeRegistrationContainerReport, EventPublisher publisher) {
-    // TODO: when we have remove handlers, we can remove getInChillmode check
-
-    if (chillModeManager.getInChillMode()) {
-      if (validate()) {
-        return;
-      }
-
-      process(nodeRegistrationContainerReport);
-
-      if (chillModeManager.getInChillMode()) {
-        SCMChillModeManager.getLogger().info(
-            "SCM in chill mode. {} DataNodes registered, {} required.",
-            registeredDns, requiredDns);
-      }
-
-      if (validate()) {
-        chillModeManager.validateChillModeExitRules(publisher);
-      }
+    if (scmInChillMode()) {
+      SCMChillModeManager.getLogger().info(
+          "SCM in chill mode. {} DataNodes registered, {} required.",
+          registeredDns, requiredDns);
     }
+
   }
 
   @Override
-  public void cleanup() {
+  protected void cleanup() {
     registeredDnSet.clear();
   }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/HealthyPipelineChillModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/HealthyPipelineChillModeRule.java
index 3f475b8..e0a9a40 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/HealthyPipelineChillModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/HealthyPipelineChillModeRule.java
@@ -17,21 +17,24 @@
  */
 package org.apache.hadoop.hdds.scm.chillmode;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
+
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,28 +49,33 @@ import java.util.Set;
  * through in a cluster.
  */
 public class HealthyPipelineChillModeRule
-    implements ChillModeExitRule<PipelineReportFromDatanode>,
-    EventHandler<PipelineReportFromDatanode> {
+    extends ChillModeExitRule<PipelineReportFromDatanode>{
 
   public static final Logger LOG =
       LoggerFactory.getLogger(HealthyPipelineChillModeRule.class);
   private final PipelineManager pipelineManager;
-  private final SCMChillModeManager chillModeManager;
   private final int healthyPipelineThresholdCount;
   private int currentHealthyPipelineCount = 0;
   private final Set<DatanodeDetails> processedDatanodeDetails =
       new HashSet<>();
 
-  HealthyPipelineChillModeRule(PipelineManager pipelineManager,
+  HealthyPipelineChillModeRule(String ruleName, EventQueue eventQueue,
+      PipelineManager pipelineManager,
       SCMChillModeManager manager, Configuration configuration) {
+    super(manager, ruleName, eventQueue);
     this.pipelineManager = pipelineManager;
-    this.chillModeManager = manager;
     double healthyPipelinesPercent =
         configuration.getDouble(HddsConfigKeys.
                 HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
             HddsConfigKeys.
                 HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT);
 
+    Preconditions.checkArgument(
+        (healthyPipelinesPercent >= 0.0 && healthyPipelinesPercent <= 1.0),
+        HddsConfigKeys.
+            HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT
+            + " value should be >= 0.0 and <= 1.0");
+
     // As we want to wait for 3 node pipelines
     int pipelineCount =
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
@@ -84,67 +92,53 @@ public class HealthyPipelineChillModeRule
   }
 
   @Override
-  public boolean validate() {
-    if (currentHealthyPipelineCount >= healthyPipelineThresholdCount) {
-      return true;
-    }
-    return false;
+  protected TypedEvent<PipelineReportFromDatanode> getEventType() {
+    return SCMEvents.PROCESSED_PIPELINE_REPORT;
   }
 
   @Override
-  public void process(PipelineReportFromDatanode pipelineReportFromDatanode) {
-    Pipeline pipeline;
-    Preconditions.checkNotNull(pipelineReportFromDatanode);
-    PipelineReportsProto pipelineReport =
-        pipelineReportFromDatanode.getReport();
-
-    for (PipelineReport report : pipelineReport.getPipelineReportList()) {
-      PipelineID pipelineID = PipelineID
-          .getFromProtobuf(report.getPipelineID());
-      try {
-        pipeline = pipelineManager.getPipeline(pipelineID);
-      } catch (PipelineNotFoundException e) {
-        continue;
-      }
-
-      if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-          pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
-        // If the pipeline is open state mean, all 3 datanodes are reported
-        // for this pipeline.
-        currentHealthyPipelineCount++;
-      }
+  protected boolean validate() {
+    if (currentHealthyPipelineCount >= healthyPipelineThresholdCount) {
+      return true;
     }
+    return false;
   }
 
   @Override
-  public void cleanup() {
-    // No need to deal with
-  }
-
-  @Override
-  public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
-      EventPublisher publisher) {
-    // If we have already reached healthy pipeline threshold, skip processing
-    // pipeline report from datanode.
-
-    if (validate()) {
-      chillModeManager.validateChillModeExitRules(publisher);
-      return;
-    }
-
+  protected void process(PipelineReportFromDatanode
+      pipelineReportFromDatanode) {
 
     // When SCM is in chill mode for long time, already registered
     // datanode can send pipeline report again, then pipeline handler fires
     // processed report event, we should not consider this pipeline report
     // from datanode again during threshold calculation.
+    Preconditions.checkNotNull(pipelineReportFromDatanode);
     DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails();
     if (!processedDatanodeDetails.contains(
         pipelineReportFromDatanode.getDatanodeDetails())) {
 
-      // Process pipeline report from datanode
-      process(pipelineReportFromDatanode);
+      Pipeline pipeline;
+      PipelineReportsProto pipelineReport =
+          pipelineReportFromDatanode.getReport();
+
+      for (PipelineReport report : pipelineReport.getPipelineReportList()) {
+        PipelineID pipelineID = PipelineID
+            .getFromProtobuf(report.getPipelineID());
+        try {
+          pipeline = pipelineManager.getPipeline(pipelineID);
+        } catch (PipelineNotFoundException e) {
+          continue;
+        }
+
+        if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+            pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
+          // If the pipeline is in the OPEN state, all 3 datanodes have
+          // reported for this pipeline.
+          currentHealthyPipelineCount++;
+        }
+      }
 
-      if (chillModeManager.getInChillMode()) {
+      if (scmInChillMode()) {
         SCMChillModeManager.getLogger().info(
             "SCM in chill mode. Healthy pipelines reported count is {}, " +
                 "required healthy pipeline reported count is {}",
@@ -154,8 +148,20 @@ public class HealthyPipelineChillModeRule
       processedDatanodeDetails.add(dnDetails);
     }
 
-    if (validate()) {
-      chillModeManager.validateChillModeExitRules(publisher);
-    }
+  }
+
+  @Override
+  protected void cleanup() {
+    processedDatanodeDetails.clear();
+  }
+
+  @VisibleForTesting
+  public int getCurrentHealthyPipelineCount() {
+    return currentHealthyPipelineCount;
+  }
+
+  @VisibleForTesting
+  public int getHealthyPipelineThresholdCount() {
+    return healthyPipelineThresholdCount;
   }
 }
\ No newline at end of file
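
The threshold this rule compares against is derived from the configured healthy-pipeline percentage and the number of factor-THREE Ratis pipelines known to the PipelineManager; the exact derivation sits outside this hunk, so the sketch below uses made-up numbers and assumes ceiling rounding.

/** Illustration only: deriving a healthy-pipeline threshold count. */
public class HealthyPipelineThresholdExample {
  public static void main(String[] args) {
    int factorThreeRatisPipelines = 8;     // pipelines known to PipelineManager
    double healthyPipelinesPercent = 0.10; // assumed threshold percentage

    // Rounding is an assumption here; the committed code may differ.
    int healthyPipelineThresholdCount =
        (int) Math.ceil(healthyPipelinesPercent * factorThreeRatisPipelines);

    // The rule validates once this many factor-THREE pipelines are reported
    // OPEN, i.e. all three datanodes of a pipeline have reported it.
    System.out.println("required OPEN pipelines: "
        + healthyPipelineThresholdCount); // prints: required OPEN pipelines: 1
  }
}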
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/OneReplicaPipelineChillModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/OneReplicaPipelineChillModeRule.java
index 20b35b8..01db476 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/OneReplicaPipelineChillModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/OneReplicaPipelineChillModeRule.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.hdds.scm.chillmode;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -25,14 +26,15 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.
     PipelineReportFromDatanode;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,13 +42,12 @@ import java.util.HashSet;
 import java.util.Set;
 
 /**
- * This rule covers whether we have atleast one datanode is reported for each
+ * This rule covers whether at least one datanode has reported for each
  * pipeline. This rule is for all open containers, we have at least one
  * replica available for read when we exit chill mode.
  */
-public class OneReplicaPipelineChillModeRule implements
-    ChillModeExitRule<PipelineReportFromDatanode>,
-    EventHandler<PipelineReportFromDatanode> {
+public class OneReplicaPipelineChillModeRule extends
+    ChillModeExitRule<PipelineReportFromDatanode> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(OneReplicaPipelineChillModeRule.class);
@@ -54,12 +55,13 @@ public class OneReplicaPipelineChillModeRule implements
   private int thresholdCount;
   private Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
   private final PipelineManager pipelineManager;
-  private final SCMChillModeManager chillModeManager;
+  private int currentReportedPipelineCount = 0;
 
-  public OneReplicaPipelineChillModeRule(PipelineManager pipelineManager,
-      SCMChillModeManager chillModeManager,
-      Configuration configuration) {
-    this.chillModeManager = chillModeManager;
+
+  public OneReplicaPipelineChillModeRule(String ruleName, EventQueue eventQueue,
+      PipelineManager pipelineManager,
+      SCMChillModeManager chillModeManager, Configuration configuration) {
+    super(chillModeManager, ruleName, eventQueue);
     this.pipelineManager = pipelineManager;
 
     double percent =
@@ -68,6 +70,11 @@ public class OneReplicaPipelineChillModeRule implements
             HddsConfigKeys.
                 HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT_DEFAULT);
 
+    Preconditions.checkArgument((percent >= 0.0 && percent <= 1.0),
+        HddsConfigKeys.
+            HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT  +
+            " value should be >= 0.0 and <= 1.0");
+
     int totalPipelineCount =
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE).size();
@@ -79,16 +86,23 @@ public class OneReplicaPipelineChillModeRule implements
         thresholdCount);
 
   }
+
   @Override
-  public boolean validate() {
-    if (reportedPipelineIDSet.size() >= thresholdCount) {
+  protected TypedEvent<PipelineReportFromDatanode> getEventType() {
+    return SCMEvents.PROCESSED_PIPELINE_REPORT;
+  }
+
+  @Override
+  protected boolean validate() {
+    if (currentReportedPipelineCount >= thresholdCount) {
       return true;
     }
     return false;
   }
 
   @Override
-  public void process(PipelineReportFromDatanode pipelineReportFromDatanode) {
+  protected void process(PipelineReportFromDatanode
+      pipelineReportFromDatanode) {
     Pipeline pipeline;
     Preconditions.checkNotNull(pipelineReportFromDatanode);
     PipelineReportsProto pipelineReport =
@@ -108,35 +122,32 @@ public class OneReplicaPipelineChillModeRule implements
         reportedPipelineIDSet.add(pipelineID);
       }
     }
-  }
 
-  @Override
-  public void cleanup() {
-    reportedPipelineIDSet.clear();
-  }
+    currentReportedPipelineCount = reportedPipelineIDSet.size();
 
-  @Override
-  public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
-      EventPublisher publisher) {
-
-    if (validate()) {
-      chillModeManager.validateChillModeExitRules(publisher);
-      return;
-    }
-
-    // Process pipeline report from datanode
-    process(pipelineReportFromDatanode);
-
-    if (chillModeManager.getInChillMode()) {
+    if (scmInChillMode()) {
       SCMChillModeManager.getLogger().info(
           "SCM in chill mode. Pipelines with atleast one datanode reported " +
               "count is {}, required atleast one datanode reported per " +
               "pipeline count is {}",
-          reportedPipelineIDSet.size(), thresholdCount);
+          currentReportedPipelineCount, thresholdCount);
     }
 
-    if (validate()) {
-      chillModeManager.validateChillModeExitRules(publisher);
-    }
   }
+
+  @Override
+  protected void cleanup() {
+    reportedPipelineIDSet.clear();
+  }
+
+  @VisibleForTesting
+  public int getThresholdCount() {
+    return thresholdCount;
+  }
+
+  @VisibleForTesting
+  public int getCurrentReportedPipelineCount() {
+    return currentReportedPipelineCount;
+  }
+
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java
index 1c7c881..194990d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java
@@ -18,9 +18,12 @@
 package org.apache.hadoop.hdds.scm.chillmode;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -39,10 +42,38 @@ import org.slf4j.LoggerFactory;
  *
  * ChillModeExitRule defines format to define new rules which must be satisfied
  * to exit Chill mode.
- * ContainerChillModeRule defines the only exit criteria right now.
- * On every new datanode registration event this class adds replicas
- * for reported containers and validates if cutoff threshold for
- * containers is meet.
+ *
+ * Current ChillMode rules:
+ * 1. ContainerChillModeRule:
+ * Every new datanode registration fires
+ * {@link SCMEvents#NODE_REGISTRATION_CONT_REPORT}. This rule handles the
+ * event, processes the report, and increments the containerWithMinReplicas
+ * count when a reported replica is present in the containerMap. Then it
+ * validates whether the container cutoff threshold is met.
+ *
+ * 2. DataNodeChillModeRule:
+ * Every new datanode registration fires
+ * {@link SCMEvents#NODE_REGISTRATION_CONT_REPORT}. This rule handles the
+ * event, and if the report comes from a new node, adds it to its set of
+ * registered nodes. Then it validates whether the minimum number of
+ * registered datanodes has been reached.
+ *
+ * 3. HealthyPipelineChillModeRule:
+ * Once the pipelineReportHandler processes a
+ * {@link SCMEvents#PIPELINE_REPORT}, it fires
+ * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles the
+ * event, checks whether the reported pipeline is healthy, and increments
+ * the current healthy pipeline count. Then it validates whether the
+ * healthy pipeline cutoff threshold is met.
+ *
+ * 4. OneReplicaPipelineChillModeRule:
+ * Once the pipelineReportHandler processes a
+ * {@link SCMEvents#PIPELINE_REPORT}, it fires
+ * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles the
+ * event and adds the reported pipeline to its set of reported pipelines.
+ * Then it validates whether the cutoff threshold of at least one reported
+ * replica per pipeline is met.
+ *
  */
 public class SCMChillModeManager {
 
@@ -60,6 +91,8 @@ public class SCMChillModeManager {
   private static final String ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE =
       "AtleastOneDatanodeReportedRule";
 
+  private Set<String> validatedRules = new HashSet<>();
+
   private final EventQueue eventPublisher;
   private final PipelineManager pipelineManager;
 
@@ -75,30 +108,27 @@ public class SCMChillModeManager {
 
     if (isChillModeEnabled) {
       ContainerChillModeRule containerChillModeRule =
-          new ContainerChillModeRule(config, allContainers, this);
+          new ContainerChillModeRule(CONT_EXIT_RULE, eventQueue, config,
+              allContainers, this);
       DataNodeChillModeRule dataNodeChillModeRule =
-          new DataNodeChillModeRule(config, this);
+          new DataNodeChillModeRule(DN_EXIT_RULE, eventQueue, config, this);
       exitRules.put(CONT_EXIT_RULE, containerChillModeRule);
       exitRules.put(DN_EXIT_RULE, dataNodeChillModeRule);
-      eventPublisher.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
-          containerChillModeRule);
-      eventPublisher.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
-          dataNodeChillModeRule);
-
       if (conf.getBoolean(
           HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK,
           HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT)
           && pipelineManager != null) {
-        HealthyPipelineChillModeRule rule = new HealthyPipelineChillModeRule(
-            pipelineManager, this, config);
+        HealthyPipelineChillModeRule healthyPipelineChillModeRule =
+            new HealthyPipelineChillModeRule(HEALTHY_PIPELINE_EXIT_RULE,
+                eventQueue, pipelineManager,
+                this, config);
         OneReplicaPipelineChillModeRule oneReplicaPipelineChillModeRule =
-            new OneReplicaPipelineChillModeRule(pipelineManager, this, conf);
-        exitRules.put(HEALTHY_PIPELINE_EXIT_RULE, rule);
+            new OneReplicaPipelineChillModeRule(
+                ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE, eventQueue,
+                pipelineManager, this, conf);
+        exitRules.put(HEALTHY_PIPELINE_EXIT_RULE, healthyPipelineChillModeRule);
         exitRules.put(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE,
             oneReplicaPipelineChillModeRule);
-        eventPublisher.addHandler(SCMEvents.PROCESSED_PIPELINE_REPORT, rule);
-        eventPublisher.addHandler(SCMEvents.PROCESSED_PIPELINE_REPORT,
-            oneReplicaPipelineChillModeRule);
       }
       emitChillModeStatus();
     } else {
@@ -115,13 +145,24 @@ public class SCMChillModeManager {
         new ChillModeStatus(getInChillMode()));
   }
 
-  public void validateChillModeExitRules(EventPublisher eventQueue) {
-    for (ChillModeExitRule exitRule : exitRules.values()) {
-      if (!exitRule.validate()) {
-        return;
-      }
+
+  public synchronized void validateChillModeExitRules(String ruleName,
+      EventPublisher eventQueue) {
+
+    if (exitRules.get(ruleName) != null) {
+      validatedRules.add(ruleName);
+    } else {
+      // This should never happen
+      LOG.error("No Such Exit rule {}", ruleName);
     }
-    exitChillMode(eventQueue);
+
+
+    if (validatedRules.size() == exitRules.size()) {
+      // All rules are satisfied, we can exit chill mode.
+      LOG.info("ScmChillModeManager, all rules are successfully validated");
+      exitChillMode(eventQueue);
+    }
+
   }
 
   /**
@@ -140,9 +181,6 @@ public class SCMChillModeManager {
     // TODO: Remove handler registration as there is no need to listen to
     // register events anymore.
 
-    for (ChillModeExitRule e : exitRules.values()) {
-      e.cleanup();
-    }
     emitChillModeStatus();
     // TODO: #CLUTIL if we reenter chill mode the fixed interval pipeline
     // creation job needs to stop
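
The manager-side change above replaces the old "loop over every rule and validate()" pass with per-rule reporting: each rule calls validateChillModeExitRules with its own name once it is satisfied, and chill mode is left only when every registered rule has reported in. A simplified, standalone sketch of that bookkeeping (types reduced for illustration; this is not the actual class):

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified sketch of the rule-name bookkeeping added to SCMChillModeManager. */
public class ExitRuleBookkeepingSketch {
  // Registered rules, keyed by rule name (rule objects elided for the sketch).
  private final Map<String, Object> exitRules = new ConcurrentHashMap<>();
  // Names of rules that have already reported themselves satisfied.
  private final Set<String> validatedRules = new HashSet<>();

  public synchronized void validateExitRules(String ruleName) {
    if (exitRules.containsKey(ruleName)) {
      validatedRules.add(ruleName);
    }
    if (!exitRules.isEmpty() && validatedRules.size() == exitRules.size()) {
      // All registered rules are satisfied; the real manager exits chill
      // mode here and emits the updated chill mode status event.
      System.out.println("all rules validated, exiting chill mode");
    }
  }
}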
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java
index 01b78f2..fda3e8e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.chillmode;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -37,12 +38,17 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.*;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
 import org.mockito.Mockito;
 
@@ -56,7 +62,10 @@ public class TestSCMChillModeManager {
   private List<ContainerInfo> containers;
 
   @Rule
-  public Timeout timeout = new Timeout(1000 * 35);
+  public Timeout timeout = new Timeout(1000 * 300);
+
+  @Rule
+  public final TemporaryFolder tempDir = new TemporaryFolder();
 
   @BeforeClass
   public static void setUp() {
@@ -124,6 +133,185 @@ public class TestSCMChillModeManager {
     }, 100, 1000 * 5);
   }
 
+
+  private OzoneConfiguration createConf(double healthyPercent,
+      double oneReplicaPercent) throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        tempDir.newFolder().toString());
+    conf.setBoolean(
+        HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK,
+        true);
+    conf.setDouble(HddsConfigKeys.
+        HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT, healthyPercent);
+    conf.setDouble(HddsConfigKeys.
+        HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT, oneReplicaPercent);
+
+    return conf;
+  }
+
+  @Test
+  public void testChillModeExitRuleWithPipelineAvailabilityCheck()
+      throws Exception{
+    testChillModeExitRuleWithPipelineAvailabilityCheck(100, 30, 8, 0.90, 1);
+    testChillModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0.10, 0.9);
+    testChillModeExitRuleWithPipelineAvailabilityCheck(100, 30, 8, 0, 0.9);
+    testChillModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0, 0);
+    testChillModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0, 0.5);
+  }
+
+  @Test
+  public void testFailWithIncorrectValueForHealthyPipelinePercent()
+      throws Exception {
+    try {
+      OzoneConfiguration conf = createConf(100,
+          0.9);
+      MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
+      PipelineManager pipelineManager = new SCMPipelineManager(conf,
+          mockNodeManager, queue);
+      scmChillModeManager = new SCMChillModeManager(
+          conf, containers, pipelineManager, queue);
+      fail("testFailWithIncorrectValueForHealthyPipelinePercent");
+    } catch (IllegalArgumentException ex) {
+      GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" +
+          " 1.0", ex);
+    }
+  }
+
+  @Test
+  public void testFailWithIncorrectValueForOneReplicaPipelinePercent()
+      throws Exception {
+    try {
+      OzoneConfiguration conf = createConf(0.9,
+          200);
+      MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
+      PipelineManager pipelineManager = new SCMPipelineManager(conf,
+          mockNodeManager, queue);
+      scmChillModeManager = new SCMChillModeManager(
+          conf, containers, pipelineManager, queue);
+      fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
+    } catch (IllegalArgumentException ex) {
+      GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" +
+          " 1.0", ex);
+    }
+  }
+
+  @Test
+  public void testFailWithIncorrectValueForChillModePercent() throws Exception {
+    try {
+      OzoneConfiguration conf = createConf(0.9, 0.1);
+      conf.setDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT, -1.0);
+      MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
+      PipelineManager pipelineManager = new SCMPipelineManager(conf,
+          mockNodeManager, queue);
+      scmChillModeManager = new SCMChillModeManager(
+          conf, containers, pipelineManager, queue);
+      fail("testFailWithIncorrectValueForChillModePercent");
+    } catch (IllegalArgumentException ex) {
+      GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" +
+          " 1.0", ex);
+    }
+  }
+
+
+  public void testChillModeExitRuleWithPipelineAvailabilityCheck(
+      int containerCount, int nodeCount, int pipelineCount,
+      double healthyPipelinePercent, double oneReplicaPercent)
+      throws Exception {
+
+    OzoneConfiguration conf = createConf(healthyPipelinePercent,
+        oneReplicaPercent);
+
+    containers = new ArrayList<>();
+    containers.addAll(HddsTestUtils.getContainerInfo(containerCount));
+
+    MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount);
+    SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
+        mockNodeManager, queue);
+    PipelineProvider mockRatisProvider =
+        new MockRatisPipelineProvider(mockNodeManager,
+            pipelineManager.getStateManager(), config);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        mockRatisProvider);
+
+
+    for (int i=0; i < pipelineCount; i++) {
+      pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
+          HddsProtos.ReplicationFactor.THREE);
+    }
+
+    for (ContainerInfo container : containers) {
+      container.setState(HddsProtos.LifeCycleState.CLOSED);
+    }
+
+    scmChillModeManager = new SCMChillModeManager(conf, containers,
+        pipelineManager, queue);
+
+    assertTrue(scmChillModeManager.getInChillMode());
+
+    testContainerThreshold(containers, 1.0);
+
+    List<Pipeline> pipelines = pipelineManager.getPipelines();
+
+    int healthyPipelineThresholdCount =
+        scmChillModeManager.getHealthyPipelineChillModeRule()
+            .getHealthyPipelineThresholdCount();
+    int oneReplicaThresholdCount =
+        scmChillModeManager.getOneReplicaPipelineChillModeRule()
+            .getThresholdCount();
+
+    // Even if there are no pipelines and the threshold is set to zero, we
+    // still get an event when a datanode registers. In that case, validate
+    // will return true and the rule is added to validatedRules.
+    if (Math.max(healthyPipelinePercent, oneReplicaThresholdCount) == 0) {
+      firePipelineEvent(pipelines.get(0));
+    }
+
+    for (int i = 0; i < Math.max(healthyPipelineThresholdCount,
+        oneReplicaThresholdCount); i++) {
+      firePipelineEvent(pipelines.get(i));
+
+      if (i < healthyPipelineThresholdCount) {
+        checkHealthy(i + 1);
+      }
+
+      if (i < oneReplicaThresholdCount) {
+        checkOpen(i + 1);
+      }
+    }
+
+
+    GenericTestUtils.waitFor(() -> {
+      return !scmChillModeManager.getInChillMode();
+    }, 100, 1000 * 5);
+  }
+
+  private void checkHealthy(int expectedCount) throws Exception{
+    GenericTestUtils.waitFor(() -> scmChillModeManager
+            .getHealthyPipelineChillModeRule()
+            .getCurrentHealthyPipelineCount() == expectedCount,
+        100,  5000);
+  }
+
+  private void checkOpen(int expectedCount) throws Exception {
+    GenericTestUtils.waitFor(() -> scmChillModeManager
+            .getOneReplicaPipelineChillModeRule()
+            .getCurrentReportedPipelineCount() == expectedCount,
+        1000,  5000);
+  }
+
+  private void firePipelineEvent(Pipeline pipeline) throws Exception {
+    PipelineReportsProto.Builder reportBuilder =
+        PipelineReportsProto.newBuilder();
+
+    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
+        .setPipelineID(pipeline.getId().getProtobuf()));
+    queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
+        new PipelineReportFromDatanode(pipeline.getNodes().get(0),
+            reportBuilder.build()));
+  }
+
+
   @Test
   public void testDisableChillMode() {
     OzoneConfiguration conf = new OzoneConfiguration(config);

