You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2019/03/25 20:11:09 UTC
[hadoop] branch trunk updated: HDDS-1217. Refactor ChillMode rules
and chillmode manager. (#558)
This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8739693 HDDS-1217. Refactor ChillMode rules and chillmode manager. (#558)
8739693 is described below
commit 8739693514ac92c33b38e472c37b7dcf4febe73f
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Mon Mar 25 13:11:03 2019 -0700
HDDS-1217. Refactor ChillMode rules and chillmode manager. (#558)
---
.../hdds/scm/chillmode/ChillModeExitRule.java | 88 +++++++++-
.../hdds/scm/chillmode/ContainerChillModeRule.java | 70 ++++----
.../hdds/scm/chillmode/DataNodeChillModeRule.java | 60 +++----
.../chillmode/HealthyPipelineChillModeRule.java | 114 +++++++------
.../chillmode/OneReplicaPipelineChillModeRule.java | 83 +++++----
.../hdds/scm/chillmode/SCMChillModeManager.java | 92 +++++++---
.../scm/chillmode/TestSCMChillModeManager.java | 190 ++++++++++++++++++++-
7 files changed, 497 insertions(+), 200 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeExitRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeExitRule.java
index d283dfe..0c9b823 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeExitRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeExitRule.java
@@ -17,16 +17,94 @@
*/
package org.apache.hadoop.hdds.scm.chillmode;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.TypedEvent;
+
/**
- * Interface for defining chill mode exit rules.
+ * Abstract class for ChillModeExitRules. When a new rule is added, the new
+ * rule should extend this abstract class.
+ *
+ * Each rule should:
+ * 1. Implement getEventType() to return the event it handles; the base
+ * constructor registers the rule as a handler for that event.
+ * 2. Be added to the list of exit rules in SCMChillModeManager.
+ *
*
* @param <T>
*/
-public interface ChillModeExitRule<T> {
+public abstract class ChillModeExitRule<T> implements EventHandler<T> {
+
+ private final SCMChillModeManager chillModeManager;
+ private final String ruleName;
+
+ public ChillModeExitRule(SCMChillModeManager chillModeManager,
+ String ruleName, EventQueue eventQueue) {
+ this.chillModeManager = chillModeManager;
+ this.ruleName = ruleName;
+ eventQueue.addHandler(getEventType(), this);
+ }
+
+ /**
+ * Returns the name of this chill mode exit rule.
+ * @return ruleName
+ */
+ public String getRuleName() {
+ return ruleName;
+ }
+
+ /**
+ * Returns the event type this chill mode exit rule handles.
+ * @return TypedEvent
+ */
+ protected abstract TypedEvent<T> getEventType();
+
+ /**
+ * Validates this rule. Returns true if the rule's condition is met,
+ * otherwise false.
+ * @return boolean
+ */
+ protected abstract boolean validate();
+
+ /**
+ * Actual processing logic for this rule.
+ * @param report the event payload received by this rule
+ */
+ protected abstract void process(T report);
+
+ /**
+ * Cleanup actions to perform once this rule is satisfied.
+ */
+ protected abstract void cleanup();
+
+ @Override
+ public final void onMessage(T report, EventPublisher publisher) {
+
+ // TODO: once event handlers can be removed, drop the scmInChillMode() check
+
+ if (scmInChillMode()) {
+ if (validate()) {
+ chillModeManager.validateChillModeExitRules(ruleName, publisher);
+ cleanup();
+ return;
+ }
+
+ process(report);
- boolean validate();
+ if (validate()) {
+ chillModeManager.validateChillModeExitRules(ruleName, publisher);
+ cleanup();
+ }
+ }
+ }
- void process(T report);
+ /**
+ * Return true if SCM is in chill mode, else false.
+ * @return boolean
+ */
+ protected boolean scmInChillMode() {
+ return chillModeManager.getInChillMode();
+ }
- void cleanup();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ContainerChillModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ContainerChillModeRule.java
index 17dd496..cd08786 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ContainerChillModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ContainerChillModeRule.java
@@ -22,22 +22,24 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
+ .NodeRegistrationContainerReport;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.TypedEvent;
/**
* Class defining Chill mode exit criteria for Containers.
*/
-public class ContainerChillModeRule implements
- ChillModeExitRule<NodeRegistrationContainerReport>,
- EventHandler<NodeRegistrationContainerReport> {
+public class ContainerChillModeRule extends
+ ChillModeExitRule<NodeRegistrationContainerReport>{
// Required cutoff % for containers with at least 1 reported replica.
private double chillModeCutoff;
@@ -46,14 +48,20 @@ public class ContainerChillModeRule implements
private double maxContainer;
private AtomicLong containerWithMinReplicas = new AtomicLong(0);
- private final SCMChillModeManager chillModeManager;
- public ContainerChillModeRule(Configuration conf,
+ public ContainerChillModeRule(String ruleName, EventQueue eventQueue,
+ Configuration conf,
List<ContainerInfo> containers, SCMChillModeManager manager) {
+ super(manager, ruleName, eventQueue);
chillModeCutoff = conf.getDouble(
HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,
HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT);
- chillModeManager = manager;
+
+ Preconditions.checkArgument(
+ (chillModeCutoff >= 0.0 && chillModeCutoff <= 1.0),
+ HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT +
+ " value should be >= 0.0 and <= 1.0");
+
containerMap = new ConcurrentHashMap<>();
if(containers != null) {
containers.forEach(c -> {
@@ -67,10 +75,18 @@ public class ContainerChillModeRule implements
});
maxContainer = containerMap.size();
}
+
+ }
+
+
+ @Override
+ protected TypedEvent<NodeRegistrationContainerReport> getEventType() {
+ return SCMEvents.NODE_REGISTRATION_CONT_REPORT;
}
+
@Override
- public boolean validate() {
+ protected boolean validate() {
return getCurrentContainerThreshold() >= chillModeCutoff;
}
@@ -83,7 +99,7 @@ public class ContainerChillModeRule implements
}
@Override
- public void process(NodeRegistrationContainerReport reportsProto) {
+ protected void process(NodeRegistrationContainerReport reportsProto) {
reportsProto.getReport().getReportsList().forEach(c -> {
if (containerMap.containsKey(c.getContainerID())) {
@@ -92,37 +108,17 @@ public class ContainerChillModeRule implements
}
}
});
- }
-
- @Override
- public void onMessage(NodeRegistrationContainerReport
- nodeRegistrationContainerReport, EventPublisher publisher) {
-
- // TODO: when we have remove handlers, we can remove getInChillmode check
-
- if (chillModeManager.getInChillMode()) {
- if (validate()) {
- return;
- }
-
- process(nodeRegistrationContainerReport);
- if (chillModeManager.getInChillMode()) {
- SCMChillModeManager.getLogger().info(
- "SCM in chill mode. {} % containers have at least one"
- + " reported replica.",
- (containerWithMinReplicas.get() / maxContainer) * 100);
- }
-
- if (validate()) {
- chillModeManager.validateChillModeExitRules(publisher);
- }
+ if (scmInChillMode()) {
+ SCMChillModeManager.getLogger().info(
+ "SCM in chill mode. {} % containers have at least one"
+ + " reported replica.",
+ (containerWithMinReplicas.doubleValue() / maxContainer) * 100);
}
-
}
@Override
- public void cleanup() {
+ protected void cleanup() {
containerMap.clear();
}
}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/DataNodeChillModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/DataNodeChillModeRule.java
index be99962..aae2160 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/DataNodeChillModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/DataNodeChillModeRule.java
@@ -22,19 +22,18 @@ import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.TypedEvent;
/**
* Class defining Chill mode exit criteria according to number of DataNodes
* registered with SCM.
*/
-public class DataNodeChillModeRule implements
- ChillModeExitRule<NodeRegistrationContainerReport>,
- EventHandler<NodeRegistrationContainerReport> {
+public class DataNodeChillModeRule extends
+ ChillModeExitRule<NodeRegistrationContainerReport>{
// Min DataNodes required to exit chill mode.
private int requiredDns;
@@ -42,61 +41,42 @@ public class DataNodeChillModeRule implements
// Set to track registered DataNodes.
private HashSet<UUID> registeredDnSet;
- private final SCMChillModeManager chillModeManager;
-
- public DataNodeChillModeRule(Configuration conf,
+ public DataNodeChillModeRule(String ruleName, EventQueue eventQueue,
+ Configuration conf,
SCMChillModeManager manager) {
+ super(manager, ruleName, eventQueue);
requiredDns = conf.getInt(
HddsConfigKeys.HDDS_SCM_CHILLMODE_MIN_DATANODE,
HddsConfigKeys.HDDS_SCM_CHILLMODE_MIN_DATANODE_DEFAULT);
registeredDnSet = new HashSet<>(requiredDns * 2);
- chillModeManager = manager;
}
@Override
- public boolean validate() {
- return registeredDns >= requiredDns;
+ protected TypedEvent<NodeRegistrationContainerReport> getEventType() {
+ return SCMEvents.NODE_REGISTRATION_CONT_REPORT;
}
- @VisibleForTesting
- public double getRegisteredDataNodes() {
- return registeredDns;
+ @Override
+ protected boolean validate() {
+ return registeredDns >= requiredDns;
}
@Override
- public void process(NodeRegistrationContainerReport reportsProto) {
+ protected void process(NodeRegistrationContainerReport reportsProto) {
registeredDnSet.add(reportsProto.getDatanodeDetails().getUuid());
registeredDns = registeredDnSet.size();
- }
-
- @Override
- public void onMessage(NodeRegistrationContainerReport
- nodeRegistrationContainerReport, EventPublisher publisher) {
- // TODO: when we have remove handlers, we can remove getInChillmode check
-
- if (chillModeManager.getInChillMode()) {
- if (validate()) {
- return;
- }
-
- process(nodeRegistrationContainerReport);
-
- if (chillModeManager.getInChillMode()) {
- SCMChillModeManager.getLogger().info(
- "SCM in chill mode. {} DataNodes registered, {} required.",
- registeredDns, requiredDns);
- }
-
- if (validate()) {
- chillModeManager.validateChillModeExitRules(publisher);
- }
+ if (scmInChillMode()) {
+ SCMChillModeManager.getLogger().info(
+ "SCM in chill mode. {} DataNodes registered, {} required.",
+ registeredDns, requiredDns);
}
+
}
@Override
- public void cleanup() {
+ protected void cleanup() {
registeredDnSet.clear();
}
}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/HealthyPipelineChillModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/HealthyPipelineChillModeRule.java
index 3f475b8..e0a9a40 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/HealthyPipelineChillModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/HealthyPipelineChillModeRule.java
@@ -17,21 +17,24 @@
*/
package org.apache.hadoop.hdds.scm.chillmode;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
+
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,28 +49,33 @@ import java.util.Set;
* through in a cluster.
*/
public class HealthyPipelineChillModeRule
- implements ChillModeExitRule<PipelineReportFromDatanode>,
- EventHandler<PipelineReportFromDatanode> {
+ extends ChillModeExitRule<PipelineReportFromDatanode>{
public static final Logger LOG =
LoggerFactory.getLogger(HealthyPipelineChillModeRule.class);
private final PipelineManager pipelineManager;
- private final SCMChillModeManager chillModeManager;
private final int healthyPipelineThresholdCount;
private int currentHealthyPipelineCount = 0;
private final Set<DatanodeDetails> processedDatanodeDetails =
new HashSet<>();
- HealthyPipelineChillModeRule(PipelineManager pipelineManager,
+ HealthyPipelineChillModeRule(String ruleName, EventQueue eventQueue,
+ PipelineManager pipelineManager,
SCMChillModeManager manager, Configuration configuration) {
+ super(manager, ruleName, eventQueue);
this.pipelineManager = pipelineManager;
- this.chillModeManager = manager;
double healthyPipelinesPercent =
configuration.getDouble(HddsConfigKeys.
HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
HddsConfigKeys.
HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT);
+ Preconditions.checkArgument(
+ (healthyPipelinesPercent >= 0.0 && healthyPipelinesPercent <= 1.0),
+ HddsConfigKeys.
+ HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT
+ + " value should be >= 0.0 and <= 1.0");
+
// As we want to wait for 3 node pipelines
int pipelineCount =
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
@@ -84,67 +92,53 @@ public class HealthyPipelineChillModeRule
}
@Override
- public boolean validate() {
- if (currentHealthyPipelineCount >= healthyPipelineThresholdCount) {
- return true;
- }
- return false;
+ protected TypedEvent<PipelineReportFromDatanode> getEventType() {
+ return SCMEvents.PROCESSED_PIPELINE_REPORT;
}
@Override
- public void process(PipelineReportFromDatanode pipelineReportFromDatanode) {
- Pipeline pipeline;
- Preconditions.checkNotNull(pipelineReportFromDatanode);
- PipelineReportsProto pipelineReport =
- pipelineReportFromDatanode.getReport();
-
- for (PipelineReport report : pipelineReport.getPipelineReportList()) {
- PipelineID pipelineID = PipelineID
- .getFromProtobuf(report.getPipelineID());
- try {
- pipeline = pipelineManager.getPipeline(pipelineID);
- } catch (PipelineNotFoundException e) {
- continue;
- }
-
- if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
- pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
- // If the pipeline is open state mean, all 3 datanodes are reported
- // for this pipeline.
- currentHealthyPipelineCount++;
- }
+ protected boolean validate() {
+ if (currentHealthyPipelineCount >= healthyPipelineThresholdCount) {
+ return true;
}
+ return false;
}
@Override
- public void cleanup() {
- // No need to deal with
- }
-
- @Override
- public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
- EventPublisher publisher) {
- // If we have already reached healthy pipeline threshold, skip processing
- // pipeline report from datanode.
-
- if (validate()) {
- chillModeManager.validateChillModeExitRules(publisher);
- return;
- }
-
+ protected void process(PipelineReportFromDatanode
+ pipelineReportFromDatanode) {
// When SCM is in chill mode for long time, already registered
// datanode can send pipeline report again, then pipeline handler fires
// processed report event, we should not consider this pipeline report
// from datanode again during threshold calculation.
+ Preconditions.checkNotNull(pipelineReportFromDatanode);
DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails();
if (!processedDatanodeDetails.contains(
pipelineReportFromDatanode.getDatanodeDetails())) {
- // Process pipeline report from datanode
- process(pipelineReportFromDatanode);
+ Pipeline pipeline;
+ PipelineReportsProto pipelineReport =
+ pipelineReportFromDatanode.getReport();
+
+ for (PipelineReport report : pipelineReport.getPipelineReportList()) {
+ PipelineID pipelineID = PipelineID
+ .getFromProtobuf(report.getPipelineID());
+ try {
+ pipeline = pipelineManager.getPipeline(pipelineID);
+ } catch (PipelineNotFoundException e) {
+ continue;
+ }
+
+ if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+ pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
+ // If the pipeline is open state mean, all 3 datanodes are reported
+ // for this pipeline.
+ currentHealthyPipelineCount++;
+ }
+ }
- if (chillModeManager.getInChillMode()) {
+ if (scmInChillMode()) {
SCMChillModeManager.getLogger().info(
"SCM in chill mode. Healthy pipelines reported count is {}, " +
"required healthy pipeline reported count is {}",
@@ -154,8 +148,20 @@ public class HealthyPipelineChillModeRule
processedDatanodeDetails.add(dnDetails);
}
- if (validate()) {
- chillModeManager.validateChillModeExitRules(publisher);
- }
+ }
+
+ @Override
+ protected void cleanup() {
+ processedDatanodeDetails.clear();
+ }
+
+ @VisibleForTesting
+ public int getCurrentHealthyPipelineCount() {
+ return currentHealthyPipelineCount;
+ }
+
+ @VisibleForTesting
+ public int getHealthyPipelineThresholdCount() {
+ return healthyPipelineThresholdCount;
}
}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/OneReplicaPipelineChillModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/OneReplicaPipelineChillModeRule.java
index 20b35b8..01db476 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/OneReplicaPipelineChillModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/OneReplicaPipelineChillModeRule.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm.chillmode;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -25,14 +26,15 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.
PipelineReportFromDatanode;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,13 +42,12 @@ import java.util.HashSet;
import java.util.Set;
/**
- * This rule covers whether we have atleast one datanode is reported for each
+ * This rule covers whether we have at least one datanode is reported for each
* pipeline. This rule is for all open containers, we have at least one
* replica available for read when we exit chill mode.
*/
-public class OneReplicaPipelineChillModeRule implements
- ChillModeExitRule<PipelineReportFromDatanode>,
- EventHandler<PipelineReportFromDatanode> {
+public class OneReplicaPipelineChillModeRule extends
+ ChillModeExitRule<PipelineReportFromDatanode> {
private static final Logger LOG =
LoggerFactory.getLogger(OneReplicaPipelineChillModeRule.class);
@@ -54,12 +55,13 @@ public class OneReplicaPipelineChillModeRule implements
private int thresholdCount;
private Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
private final PipelineManager pipelineManager;
- private final SCMChillModeManager chillModeManager;
+ private int currentReportedPipelineCount = 0;
- public OneReplicaPipelineChillModeRule(PipelineManager pipelineManager,
- SCMChillModeManager chillModeManager,
- Configuration configuration) {
- this.chillModeManager = chillModeManager;
+
+ public OneReplicaPipelineChillModeRule(String ruleName, EventQueue eventQueue,
+ PipelineManager pipelineManager,
+ SCMChillModeManager chillModeManager, Configuration configuration) {
+ super(chillModeManager, ruleName, eventQueue);
this.pipelineManager = pipelineManager;
double percent =
@@ -68,6 +70,11 @@ public class OneReplicaPipelineChillModeRule implements
HddsConfigKeys.
HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT_DEFAULT);
+ Preconditions.checkArgument((percent >= 0.0 && percent <= 1.0),
+ HddsConfigKeys.
+ HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT +
+ " value should be >= 0.0 and <= 1.0");
+
int totalPipelineCount =
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE).size();
@@ -79,16 +86,23 @@ public class OneReplicaPipelineChillModeRule implements
thresholdCount);
}
+
@Override
- public boolean validate() {
- if (reportedPipelineIDSet.size() >= thresholdCount) {
+ protected TypedEvent<PipelineReportFromDatanode> getEventType() {
+ return SCMEvents.PROCESSED_PIPELINE_REPORT;
+ }
+
+ @Override
+ protected boolean validate() {
+ if (currentReportedPipelineCount >= thresholdCount) {
return true;
}
return false;
}
@Override
- public void process(PipelineReportFromDatanode pipelineReportFromDatanode) {
+ protected void process(PipelineReportFromDatanode
+ pipelineReportFromDatanode) {
Pipeline pipeline;
Preconditions.checkNotNull(pipelineReportFromDatanode);
PipelineReportsProto pipelineReport =
@@ -108,35 +122,32 @@ public class OneReplicaPipelineChillModeRule implements
reportedPipelineIDSet.add(pipelineID);
}
}
- }
- @Override
- public void cleanup() {
- reportedPipelineIDSet.clear();
- }
+ currentReportedPipelineCount = reportedPipelineIDSet.size();
- @Override
- public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
- EventPublisher publisher) {
-
- if (validate()) {
- chillModeManager.validateChillModeExitRules(publisher);
- return;
- }
-
- // Process pipeline report from datanode
- process(pipelineReportFromDatanode);
-
- if (chillModeManager.getInChillMode()) {
+ if (scmInChillMode()) {
SCMChillModeManager.getLogger().info(
"SCM in chill mode. Pipelines with atleast one datanode reported " +
"count is {}, required atleast one datanode reported per " +
"pipeline count is {}",
- reportedPipelineIDSet.size(), thresholdCount);
+ currentReportedPipelineCount, thresholdCount);
}
- if (validate()) {
- chillModeManager.validateChillModeExitRules(publisher);
- }
}
+
+ @Override
+ protected void cleanup() {
+ reportedPipelineIDSet.clear();
+ }
+
+ @VisibleForTesting
+ public int getThresholdCount() {
+ return thresholdCount;
+ }
+
+ @VisibleForTesting
+ public int getCurrentReportedPipelineCount() {
+ return currentReportedPipelineCount;
+ }
+
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java
index 1c7c881..194990d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.hdds.scm.chillmode;
import com.google.common.annotations.VisibleForTesting;
+
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -39,10 +42,38 @@ import org.slf4j.LoggerFactory;
*
* ChillModeExitRule defines format to define new rules which must be satisfied
* to exit Chill mode.
- * ContainerChillModeRule defines the only exit criteria right now.
- * On every new datanode registration event this class adds replicas
- * for reported containers and validates if cutoff threshold for
- * containers is meet.
+ *
+ * Current ChillMode rules:
+ * 1. ContainerChillModeRule:
+ * On every new datanode registration, it fires
+ * {@link SCMEvents#NODE_REGISTRATION_CONT_REPORT}. This rule handles this
+ * event. This rule processes the report and increments the
+ * containerWithMinReplicas count when a reported replica is in the
+ * containerMap. Then it validates whether the container cutoff is met.
+ *
+ * 2. DataNodeChillModeRule:
+ * On every new datanode registration, it fires
+ * {@link SCMEvents#NODE_REGISTRATION_CONT_REPORT}. This rule handles this
+ * event. This rule processes the report and adds the reporting datanode
+ * to its set of registered datanodes. Then it validates whether the
+ * minimum number of registered datanodes has been reached.
+ *
+ * 3. HealthyPipelineChillModeRule:
+ * Once the pipelineReportHandler processes the
+ * {@link SCMEvents#PIPELINE_REPORT}, it fires
+ * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * event. This rule processes the report, checks whether each reported
+ * pipeline is healthy and increments the current healthy pipeline count.
+ * Then it validates whether the healthy pipeline threshold is met.
+ *
+ * 4. OneReplicaPipelineChillModeRule:
+ * Once the pipelineReportHandler processes the
+ * {@link SCMEvents#PIPELINE_REPORT}, it fires
+ * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * event. This rule processes the report and adds the reported pipelines
+ * to the reported pipeline set. Then it validates whether the threshold
+ * for pipelines with at least one reported replica is met.
+ *
*/
public class SCMChillModeManager {
@@ -60,6 +91,8 @@ public class SCMChillModeManager {
private static final String ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE =
"AtleastOneDatanodeReportedRule";
+ private Set<String> validatedRules = new HashSet<>();
+
private final EventQueue eventPublisher;
private final PipelineManager pipelineManager;
@@ -75,30 +108,27 @@ public class SCMChillModeManager {
if (isChillModeEnabled) {
ContainerChillModeRule containerChillModeRule =
- new ContainerChillModeRule(config, allContainers, this);
+ new ContainerChillModeRule(CONT_EXIT_RULE, eventQueue, config,
+ allContainers, this);
DataNodeChillModeRule dataNodeChillModeRule =
- new DataNodeChillModeRule(config, this);
+ new DataNodeChillModeRule(DN_EXIT_RULE, eventQueue, config, this);
exitRules.put(CONT_EXIT_RULE, containerChillModeRule);
exitRules.put(DN_EXIT_RULE, dataNodeChillModeRule);
- eventPublisher.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
- containerChillModeRule);
- eventPublisher.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
- dataNodeChillModeRule);
-
if (conf.getBoolean(
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK,
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT)
&& pipelineManager != null) {
- HealthyPipelineChillModeRule rule = new HealthyPipelineChillModeRule(
- pipelineManager, this, config);
+ HealthyPipelineChillModeRule healthyPipelineChillModeRule =
+ new HealthyPipelineChillModeRule(HEALTHY_PIPELINE_EXIT_RULE,
+ eventQueue, pipelineManager,
+ this, config);
OneReplicaPipelineChillModeRule oneReplicaPipelineChillModeRule =
- new OneReplicaPipelineChillModeRule(pipelineManager, this, conf);
- exitRules.put(HEALTHY_PIPELINE_EXIT_RULE, rule);
+ new OneReplicaPipelineChillModeRule(
+ ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE, eventQueue,
+ pipelineManager, this, conf);
+ exitRules.put(HEALTHY_PIPELINE_EXIT_RULE, healthyPipelineChillModeRule);
exitRules.put(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE,
oneReplicaPipelineChillModeRule);
- eventPublisher.addHandler(SCMEvents.PROCESSED_PIPELINE_REPORT, rule);
- eventPublisher.addHandler(SCMEvents.PROCESSED_PIPELINE_REPORT,
- oneReplicaPipelineChillModeRule);
}
emitChillModeStatus();
} else {
@@ -115,13 +145,24 @@ public class SCMChillModeManager {
new ChillModeStatus(getInChillMode()));
}
- public void validateChillModeExitRules(EventPublisher eventQueue) {
- for (ChillModeExitRule exitRule : exitRules.values()) {
- if (!exitRule.validate()) {
- return;
- }
+
+ public synchronized void validateChillModeExitRules(String ruleName,
+ EventPublisher eventQueue) {
+
+ if (exitRules.get(ruleName) != null) {
+ validatedRules.add(ruleName);
+ } else {
+ // This should never happen
+ LOG.error("No Such Exit rule {}", ruleName);
}
- exitChillMode(eventQueue);
+
+
+ if (validatedRules.size() == exitRules.size()) {
+ // All rules are satisfied, we can exit chill mode.
+ LOG.info("ScmChillModeManager, all rules are successfully validated");
+ exitChillMode(eventQueue);
+ }
+
}
/**
@@ -140,9 +181,6 @@ public class SCMChillModeManager {
// TODO: Remove handler registration as there is no need to listen to
// register events anymore.
- for (ChillModeExitRule e : exitRules.values()) {
- e.cleanup();
- }
emitChillModeStatus();
// TODO: #CLUTIL if we reenter chill mode the fixed interval pipeline
// creation job needs to stop
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java
index 01b78f2..fda3e8e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.chillmode;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.File;
import java.util.ArrayList;
@@ -37,12 +38,17 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.pipeline.*;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
@@ -56,7 +62,10 @@ public class TestSCMChillModeManager {
private List<ContainerInfo> containers;
@Rule
- public Timeout timeout = new Timeout(1000 * 35);
+ public Timeout timeout = new Timeout(1000 * 300);
+
+ @Rule
+ public final TemporaryFolder tempDir = new TemporaryFolder();
@BeforeClass
public static void setUp() {
@@ -124,6 +133,185 @@ public class TestSCMChillModeManager {
}, 100, 1000 * 5);
}
+
+  /**
+   * Builds a fresh configuration with the chill-mode pipeline availability
+   * check switched on and both pipeline thresholds set explicitly.
+   *
+   * @param healthyPercent value for
+   *     HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT
+   * @param oneReplicaPercent value for
+   *     HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT
+   * @return the configuration, with OZONE_METADATA_DIRS pointing at a newly
+   *     created temporary folder
+   */
+  private OzoneConfiguration createConf(double healthyPercent,
+      double oneReplicaPercent) throws Exception {
+    OzoneConfiguration result = new OzoneConfiguration();
+    String metadataDir = tempDir.newFolder().toString();
+    result.set(HddsConfigKeys.OZONE_METADATA_DIRS, metadataDir);
+    result.setBoolean(
+        HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK, true);
+    result.setDouble(
+        HddsConfigKeys.HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
+        healthyPercent);
+    result.setDouble(
+        HddsConfigKeys.HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT,
+        oneReplicaPercent);
+    return result;
+  }
+
+ @Test
+ public void testChillModeExitRuleWithPipelineAvailabilityCheck()
+ throws Exception{
+ testChillModeExitRuleWithPipelineAvailabilityCheck(100, 30, 8, 0.90, 1);
+ testChillModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0.10, 0.9);
+ testChillModeExitRuleWithPipelineAvailabilityCheck(100, 30, 8, 0, 0.9);
+ testChillModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0, 0);
+ testChillModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0, 0.5);
+ }
+
+  /**
+   * A healthy-pipeline percentage of 100 lies outside [0.0, 1.0]; building
+   * the pipeline/chill-mode managers from it must raise
+   * IllegalArgumentException.
+   */
+  @Test
+  public void testFailWithIncorrectValueForHealthyPipelinePercent()
+      throws Exception {
+    try {
+      OzoneConfiguration badConf = createConf(100, 0.9);
+      PipelineManager pipelineMgr = new SCMPipelineManager(badConf,
+          new MockNodeManager(true, 10), queue);
+      scmChillModeManager =
+          new SCMChillModeManager(badConf, containers, pipelineMgr, queue);
+      fail("testFailWithIncorrectValueForHealthyPipelinePercent");
+    } catch (IllegalArgumentException expected) {
+      GenericTestUtils.assertExceptionContains(
+          "value should be >= 0.0 and <= 1.0", expected);
+    }
+  }
+
+  /**
+   * A one-replica-reported percentage of 200 lies outside [0.0, 1.0];
+   * building the pipeline/chill-mode managers from it must raise
+   * IllegalArgumentException.
+   */
+  @Test
+  public void testFailWithIncorrectValueForOneReplicaPipelinePercent()
+      throws Exception {
+    try {
+      OzoneConfiguration badConf = createConf(0.9, 200);
+      PipelineManager pipelineMgr = new SCMPipelineManager(badConf,
+          new MockNodeManager(true, 10), queue);
+      scmChillModeManager =
+          new SCMChillModeManager(badConf, containers, pipelineMgr, queue);
+      fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
+    } catch (IllegalArgumentException expected) {
+      GenericTestUtils.assertExceptionContains(
+          "value should be >= 0.0 and <= 1.0", expected);
+    }
+  }
+
+  /**
+   * A container chill-mode threshold of -1.0 lies outside [0.0, 1.0];
+   * building the pipeline/chill-mode managers from it must raise
+   * IllegalArgumentException.
+   */
+  @Test
+  public void testFailWithIncorrectValueForChillModePercent() throws Exception {
+    try {
+      OzoneConfiguration badConf = createConf(0.9, 0.1);
+      badConf.setDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT, -1.0);
+      PipelineManager pipelineMgr = new SCMPipelineManager(badConf,
+          new MockNodeManager(true, 10), queue);
+      scmChillModeManager =
+          new SCMChillModeManager(badConf, containers, pipelineMgr, queue);
+      fail("testFailWithIncorrectValueForChillModePercent");
+    } catch (IllegalArgumentException expected) {
+      GenericTestUtils.assertExceptionContains(
+          "value should be >= 0.0 and <= 1.0", expected);
+    }
+  }
+
+
+  /**
+   * Drives one chill-mode exit scenario with the pipeline availability check
+   * enabled. It creates {@code pipelineCount} RATIS/THREE pipelines and marks
+   * every container CLOSED. It then fires pipeline reports until both
+   * pipeline rules reach their threshold counts, and finally waits up to 5 s
+   * for SCM to leave chill mode.
+   *
+   * @param containerCount number of containers handed to the manager
+   * @param nodeCount number of datanodes in the mock node manager
+   * @param pipelineCount number of pipelines to create
+   * @param healthyPipelinePercent healthy-pipeline threshold percentage
+   * @param oneReplicaPercent one-replica-reported threshold percentage
+   */
+  public void testChillModeExitRuleWithPipelineAvailabilityCheck(
+      int containerCount, int nodeCount, int pipelineCount,
+      double healthyPipelinePercent, double oneReplicaPercent)
+      throws Exception {
+
+    OzoneConfiguration conf = createConf(healthyPipelinePercent,
+        oneReplicaPercent);
+
+    containers = new ArrayList<>();
+    containers.addAll(HddsTestUtils.getContainerInfo(containerCount));
+
+    MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount);
+    SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
+        mockNodeManager, queue);
+    // NOTE(review): the provider gets the shared class-level `config`, not
+    // the local `conf` built above — confirm this is intended.
+    PipelineProvider mockRatisProvider =
+        new MockRatisPipelineProvider(mockNodeManager,
+            pipelineManager.getStateManager(), config);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        mockRatisProvider);
+
+    for (int i = 0; i < pipelineCount; i++) {
+      pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
+          HddsProtos.ReplicationFactor.THREE);
+    }
+
+    for (ContainerInfo container : containers) {
+      container.setState(HddsProtos.LifeCycleState.CLOSED);
+    }
+
+    scmChillModeManager = new SCMChillModeManager(conf, containers,
+        pipelineManager, queue);
+
+    assertTrue(scmChillModeManager.getInChillMode());
+
+    testContainerThreshold(containers, 1.0);
+
+    List<Pipeline> pipelines = pipelineManager.getPipelines();
+
+    int healthyPipelineThresholdCount =
+        scmChillModeManager.getHealthyPipelineChillModeRule()
+            .getHealthyPipelineThresholdCount();
+    int oneReplicaThresholdCount =
+        scmChillModeManager.getOneReplicaPipelineChillModeRule()
+            .getThresholdCount();
+    // Compare counts with counts: the old guard mixed the healthy
+    // *percentage* with the one-replica *count*.
+    int reportsNeeded =
+        Math.max(healthyPipelineThresholdCount, oneReplicaThresholdCount);
+
+    // Even with no pipelines and both thresholds set to zero, a rule is only
+    // added to validatedRules once it receives an event (in a real cluster,
+    // one arrives when a datanode registers). The loop below fires nothing
+    // in that case, so send one report to let the rules validate.
+    if (reportsNeeded == 0) {
+      firePipelineEvent(pipelines.get(0));
+    }
+
+    for (int i = 0; i < reportsNeeded; i++) {
+      firePipelineEvent(pipelines.get(i));
+
+      if (i < healthyPipelineThresholdCount) {
+        checkHealthy(i + 1);
+      }
+
+      if (i < oneReplicaThresholdCount) {
+        checkOpen(i + 1);
+      }
+    }
+
+    GenericTestUtils.waitFor(() -> !scmChillModeManager.getInChillMode(),
+        100, 1000 * 5);
+  }
+
+  /**
+   * Blocks until the healthy pipeline rule reports exactly
+   * {@code expectedCount} healthy pipelines. It polls every 100 ms and gives
+   * up after 5 s.
+   */
+  private void checkHealthy(int expectedCount) throws Exception {
+    GenericTestUtils.waitFor(
+        () -> expectedCount == scmChillModeManager
+            .getHealthyPipelineChillModeRule()
+            .getCurrentHealthyPipelineCount(),
+        100, 5000);
+  }
+
+  /**
+   * Blocks until the one-replica pipeline rule reports exactly
+   * {@code expectedCount} reported pipelines. It polls every 100 ms and gives
+   * up after 5 s.
+   */
+  private void checkOpen(int expectedCount) throws Exception {
+    // Use the same 100 ms polling interval as checkHealthy. A 1000 ms interval
+    // allows only about five checks within the 5 s timeout and slows the test.
+    GenericTestUtils.waitFor(() -> scmChillModeManager
+            .getOneReplicaPipelineChillModeRule()
+            .getCurrentReportedPipelineCount() == expectedCount,
+        100, 5000);
+  }
+
+  /**
+   * Publishes a PROCESSED_PIPELINE_REPORT event containing a single report
+   * for {@code pipeline}. The report is attributed to the pipeline's first
+   * node.
+   */
+  private void firePipelineEvent(Pipeline pipeline) throws Exception {
+    PipelineReport singleReport = PipelineReport.newBuilder()
+        .setPipelineID(pipeline.getId().getProtobuf())
+        .build();
+    PipelineReportsProto reports = PipelineReportsProto.newBuilder()
+        .addPipelineReport(singleReport)
+        .build();
+    queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
+        new PipelineReportFromDatanode(pipeline.getNodes().get(0), reports));
+  }
+
+
@Test
public void testDisableChillMode() {
OzoneConfiguration conf = new OzoneConfiguration(config);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org