You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ca...@apache.org on 2022/10/20 13:09:30 UTC
[ozone] 02/04: HDDS-7341. EC: Close pipelines with unregistered nodes (#3850)
This is an automated email from the ASF dual-hosted git repository.
captainzmc pushed a commit to branch ozone-1.3
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 36490993db7fd123fe29e63848da6d4185aa7cbf
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Wed Oct 19 09:53:30 2022 +0100
HDDS-7341. EC: Close pipelines with unregistered nodes (#3850)
---
.../hdds/scm/pipeline/PipelineManagerImpl.java | 40 ++++++++++++++++++---
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 41 +++++++++++++++++-----
2 files changed, 68 insertions(+), 13 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index aaa3088f9a..044cbc0166 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -509,22 +509,52 @@ public class PipelineManagerImpl implements PipelineManager {
if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
(currentTime.toEpochMilli() - p.getCreationTimestamp()
.toEpochMilli() >= pipelineScrubTimeoutInMills)) {
- LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
- " since it stays at ALLOCATED stage for " +
+ LOG.info("Scrubbing pipeline: id: {} since it stays at ALLOCATED " +
+ "stage for {} mins.", p.getId(),
Duration.between(currentTime, p.getCreationTimestamp())
- .toMinutes() + " mins.");
+ .toMinutes());
closePipeline(p, false);
}
// scrub pipelines who stay CLOSED for too long.
if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) {
- LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
- " since it stays at CLOSED stage.");
+ LOG.info("Scrubbing pipeline: id: {} since it stays at CLOSED stage.",
+ p.getId());
closeContainersForPipeline(p.getId());
removePipeline(p);
}
+ // If a datanode is stopped and then SCM is restarted, a pipeline can get
+ // stuck in an open state. For Ratis, provided some other DNs that were
+ // part of the open pipeline register with SCM after the restart, the Ratis
+ // pipeline close will get triggered by the DNs. For EC that will never
+ // happen, as the DNs are not aware of the pipeline. Therefore the scrubber
+ // should close any open pipeline that has nodes which are not
+ // registered.
+ if (isOpenWithUnregisteredNodes(p)) {
+ LOG.info("Scrubbing pipeline: id: {} as it has unregistered nodes",
+ p.getId());
+ closeContainersForPipeline(p.getId());
+ closePipeline(p, true);
+ }
}
}
+ /**
+  * Reports whether a pipeline is open yet includes at least one datanode
+  * for which the node manager's UUID lookup returns null.
+  *
+  * @param pipeline The pipeline to check
+  * @return True if the pipeline is open and contains unregistered nodes.
+  *     False otherwise.
+  */
+ private boolean isOpenWithUnregisteredNodes(Pipeline pipeline) {
+   // Short-circuit: the member nodes are only inspected for open pipelines.
+   // A node counts as unregistered when the UUID lookup yields null.
+   return pipeline.isOpen() &&
+       pipeline.getNodes().stream()
+           .map(DatanodeDetails::getUuidString)
+           .anyMatch(uuid -> nodeManager.getNodeByUuid(uuid) == null);
+ }
+
/**
* Schedules a fixed interval job to create pipelines.
*/
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 8fa45f5842..1ed9b845ac 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
import com.google.common.base.Supplier;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -29,12 +30,14 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBuffer;
import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBufferStub;
@@ -125,6 +128,10 @@ public class TestPipelineManagerImpl {
GenericTestUtils.getRandomizedTempPath());
scm = HddsTestUtils.getScm(conf);
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ // Mock Node Manager is not able to correctly set up things for the EC
+ // placement policy (Rack Scatter), so just use the random one.
+ conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_KEY,
+ SCMContainerPlacementRandom.class.getName());
dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
nodeManager = new MockNodeManager(true, 20);
maxPipelineCount = nodeManager.getNodeCount(
@@ -151,7 +158,7 @@ public class TestPipelineManagerImpl {
throws IOException {
return PipelineManagerImpl.newPipelineManager(conf,
SCMHAManagerStub.getInstance(isLeader),
- new MockNodeManager(true, 20),
+ nodeManager,
SCMDBDefinition.PIPELINES.getTable(dbStore),
new EventQueue(),
scmContext,
@@ -163,7 +170,7 @@ public class TestPipelineManagerImpl {
boolean isLeader, SCMHADBTransactionBuffer buffer) throws IOException {
return PipelineManagerImpl.newPipelineManager(conf,
SCMHAManagerStub.getInstance(isLeader, buffer),
- new MockNodeManager(true, 20),
+ nodeManager,
SCMDBDefinition.PIPELINES.getTable(dbStore),
new EventQueue(),
SCMContext.emptyContext(),
@@ -341,7 +348,6 @@ public class TestPipelineManagerImpl {
@Test
public void testRemovePipeline() throws Exception {
PipelineManagerImpl pipelineManager = createPipelineManager(true);
- pipelineManager.setScmContext(scmContext);
// Create a pipeline
Pipeline pipeline = pipelineManager.createPipeline(
RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
@@ -391,7 +397,6 @@ public class TestPipelineManagerImpl {
@Test
public void testClosePipelineShouldFailOnFollower() throws Exception {
PipelineManagerImpl pipelineManager = createPipelineManager(true);
- pipelineManager.setScmContext(scmContext);
Pipeline pipeline = pipelineManager.createPipeline(
RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
Assertions.assertEquals(1, pipelineManager.getPipelines().size());
@@ -413,7 +418,6 @@ public class TestPipelineManagerImpl {
@Test
public void testPipelineReport() throws Exception {
PipelineManagerImpl pipelineManager = createPipelineManager(true);
- pipelineManager.setScmContext(scmContext);
SCMSafeModeManager scmSafeModeManager =
new SCMSafeModeManager(conf, new ArrayList<>(), null, pipelineManager,
new EventQueue(), serviceManager, scmContext);
@@ -571,7 +575,6 @@ public class TestPipelineManagerImpl {
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, 50, TimeUnit.SECONDS);
PipelineManagerImpl pipelineManager = createPipelineManager(true);
- pipelineManager.setScmContext(scmContext);
Pipeline allocatedPipeline = pipelineManager
.createPipeline(RatisReplicationConfig
.getInstance(ReplicationFactor.THREE));
@@ -628,13 +631,36 @@ public class TestPipelineManagerImpl {
pipelineManager.close();
}
+ @Test
+ public void testScrubOpenWithUnregisteredNodes() throws Exception {
+   PipelineManagerImpl manager = createPipelineManager(true);
+   Pipeline ecPipeline =
+       manager.createPipeline(new ECReplicationConfig(3, 2));
+   manager.openPipeline(ecPipeline.getId());
+
+   // With no node removed yet, a scrub pass is expected to leave it OPEN.
+   manager.scrubPipelines();
+   Pipeline reloaded = manager.getPipeline(ecPipeline.getId());
+   Assertions.assertEquals(Pipeline.PipelineState.OPEN,
+       reloaded.getPipelineState());
+
+   // "Unregister" the first member by removing it from the topology map.
+   String firstUuid = reloaded.getNodes().get(0).getUuidString();
+   DatanodeDetails firstDN = nodeManager.getNodeByUuid(firstUuid);
+   nodeManager.getClusterNetworkTopologyMap().remove(firstDN);
+
+   manager.scrubPipelines();
+   reloaded = manager.getPipeline(ecPipeline.getId());
+   Assertions.assertEquals(Pipeline.PipelineState.CLOSED,
+       reloaded.getPipelineState());
+ }
+
@Test
public void testScrubPipelinesShouldFailOnFollower() throws Exception {
conf.setTimeDuration(
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, 10, TimeUnit.SECONDS);
PipelineManagerImpl pipelineManager = createPipelineManager(true);
- pipelineManager.setScmContext(scmContext);
Pipeline pipeline = pipelineManager
.createPipeline(RatisReplicationConfig
.getInstance(ReplicationFactor.THREE));
@@ -765,7 +791,6 @@ public class TestPipelineManagerImpl {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(LoggerFactory.getLogger(PipelineManagerImpl.class));
PipelineManagerImpl pipelineManager = createPipelineManager(true);
- pipelineManager.setScmContext(scmContext);
Pipeline pipeline = pipelineManager.createPipeline(
RatisReplicationConfig
.getInstance(HddsProtos.ReplicationFactor.THREE));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org