You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by el...@apache.org on 2020/06/15 13:12:59 UTC
[hadoop-ozone] branch HDDS-2823 updated: HDDS-3679 Add tests for
PipelineManager V2. (#1019)
This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new 8e86480 HDDS-3679 Add tests for PipelineManager V2. (#1019)
8e86480 is described below
commit 8e86480896c86f6e05c8339718611c4b264dcd8e
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Mon Jun 15 21:12:52 2020 +0800
HDDS-3679 Add tests for PipelineManager V2. (#1019)
---
.../hdds/scm/pipeline/PipelineManagerV2Impl.java | 8 +-
.../hdds/scm/pipeline/PipelineStateManager.java | 4 +-
.../hdds/scm/pipeline/TestPipelieManagerImpl.java | 87 ----
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 462 +++++++++++++++++++++
4 files changed, 468 insertions(+), 93 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index f451000..3732add 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -299,8 +299,8 @@ public final class PipelineManagerV2Impl implements PipelineManager {
}
if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
LOG.info("Pipeline {} moved to OPEN state", pipeline);
- stateManager.updatePipelineState(pipelineId.getProtobuf(),
- HddsProtos.PipelineState.PIPELINE_OPEN);
+ stateManager.updatePipelineState(
+ pipelineId.getProtobuf(), HddsProtos.PipelineState.PIPELINE_OPEN);
}
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
@@ -349,8 +349,8 @@ public final class PipelineManagerV2Impl implements PipelineManager {
try {
Pipeline pipeline = stateManager.getPipeline(pipelineId);
if (!pipeline.isClosed()) {
- stateManager.updatePipelineState(pipelineId.getProtobuf(),
- HddsProtos.PipelineState.PIPELINE_CLOSED);
+ stateManager.updatePipelineState(
+ pipelineId.getProtobuf(), HddsProtos.PipelineState.PIPELINE_CLOSED);
LOG.info("Pipeline {} moved to CLOSED state", pipeline);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index de6f186..899d877 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -201,8 +201,8 @@ public class PipelineStateManager implements StateManager {
}
@Override
- public void updatePipelineState(HddsProtos.PipelineID pipelineIDProto,
- HddsProtos.PipelineState newState)
+ public void updatePipelineState(
+ HddsProtos.PipelineID pipelineIDProto, HddsProtos.PipelineState newState)
throws IOException {
throw new IOException("Not supported.");
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelieManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelieManagerImpl.java
deleted file mode 100644
index a2a8e25..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelieManagerImpl.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
-import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
-import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
-import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.hdds.utils.db.DBStore;
-import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
-import org.apache.hadoop.ozone.container.common.SCMTestUtils;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.UUID;
-
-/**
- * Tests for PipelineManagerImpl.
- */
-public class TestPipelieManagerImpl {
- private PipelineManagerV2Impl pipelineManager;
- private File testDir;
- private DBStore dbStore;
-
- @Before
- public void init() throws Exception {
- final OzoneConfiguration conf = SCMTestUtils.getConf();
- testDir = GenericTestUtils.getTestDir(
- TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
- conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
- dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
- pipelineManager = PipelineManagerV2Impl.newPipelineManager(
- conf, MockSCMHAManager.getInstance(),
- new MockNodeManager(true, 20),
- SCMDBDefinition.PIPELINES.getTable(dbStore), new EventQueue());
- }
-
- @After
- public void cleanup() throws Exception {
- if (pipelineManager != null) {
- pipelineManager.close();
- }
- if (dbStore != null) {
- dbStore.close();
- }
- FileUtil.fullyDelete(testDir);
- }
-
- @Test
- public void testCreatePipeline() throws Exception {
- Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
- pipelineManager.allowPipelineCreation();
- Pipeline pipeline1 = pipelineManager.createPipeline(
- HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
- Assert.assertEquals(1, pipelineManager.getPipelines().size());
- Assert.assertTrue(pipelineManager.containsPipeline(pipeline1.getId()));
-
- Pipeline pipeline2 = pipelineManager.createPipeline(
- HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE);
- Assert.assertEquals(2, pipelineManager.getPipelines().size());
- Assert.assertTrue(pipelineManager.containsPipeline(pipeline2.getId()));
- }
-}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
new file mode 100644
index 0000000..f8eeb6e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.ALLOCATED;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for PipelineManagerImpl.
+ */
+public class TestPipelineManagerImpl {
+ private static OzoneConfiguration conf;
+ private static File testDir;
+ private DBStore dbStore;
+ private static MockNodeManager nodeManager;
+ private static int maxPipelineCount;
+ private static EventQueue eventQueue;
+
+ @Before
+ public void init() throws Exception {
+ conf = SCMTestUtils.getConf();
+ testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
+ nodeManager = new MockNodeManager(true, 20);
+ eventQueue = new EventQueue();
+ maxPipelineCount = nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY) *
+ conf.getInt(OZONE_DATANODE_PIPELINE_LIMIT,
+ OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT) /
+ HddsProtos.ReplicationFactor.THREE.getNumber();
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ if (dbStore != null) {
+ dbStore.close();
+ }
+ FileUtil.fullyDelete(testDir);
+ }
+
+ private PipelineManagerV2Impl createPipelineManager()
+ throws IOException {
+ return PipelineManagerV2Impl.newPipelineManager(
+ conf, MockSCMHAManager.getInstance(),
+ nodeManager,
+ SCMDBDefinition.PIPELINES.getTable(dbStore), eventQueue);
+ }
+
+ @Test
+ public void testCreatePipeline() throws Exception {
+ PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
+ pipelineManager.allowPipelineCreation();
+ Pipeline pipeline1 = pipelineManager.createPipeline(
+ HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+ Assert.assertEquals(1, pipelineManager.getPipelines().size());
+ Assert.assertTrue(pipelineManager.containsPipeline(pipeline1.getId()));
+
+ Pipeline pipeline2 = pipelineManager.createPipeline(
+ HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE);
+ Assert.assertEquals(2, pipelineManager.getPipelines().size());
+ Assert.assertTrue(pipelineManager.containsPipeline(pipeline2.getId()));
+ pipelineManager.close();
+
+ PipelineManagerV2Impl pipelineManager2 = createPipelineManager();
+ // Should be able to load previous pipelines.
+ Assert.assertFalse(pipelineManager.getPipelines().isEmpty());
+ Assert.assertEquals(2, pipelineManager.getPipelines().size());
+ pipelineManager.allowPipelineCreation();
+ Pipeline pipeline3 = pipelineManager.createPipeline(
+ HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+ Assert.assertEquals(3, pipelineManager.getPipelines().size());
+ Assert.assertTrue(pipelineManager.containsPipeline(pipeline3.getId()));
+
+ pipelineManager2.close();
+ }
+
+ @Test
+ public void testUpdatePipelineStates() throws Exception {
+ PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ pipelineManager.allowPipelineCreation();
+ Pipeline pipeline = pipelineManager.createPipeline(
+ HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+ Assert.assertEquals(1, pipelineManager.getPipelines().size());
+ Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId()));
+ Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+ PipelineID pipelineID = pipeline.getId();
+
+ pipelineManager.openPipeline(pipelineID);
+ pipelineManager.addContainerToPipeline(pipelineID, ContainerID.valueof(1));
+ Assert.assertTrue(pipelineManager
+ .getPipelines(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE,
+ Pipeline.PipelineState.OPEN).contains(pipeline));
+
+ pipelineManager.deactivatePipeline(pipeline.getId());
+ Assert.assertEquals(Pipeline.PipelineState.DORMANT,
+ pipelineManager.getPipeline(pipelineID).getPipelineState());
+ Assert.assertFalse(pipelineManager
+ .getPipelines(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE,
+ Pipeline.PipelineState.OPEN).contains(pipeline));
+
+ pipelineManager.activatePipeline(pipeline.getId());
+ Assert.assertTrue(pipelineManager
+ .getPipelines(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE,
+ Pipeline.PipelineState.OPEN).contains(pipeline));
+
+ pipelineManager.close();
+ }
+
+ @Test
+ public void testRemovePipeline() throws Exception {
+ PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ pipelineManager.allowPipelineCreation();
+ // Create a pipeline
+ Pipeline pipeline = pipelineManager.createPipeline(
+ HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+ Assert.assertEquals(1, pipelineManager.getPipelines().size());
+ Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId()));
+ Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+
+ // Open the pipeline
+ pipelineManager.openPipeline(pipeline.getId());
+ pipelineManager
+ .addContainerToPipeline(pipeline.getId(), ContainerID.valueof(1));
+ Assert.assertTrue(pipelineManager
+ .getPipelines(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE,
+ Pipeline.PipelineState.OPEN).contains(pipeline));
+
+ try {
+ pipelineManager.removePipeline(pipeline.getId());
+ fail();
+ } catch (IOException ioe) {
+ // Should not be able to remove the OPEN pipeline.
+ Assert.assertEquals(1, pipelineManager.getPipelines().size());
+ } catch (Exception e) {
+ Assert.fail("Should not reach here.");
+ }
+
+ // Destroy pipeline
+ pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+ try {
+ pipelineManager.getPipeline(pipeline.getId());
+ fail("Pipeline should not have been retrieved");
+ } catch (PipelineNotFoundException e) {
+ // There should be no pipeline in pipelineManager.
+ Assert.assertEquals(0, pipelineManager.getPipelines().size());
+ }
+
+ pipelineManager.close();
+ }
+
+ @Test
+ public void testPipelineReport() throws Exception {
+ PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ pipelineManager.allowPipelineCreation();
+ SCMSafeModeManager scmSafeModeManager =
+ new SCMSafeModeManager(conf, new ArrayList<>(), pipelineManager,
+ eventQueue);
+ Pipeline pipeline = pipelineManager
+ .createPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+
+ // pipeline is not healthy until all dns report
+ List<DatanodeDetails> nodes = pipeline.getNodes();
+ Assert.assertFalse(
+ pipelineManager.getPipeline(pipeline.getId()).isHealthy());
+ // get pipeline report from each dn in the pipeline
+ PipelineReportHandler pipelineReportHandler =
+ new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+ nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
+ pipelineReportHandler, false));
+ sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
+ pipelineReportHandler, true);
+
+ // pipeline is healthy when all dns report
+ Assert
+ .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
+ // pipeline should now move to open state
+ Assert
+ .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
+
+ // close the pipeline
+ pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+
+ // pipeline report for destroyed pipeline should be ignored
+ nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
+ pipelineReportHandler, false));
+ sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
+ pipelineReportHandler, true);
+
+ try {
+ pipelineManager.getPipeline(pipeline.getId());
+ fail("Pipeline should not have been retrieved");
+ } catch (PipelineNotFoundException e) {
+ // should reach here
+ }
+
+ // clean up
+ pipelineManager.close();
+ }
+
+ @Test
+ public void testPipelineCreationFailedMetric() throws Exception {
+ PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ pipelineManager.allowPipelineCreation();
+
+ // No pipeline at start
+ MetricsRecordBuilder metrics = getMetrics(
+ SCMPipelineMetrics.class.getSimpleName());
+ long numPipelineAllocated = getLongCounter("NumPipelineAllocated",
+ metrics);
+ Assert.assertEquals(0, numPipelineAllocated);
+
+ // 3 DNs are unhealthy.
+ // Create 5 pipelines (Use up 15 Datanodes)
+
+ for (int i = 0; i < maxPipelineCount; i++) {
+ Pipeline pipeline = pipelineManager
+ .createPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+ Assert.assertNotNull(pipeline);
+ }
+
+ metrics = getMetrics(
+ SCMPipelineMetrics.class.getSimpleName());
+ numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+ Assert.assertEquals(maxPipelineCount, numPipelineAllocated);
+
+ long numPipelineCreateFailed = getLongCounter(
+ "NumPipelineCreationFailed", metrics);
+ Assert.assertEquals(0, numPipelineCreateFailed);
+
+ //This should fail...
+ try {
+ pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+ fail();
+ } catch (SCMException ioe) {
+ // pipeline creation failed this time.
+ Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
+ ioe.getResult());
+ }
+
+ metrics = getMetrics(
+ SCMPipelineMetrics.class.getSimpleName());
+ numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+ Assert.assertEquals(maxPipelineCount, numPipelineAllocated);
+
+ numPipelineCreateFailed = getLongCounter(
+ "NumPipelineCreationFailed", metrics);
+ Assert.assertEquals(1, numPipelineCreateFailed);
+
+ // clean up
+ pipelineManager.close();
+ }
+
+ @Test
+ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
+ PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ pipelineManager.allowPipelineCreation();
+
+ pipelineManager.onMessage(
+ new SCMSafeModeManager.SafeModeStatus(true, true), null);
+ Pipeline pipeline = pipelineManager
+ .createPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+ // close manager
+ pipelineManager.close();
+ // new pipeline manager loads the pipelines from the db in ALLOCATED state
+ pipelineManager = createPipelineManager();
+ Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+ pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
+
+ SCMSafeModeManager scmSafeModeManager =
+ new SCMSafeModeManager(new OzoneConfiguration(),
+ new ArrayList<>(), pipelineManager, eventQueue);
+ PipelineReportHandler pipelineReportHandler =
+ new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+
+ // Report pipelines with leaders
+ List<DatanodeDetails> nodes = pipeline.getNodes();
+ Assert.assertEquals(3, nodes.size());
+ // Send report for all but no leader
+ nodes.forEach(dn -> sendPipelineReport(dn, pipeline, pipelineReportHandler,
+ false));
+
+ Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+ pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
+
+ nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
+ pipelineReportHandler, false));
+ sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
+ pipelineReportHandler, true);
+
+ Assert.assertEquals(Pipeline.PipelineState.OPEN,
+ pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
+
+ pipelineManager.close();
+ }
+
+ @Test
+ public void testScrubPipeline() throws Exception {
+ // No timeout for pipeline scrubber.
+ conf.setTimeDuration(
+ OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
+ TimeUnit.MILLISECONDS);
+
+ PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ pipelineManager.allowPipelineCreation();
+ Pipeline pipeline = pipelineManager
+ .createPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+ // At this point, pipeline is not at OPEN stage.
+ Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+ pipeline.getPipelineState());
+
+ // pipeline should be seen in pipelineManager as ALLOCATED.
+ Assert.assertTrue(pipelineManager
+ .getPipelines(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE,
+ Pipeline.PipelineState.ALLOCATED).contains(pipeline));
+ pipelineManager.scrubPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+
+ // pipeline should be scrubbed.
+ Assert.assertFalse(pipelineManager
+ .getPipelines(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE,
+ Pipeline.PipelineState.ALLOCATED).contains(pipeline));
+
+ pipelineManager.close();
+ }
+
+ @Test
+ public void testPipelineNotCreatedUntilSafeModePrecheck() throws Exception {
+ // No timeout for pipeline scrubber.
+ conf.setTimeDuration(
+ OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
+ TimeUnit.MILLISECONDS);
+
+ PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ try {
+ pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+ fail("Pipelines should not have been created");
+ } catch (IOException e) {
+ // No pipeline is created.
+ Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
+ }
+
+ // Ensure a pipeline of factor ONE can be created - no exceptions should be
+ // raised.
+ Pipeline pipeline = pipelineManager
+ .createPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE);
+ Assert.assertTrue(pipelineManager
+ .getPipelines(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE).contains(pipeline));
+
+ // Simulate safemode check exiting.
+ pipelineManager.onMessage(
+ new SCMSafeModeManager.SafeModeStatus(true, true), null);
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return pipelineManager.getPipelines().size() != 0;
+ }
+ }, 100, 10000);
+ pipelineManager.close();
+ }
+
+ @Test
+ public void testSafeModeUpdatedOnSafemodeExit() throws Exception {
+ // No timeout for pipeline scrubber.
+ conf.setTimeDuration(
+ OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
+ TimeUnit.MILLISECONDS);
+
+ PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ Assert.assertTrue(pipelineManager.getSafeModeStatus());
+ Assert.assertFalse(pipelineManager.isPipelineCreationAllowed());
+ // First pass pre-check as true, but safemode still on
+ pipelineManager.onMessage(
+ new SCMSafeModeManager.SafeModeStatus(true, true), null);
+ Assert.assertTrue(pipelineManager.getSafeModeStatus());
+ Assert.assertTrue(pipelineManager.isPipelineCreationAllowed());
+
+ // Then also turn safemode off
+ pipelineManager.onMessage(
+ new SCMSafeModeManager.SafeModeStatus(false, true), null);
+ Assert.assertFalse(pipelineManager.getSafeModeStatus());
+ Assert.assertTrue(pipelineManager.isPipelineCreationAllowed());
+ pipelineManager.close();
+ }
+
+ private void sendPipelineReport(
+ DatanodeDetails dn, Pipeline pipeline,
+ PipelineReportHandler pipelineReportHandler,
+ boolean isLeader) {
+ SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode report =
+ TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId(), isLeader);
+ pipelineReportHandler.onMessage(report, eventQueue);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org