You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2019/11/27 12:19:00 UTC
[hadoop-ozone] branch HDDS-1564 updated: HDDS-1572 Implement a
Pipeline scrubber to clean up non-OPEN pipeline. (#237)
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-1564 by this push:
new c855528 HDDS-1572 Implement a Pipeline scrubber to clean up non-OPEN pipeline. (#237)
c855528 is described below
commit c85552817fe62feceb3bb47b843b720903996ac8
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Wed Nov 27 20:18:50 2019 +0800
HDDS-1572 Implement a Pipeline scrubber to clean up non-OPEN pipeline. (#237)
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 7 ++++
.../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 29 +++++++++++++++
hadoop-hdds/common/src/main/proto/hdds.proto | 1 +
.../common/src/main/resources/ozone-default.xml | 12 ++++++
.../scm/pipeline/BackgroundPipelineCreator.java | 7 +++-
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 3 ++
.../hdds/scm/pipeline/SCMPipelineManager.java | 26 +++++++++++++
.../scm/safemode/HealthyPipelineSafeModeRule.java | 1 -
.../scm/pipeline/MockRatisPipelineProvider.java | 28 ++++++++++++++
.../scm/pipeline/TestRatisPipelineProvider.java | 11 +++++-
.../hdds/scm/pipeline/TestSCMPipelineManager.java | 43 +++++++++++++++++++++-
.../hadoop/ozone/TestContainerOperations.java | 2 -
12 files changed, 163 insertions(+), 7 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 17e09c1..4066661 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -333,6 +333,13 @@ public final class ScmConfigKeys {
OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
"ozone.scm.keyvalue.container.deletion-choosing.policy";
+ // Max timeout for a pipeline to stay in the ALLOCATED state before being scrubbed.
+ public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT =
+ "ozone.scm.pipeline.allocated.timeout";
+
+ public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT =
+ "5m";
+
public static final String OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT =
"ozone.scm.container.creation.lease.timeout";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 47ec453..54d752f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -55,6 +55,8 @@ public final class Pipeline {
private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
// Current reported Leader for the pipeline
private UUID leaderId;
+ // Timestamp for pipeline upon creation
+ private Long creationTimestamp;
/**
* The immutable properties of pipeline object is used in
@@ -69,6 +71,7 @@ public final class Pipeline {
this.factor = factor;
this.state = state;
this.nodeStatus = nodeStatus;
+ this.creationTimestamp = System.currentTimeMillis();
}
/**
@@ -108,6 +111,24 @@ public final class Pipeline {
}
/**
+ * Return the creation time of pipeline.
+ *
+ * @return Creation Timestamp
+ */
+ public Long getCreationTimestamp() {
+ return creationTimestamp;
+ }
+
+ /**
+ * Set the creation timestamp. Only for protobuf now.
+ *
+ * @param creationTimestamp
+ */
+ void setCreationTimestamp(Long creationTimestamp) {
+ this.creationTimestamp = creationTimestamp;
+ }
+
+ /**
* Return the pipeline leader's UUID.
*
* @return DatanodeDetails.UUID.
@@ -196,6 +217,7 @@ public final class Pipeline {
.setFactor(factor)
.setState(PipelineState.getProtobuf(state))
.setLeaderID(leaderId != null ? leaderId.toString() : "")
+ .setCreationTimeStamp(creationTimestamp)
.addAllMembers(nodeStatus.keySet().stream()
.map(DatanodeDetails::getProtoBufMessage)
.collect(Collectors.toList()));
@@ -274,6 +296,7 @@ public final class Pipeline {
b.append(", Type:").append(getType());
b.append(", Factor:").append(getFactor());
b.append(", State:").append(getPipelineState());
+ b.append(", CreationTimestamp").append(getCreationTimestamp());
b.append("]");
return b.toString();
}
@@ -298,6 +321,7 @@ public final class Pipeline {
private List<Integer> nodeOrder = null;
private List<DatanodeDetails> nodesInOrder = null;
private UUID leaderId = null;
+ private Long creationTimestamp = null;
public Builder() {}
@@ -309,6 +333,7 @@ public final class Pipeline {
this.nodeStatus = pipeline.nodeStatus;
this.nodesInOrder = pipeline.nodesInOrder.get();
this.leaderId = pipeline.getLeaderId();
+ this.creationTimestamp = pipeline.getCreationTimestamp();
}
public Builder setId(PipelineID id1) {
@@ -355,6 +380,10 @@ public final class Pipeline {
Preconditions.checkNotNull(nodeStatus);
Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
pipeline.setLeaderId(leaderId);
+ // overwrite with original creationTimestamp
+ if (creationTimestamp != null) {
+ pipeline.setCreationTimestamp(creationTimestamp);
+ }
if (nodeOrder != null && !nodeOrder.isEmpty()) {
// This branch is for build from ProtoBuf
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index 39a01dc..b313604 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -75,6 +75,7 @@ message Pipeline {
required PipelineID id = 5;
optional string leaderID = 6;
repeated uint32 memberOrders = 7;
+ optional uint64 creationTimeStamp = 8;
}
message KeyValue {
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 909c692..bd7dda6 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -857,6 +857,18 @@
</description>
</property>
<property>
+ <name>ozone.scm.pipeline.allocated.timeout</name>
+ <value>5m</value>
+ <tag>OZONE, SCM, PIPELINE</tag>
+ <description>
+ Timeout for a pipeline to stay in the ALLOCATED state. When a pipeline is
+ created, it should move to the OPEN state once a pipeline report is
+ successfully received by SCM. If a pipeline stays in the ALLOCATED state
+ for longer than this timeout, it is scrubbed so that a new pipeline can be
+ created. This timeout controls how long a pipeline may remain ALLOCATED
+ before it gets scrubbed.
+ </description>
+ </property>
+ <property>
<name>ozone.scm.container.size</name>
<value>5GB</value>
<tag>OZONE, PERFORMANCE, MANAGEMENT</tag>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 6952f74..9bfd87a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -91,12 +91,17 @@ class BackgroundPipelineCreator {
for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
.values()) {
+ try {
+ pipelineManager.scrubPipeline(type, factor);
+ } catch (IOException e) {
+ LOG.error("Error while scrubbing pipelines {}", e);
+ }
+
while (true) {
try {
if (scheduler.isClosed()) {
break;
}
-
pipelineManager.createPipeline(type, factor);
} catch (IOException ioe) {
break;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 9ba5f31..adbe442 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -73,6 +73,9 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
throws IOException;
+ void scrubPipeline(ReplicationType type, ReplicationFactor factor)
+ throws IOException;
+
void startPipelineCreator();
void triggerPipelineCreation();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index b41c595..9fd1cd0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -53,6 +53,7 @@ import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
@@ -323,6 +324,31 @@ public class SCMPipelineManager implements PipelineManager {
}
@Override
+ public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
+ throws IOException{
+ if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) {
+ // Only scrub pipelines for RATIS THREE pipelines
+ return;
+ }
+ Long currentTime = System.currentTimeMillis();
+ Long pipelineScrubTimeoutInMills = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ List<Pipeline> needToSrubPipelines = stateManager.getPipelines(type, factor,
+ Pipeline.PipelineState.ALLOCATED).stream()
+ .filter(p -> (currentTime - p.getCreationTimestamp()
+ >= pipelineScrubTimeoutInMills))
+ .collect(Collectors.toList());
+ for (Pipeline p : needToSrubPipelines) {
+ LOG.info("srubbing pipeline: id: " + p.getId().toString() +
+ " since it stays at ALLOCATED stage for " +
+ (currentTime - p.getCreationTimestamp())/60000 + " mins.");
+ finalizeAndDestroyPipeline(p, false);
+ }
+ }
+
+ @Override
public Map<String, Integer> getPipelineInfo() {
final Map<String, Integer> pipelineInfo = new HashMap<>();
for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 3b31454..4672c23 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -147,7 +147,6 @@ public class HealthyPipelineSafeModeRule
getSafeModeMetrics().incCurrentHealthyPipelinesCount();
processedPipelineIDs.add(pipelineID);
}
-
}
if (scmInSafeMode()) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 342ee5b..0ed3f16 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -31,10 +31,20 @@ import java.util.List;
*/
public class MockRatisPipelineProvider extends RatisPipelineProvider {
+ private boolean autoOpenPipeline;
+
+ public MockRatisPipelineProvider(NodeManager nodeManager,
+ PipelineStateManager stateManager,
+ Configuration conf, boolean autoOpen) {
+ super(nodeManager, stateManager, conf, null);
+ autoOpenPipeline = autoOpen;
+ }
+
public MockRatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager,
Configuration conf) {
super(nodeManager, stateManager, conf, null);
+ autoOpenPipeline = true;
}
protected void initializePipeline(Pipeline pipeline) throws IOException {
@@ -42,6 +52,24 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
}
@Override
+ public Pipeline create(HddsProtos.ReplicationFactor factor)
+ throws IOException {
+ if (autoOpenPipeline) {
+ return super.create(factor);
+ } else {
+ Pipeline initialPipeline = super.create(factor);
+ return Pipeline.newBuilder()
+ .setId(initialPipeline.getId())
+ // overwrite pipeline state to maintain ALLOCATED
+ .setState(Pipeline.PipelineState.ALLOCATED)
+ .setType(initialPipeline.getType())
+ .setFactor(factor)
+ .setNodes(initialPipeline.getNodes())
+ .build();
+ }
+ }
+
+ @Override
public void shutdown() {
// Do nothing.
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 6f0425d..b64f338 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -53,9 +54,11 @@ public class TestRatisPipelineProvider {
@Before
public void init() throws Exception {
nodeManager = new MockNodeManager(true, 10);
- stateManager = new PipelineStateManager(new OzoneConfiguration());
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
+ stateManager = new PipelineStateManager(conf);
provider = new MockRatisPipelineProvider(nodeManager,
- stateManager, new OzoneConfiguration());
+ stateManager, conf);
}
private void createPipelineAndAssertions(
@@ -63,6 +66,7 @@ public class TestRatisPipelineProvider {
Pipeline pipeline = provider.create(factor);
assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
stateManager.addPipeline(pipeline);
+ nodeManager.addPipeline(pipeline);
Pipeline pipeline1 = provider.create(factor);
assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE);
@@ -142,6 +146,8 @@ public class TestRatisPipelineProvider {
// only 2 healthy DNs left that are not part of any pipeline
Pipeline pipeline = provider.create(factor);
assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+ nodeManager.addPipeline(pipeline);
+ stateManager.addPipeline(pipeline);
List<DatanodeDetails> nodes = pipeline.getNodes();
@@ -176,5 +182,6 @@ public class TestRatisPipelineProvider {
.build();
stateManager.addPipeline(openPipeline);
+ nodeManager.addPipeline(openPipeline);
}
}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 7aba39a..0ee1721 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@@ -28,6 +29,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
@@ -205,7 +207,7 @@ public class TestSCMPipelineManager {
numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
Assert.assertTrue(numPipelineCreateFailed == 1);
-
+
// clean up
pipelineManager.close();
}
@@ -309,6 +311,45 @@ public class TestSCMPipelineManager {
pipelineManager.close();
}
+ @Test
+ public void testScrubPipeline() throws IOException {
+ // No timeout for pipeline scrubber.
+ conf.setTimeDuration(
+ OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
+ TimeUnit.MILLISECONDS);
+
+ final SCMPipelineManager pipelineManager =
+ new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+ final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
+ nodeManager, pipelineManager.getStateManager(), conf, false);
+
+ pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+ ratisProvider);
+
+ Pipeline pipeline = pipelineManager
+ .createPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+ // At this point, pipeline is not at OPEN stage.
+ Assert.assertEquals(pipeline.getPipelineState(),
+ Pipeline.PipelineState.ALLOCATED);
+
+ // pipeline should be seen in pipelineManager as ALLOCATED.
+ Assert.assertTrue(pipelineManager
+ .getPipelines(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE,
+ Pipeline.PipelineState.ALLOCATED).contains(pipeline));
+ pipelineManager.scrubPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+
+ // pipeline should be scrubbed.
+ Assert.assertFalse(pipelineManager
+ .getPipelines(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE,
+ Pipeline.PipelineState.ALLOCATED).contains(pipeline));
+
+ pipelineManager.close();
+ }
+
private void sendPipelineReport(DatanodeDetails dn,
Pipeline pipeline, PipelineReportHandler pipelineReportHandler,
boolean isLeader, EventQueue eventQueue) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
index 6f347cf..50429cc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.ozone;
-import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -30,7 +29,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
import static org.junit.Assert.assertEquals;
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org