You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by el...@apache.org on 2019/11/20 09:02:08 UTC
[hadoop-ozone] 01/02: Revert "Revert "HDDS-2034. Async RATIS
pipeline creation and destroy through heartbeat commands (#29)""
This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch HDDS-2531
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 3e7e21244c9a0bf249c976f20ae4e559acd1013d
Author: Márton Elek <el...@apache.org>
AuthorDate: Wed Nov 20 10:00:11 2019 +0100
Revert "Revert "HDDS-2034. Async RATIS pipeline creation and destroy through heartbeat commands (#29)""
This reverts commit dcfe5f34d79473def14f25812d77c6c685b92e58.
---
.../org/apache/hadoop/hdds/HddsConfigKeys.java | 12 +-
.../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 5 +
.../org/apache/hadoop/hdds/utils/Scheduler.java | 7 +-
.../common/src/main/resources/ozone-default.xml | 30 ++-
.../common/statemachine/DatanodeStateMachine.java | 6 +
.../CloseContainerCommandHandler.java | 11 +-
.../ClosePipelineCommandHandler.java | 120 ++++++++++++
.../commandhandler/CommandHandler.java | 2 +-
.../CreatePipelineCommandHandler.java | 135 +++++++++++++
.../states/endpoint/HeartbeatEndpointTask.java | 22 +++
.../common/transport/server/XceiverServerSpi.java | 18 ++
.../transport/server/ratis/XceiverServerRatis.java | 36 ++++
.../protocol/commands/ClosePipelineCommand.java | 73 +++++++
.../protocol/commands/CreatePipelineCommand.java | 100 ++++++++++
.../proto/StorageContainerDatanodeProtocol.proto | 23 +++
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 3 +
.../hdds/scm/container/ContainerStateManager.java | 1 +
.../apache/hadoop/hdds/scm/events/SCMEvents.java | 12 +-
.../scm/pipeline/BackgroundPipelineCreator.java | 9 +-
.../hadoop/hdds/scm/pipeline/PipelineFactory.java | 13 +-
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 15 +-
.../hadoop/hdds/scm/pipeline/PipelineProvider.java | 2 +
.../hdds/scm/pipeline/PipelineReportHandler.java | 54 +++---
.../hdds/scm/pipeline/PipelineStateManager.java | 4 +-
.../hdds/scm/pipeline/RatisPipelineProvider.java | 137 +++++---------
.../hdds/scm/pipeline/RatisPipelineUtils.java | 103 ----------
.../hdds/scm/pipeline/SCMPipelineManager.java | 82 ++++++--
.../hdds/scm/pipeline/SCMPipelineMetrics.java | 10 +
.../hdds/scm/pipeline/SimplePipelineProvider.java | 5 +
.../scm/safemode/HealthyPipelineSafeModeRule.java | 83 ++++----
.../safemode/OneReplicaPipelineSafeModeRule.java | 65 +++----
.../hdds/scm/safemode/SCMSafeModeManager.java | 16 +-
.../hadoop/hdds/scm/safemode/SafeModeHandler.java | 5 +-
.../scm/server/SCMDatanodeHeartbeatDispatcher.java | 1 +
.../hdds/scm/server/SCMDatanodeProtocolServer.java | 20 ++
.../hdds/scm/server/StorageContainerManager.java | 3 +-
.../java/org/apache/hadoop/hdds/scm/TestUtils.java | 12 ++
.../hadoop/hdds/scm/block/TestBlockManager.java | 44 ++++-
.../container/TestCloseContainerEventHandler.java | 11 +-
.../scm/container/TestSCMContainerManager.java | 4 +-
.../hdds/scm/node/TestContainerPlacement.java | 2 +-
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 11 +-
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 9 +-
.../scm/pipeline/MockRatisPipelineProvider.java | 10 +-
.../scm/pipeline/TestRatisPipelineProvider.java | 26 ++-
.../safemode/TestHealthyPipelineSafeModeRule.java | 44 ++---
.../TestOneReplicaPipelineSafeModeRule.java | 36 +---
.../hdds/scm/safemode/TestSCMSafeModeManager.java | 50 ++---
hadoop-ozone/dist/src/main/compose/testlib.sh | 35 ++++
.../TestContainerStateManagerIntegration.java | 6 +-
.../metrics/TestSCMContainerManagerMetrics.java | 3 +
.../hdds/scm/pipeline/TestPipelineClose.java | 4 +-
.../scm/pipeline/TestRatisPipelineProvider.java | 210 +++++++++++++++++++++
.../hdds/scm/pipeline/TestSCMPipelineManager.java | 93 +++++++--
.../org/apache/hadoop/ozone/MiniOzoneCluster.java | 2 +-
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 43 ++---
.../hadoop/ozone/TestContainerOperations.java | 2 +-
.../TestContainerStateMachineIdempotency.java | 2 +-
.../apache/hadoop/ozone/TestMiniOzoneCluster.java | 6 +-
.../hadoop/ozone/TestStorageContainerManager.java | 18 +-
.../apache/hadoop/ozone/client/rpc/TestBCSID.java | 3 +
.../client/rpc/TestContainerStateMachine.java | 3 +
.../rpc/TestContainerStateMachineFailures.java | 9 +-
.../ozone/container/TestContainerReplication.java | 2 +-
.../commandhandler/TestBlockDeletion.java | 6 +-
.../commandhandler/TestCloseContainerHandler.java | 2 +
.../commandhandler/TestDeleteContainerHandler.java | 2 +
.../hadoop/ozone/dn/scrubber/TestDataScrubber.java | 2 +
.../org/apache/hadoop/ozone/om/TestKeyPurging.java | 2 +-
.../apache/hadoop/ozone/om/TestScmSafeMode.java | 2 +-
.../hadoop/ozone/scm/TestContainerSmallFile.java | 2 +-
.../scm/TestGetCommittedBlockLengthAndPutKey.java | 2 +-
.../org/apache/hadoop/ozone/scm/TestSCMMXBean.java | 2 +-
.../hadoop/ozone/scm/node/TestSCMNodeMetrics.java | 4 +-
.../hadoop/fs/ozone/TestOzoneFsRenameDir.java | 2 +-
.../hadoop/ozone/fsck/TestContainerMapper.java | 2 +-
76 files changed, 1403 insertions(+), 570 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 99972ae..5e161b3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -81,7 +81,12 @@ public final class HddsConfigKeys {
public static final String HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK =
"hdds.scm.safemode.pipeline-availability.check";
public static final boolean
- HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = false;
+ HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = true;
+
+ public static final String HDDS_SCM_SAFEMODE_PIPELINE_CREATION =
+ "hdds.scm.safemode.pipeline.creation";
+ public static final boolean
+ HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT = true;
// % of containers which should have at least one reported replica
// before SCM comes out of safe mode.
@@ -89,13 +94,16 @@ public final class HddsConfigKeys {
"hdds.scm.safemode.threshold.pct";
public static final double HDDS_SCM_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.99;
-
// percentage of healthy pipelines, where all 3 datanodes are reported in the
// pipeline.
public static final String HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT =
"hdds.scm.safemode.healthy.pipelie.pct";
public static final double
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT = 0.10;
+ // number of healthy RATIS pipeline(ONE or THREE factor)
+ public static final String HDDS_SCM_SAFEMODE_MIN_PIPELINE =
+ "hdds.scm.safemode.min.pipeline";
+ public static final int HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT = 1;
public static final String HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT =
"hdds.scm.safemode.atleast.one.node.reported.pipeline.pct";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 47ec453..51598a7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -155,6 +155,11 @@ public final class Pipeline {
return state == PipelineState.OPEN;
}
+ public boolean isAllocationTimeout() {
+ //TODO: define a system property to control the timeout value
+ return false;
+ }
+
public void setNodesInOrder(List<DatanodeDetails> nodes) {
nodesInOrder.set(nodes);
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
index 9edc104..f5e55c1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@@ -72,9 +73,9 @@ public class Scheduler {
}, delay, timeUnit);
}
- public void scheduleWithFixedDelay(Runnable runnable, long initialDelay,
- long fixedDelay, TimeUnit timeUnit) {
- scheduler
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable,
+ long initialDelay, long fixedDelay, TimeUnit timeUnit) {
+ return scheduler
.scheduleWithFixedDelay(runnable, initialDelay, fixedDelay, timeUnit);
}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2f9ce31..c20a6a4 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -319,15 +319,6 @@
defined with postfix (ns,ms,s,m,h,d)</description>
</property>
<property>
- <name>hdds.command.status.report.interval</name>
- <value>60000ms</value>
- <tag>OZONE, CONTAINER, MANAGEMENT</tag>
- <description>Time interval of the datanode to send status of command
- execution. Each datanode periodically the execution status of commands
- received from SCM to SCM. Unit could be defined with postfix
- (ns,ms,s,m,h,d)</description>
- </property>
- <property>
<name>hdds.pipeline.report.interval</name>
<value>60000ms</value>
<tag>OZONE, PIPELINE, MANAGEMENT</tag>
@@ -1300,7 +1291,7 @@
<property>
<name>hdds.scm.safemode.pipeline-availability.check</name>
- <value>false</value>
+ <value>true</value>
<tag>HDDS,SCM,OPERATION</tag>
<description>
Boolean value to enable pipeline availability check during SCM safe mode.
@@ -1386,6 +1377,25 @@
</property>
<property>
+ <name>hdds.scm.safemode.pipeline.creation</name>
+ <value>true</value>
+ <tag>HDDS,SCM,OPERATION</tag>
+ <description>
+ Boolean value to enable background pipeline creation in SCM safe mode.
+ </description>
+ </property>
+
+ <property>
+ <name>hdds.scm.safemode.min.pipeline</name>
+ <value>1</value>
+ <tag>HDDS,SCM,OPERATION</tag>
+ <description>
+ Minimum RATIS pipeline number to exit SCM safe mode. Considered only when
+ "hdds.scm.safemode.pipeline.creation" is True.
+ </description>
+ </property>
+
+ <property>
<name>hdds.lock.max.concurrency</name>
<value>100</value>
<tag>HDDS</tag>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 5424b6b..e7fda00 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -39,8 +39,12 @@ import org.apache.hadoop.ozone.container.common.report.ReportManager;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CloseContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+ .ClosePipelineCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CommandDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+ .CreatePipelineCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.DeleteBlocksCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.DeleteContainerCommandHandler;
@@ -132,6 +136,8 @@ public class DatanodeStateMachine implements Closeable {
.addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
.addHandler(new DeleteContainerCommandHandler(
dnConf.getContainerDeleteThreads()))
+ .addHandler(new ClosePipelineCommandHandler())
+ .addHandler(new CreatePipelineCommandHandler())
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index ef06c14..3c4e24a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Handler for close container command received from SCM.
@@ -48,7 +49,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
private static final Logger LOG =
LoggerFactory.getLogger(CloseContainerCommandHandler.class);
- private int invocationCount;
+ private AtomicLong invocationCount = new AtomicLong(0);
private long totalTime;
/**
@@ -69,7 +70,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
public void handle(SCMCommand command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
LOG.debug("Processing Close Container command.");
- invocationCount++;
+ invocationCount.incrementAndGet();
final long startTime = Time.monotonicNow();
final DatanodeDetails datanodeDetails = context.getParent()
.getDatanodeDetails();
@@ -162,7 +163,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
*/
@Override
public int getInvocationCount() {
- return invocationCount;
+ return (int)invocationCount.get();
}
/**
@@ -172,8 +173,8 @@ public class CloseContainerCommandHandler implements CommandHandler {
*/
@Override
public long getAverageRunTime() {
- if (invocationCount > 0) {
- return totalTime / invocationCount;
+ if (invocationCount.get() > 0) {
+ return totalTime / invocationCount.get();
}
return 0;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
new file mode 100644
index 0000000..b1c6090
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server
+ .XceiverServerSpi;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Handler for close pipeline command received from SCM.
+ */
+public class ClosePipelineCommandHandler implements CommandHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ClosePipelineCommandHandler.class);
+
+ private AtomicLong invocationCount = new AtomicLong(0);
+ private long totalTime;
+
+ /**
+ * Constructs a closePipelineCommand handler.
+ */
+ public ClosePipelineCommandHandler() {
+ }
+
+ /**
+ * Handles a given SCM command.
+ *
+ * @param command - SCM Command
+ * @param ozoneContainer - Ozone Container.
+ * @param context - Current Context.
+ * @param connectionManager - The SCMs that we are talking to.
+ */
+ @Override
+ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+ StateContext context, SCMConnectionManager connectionManager) {
+ invocationCount.incrementAndGet();
+ final long startTime = Time.monotonicNow();
+ final DatanodeDetails dn = context.getParent().getDatanodeDetails();
+ final ClosePipelineCommandProto closeCommand =
+ ((ClosePipelineCommand)command).getProto();
+ final HddsProtos.PipelineID pipelineID = closeCommand.getPipelineID();
+
+ try {
+ XceiverServerSpi server = ozoneContainer.getWriteChannel();
+ server.removeGroup(pipelineID);
+ LOG.info("Close Pipeline #{} command on datanode #{}.", pipelineID,
+ dn.getUuidString());
+ } catch (IOException e) {
+ LOG.error("Can't close pipeline #{}", pipelineID, e);
+ } finally {
+ long endTime = Time.monotonicNow();
+ totalTime += endTime - startTime;
+ }
+ }
+
+ /**
+ * Returns the command type that this command handler handles.
+ *
+ * @return Type
+ */
+ @Override
+ public SCMCommandProto.Type getCommandType() {
+ return SCMCommandProto.Type.closePipelineCommand;
+ }
+
+ /**
+ * Returns number of times this handler has been invoked.
+ *
+ * @return int
+ */
+ @Override
+ public int getInvocationCount() {
+ return (int)invocationCount.get();
+ }
+
+ /**
+ * Returns the average time this function takes to run.
+ *
+ * @return long
+ */
+ @Override
+ public long getAverageRunTime() {
+ if (invocationCount.get() > 0) {
+ return totalTime / invocationCount.get();
+ }
+ return 0;
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 70ed9ca..3a6566f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -68,7 +68,7 @@ public interface CommandHandler {
default void updateCommandStatus(StateContext context, SCMCommand command,
Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
- log.debug("{} with Id:{} not found.", command.getType(),
+ log.warn("{} with Id:{} not found.", command.getType(),
command.getId());
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
new file mode 100644
index 0000000..3a60d7e
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server
+ .XceiverServerSpi;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * Handler for create pipeline command received from SCM.
+ */
+public class CreatePipelineCommandHandler implements CommandHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
+
+ private AtomicLong invocationCount = new AtomicLong(0);
+ private long totalTime;
+
+ /**
+ * Constructs a createPipelineCommand handler.
+ */
+ public CreatePipelineCommandHandler() {
+ }
+
+ /**
+ * Handles a given SCM command.
+ *
+ * @param command - SCM Command
+ * @param ozoneContainer - Ozone Container.
+ * @param context - Current Context.
+ * @param connectionManager - The SCMs that we are talking to.
+ */
+ @Override
+ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+ StateContext context, SCMConnectionManager connectionManager) {
+ invocationCount.incrementAndGet();
+ final long startTime = Time.monotonicNow();
+ final DatanodeDetails dn = context.getParent()
+ .getDatanodeDetails();
+ final CreatePipelineCommandProto createCommand =
+ ((CreatePipelineCommand)command).getProto();
+ final HddsProtos.PipelineID pipelineID = createCommand.getPipelineID();
+ Collection<DatanodeDetails> peers =
+ createCommand.getDatanodeList().stream()
+ .map(DatanodeDetails::getFromProtoBuf)
+ .collect(Collectors.toList());
+
+ try {
+ XceiverServerSpi server = ozoneContainer.getWriteChannel();
+ server.addGroup(pipelineID, peers);
+ LOG.info("Create Pipeline {} {} #{} command succeed on datanode {}.",
+ createCommand.getType(), createCommand.getFactor(), pipelineID,
+ dn.getUuidString());
+ // Trigger heartbeat report
+ context.addReport(context.getParent().getContainer().getPipelineReport());
+ context.getParent().triggerHeartbeat();
+ } catch (NotLeaderException e) {
+ LOG.debug("Follower cannot create pipeline #{}.", pipelineID);
+ } catch (IOException e) {
+ LOG.error("Can't create pipeline {} {} #{}", createCommand.getType(),
+ createCommand.getFactor(), pipelineID, e);
+ } finally {
+ long endTime = Time.monotonicNow();
+ totalTime += endTime - startTime;
+ }
+ }
+
+ /**
+ * Returns the command type that this command handler handles.
+ *
+ * @return Type
+ */
+ @Override
+ public SCMCommandProto.Type getCommandType() {
+ return SCMCommandProto.Type.createPipelineCommand;
+ }
+
+ /**
+ * Returns number of times this handler has been invoked.
+ *
+ * @return int
+ */
+ @Override
+ public int getInvocationCount() {
+ return (int)invocationCount.get();
+ }
+
+ /**
+ * Returns the average time this function takes to run.
+ *
+ * @return long
+ */
+ @Override
+ public long getAverageRunTime() {
+ if (invocationCount.get() > 0) {
+ return totalTime / invocationCount.get();
+ }
+ return 0;
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index c50f457..a55d0d6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine.EndPointStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -309,6 +311,26 @@ public class HeartbeatEndpointTask
}
this.context.addCommand(deleteContainerCommand);
break;
+ case createPipelineCommand:
+ CreatePipelineCommand createPipelineCommand =
+ CreatePipelineCommand.getFromProtobuf(
+ commandResponseProto.getCreatePipelineCommandProto());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received SCM create pipeline request {}",
+ createPipelineCommand.getPipelineID());
+ }
+ this.context.addCommand(createPipelineCommand);
+ break;
+ case closePipelineCommand:
+ ClosePipelineCommand closePipelineCommand =
+ ClosePipelineCommand.getFromProtobuf(
+ commandResponseProto.getClosePipelineCommandProto());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received SCM close pipeline request {}",
+ closePipelineCommand.getPipelineID());
+ }
+ this.context.addCommand(closePipelineCommand);
+ break;
default:
throw new IllegalArgumentException("Unknown response : "
+ commandResponseProto.getCommandType().name());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index 4e0d343..01f463c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.common.transport.server;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -25,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReport;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
/** A server endpoint that acts as the communication layer for Ozone
@@ -60,6 +62,22 @@ public interface XceiverServerSpi {
*/
boolean isExist(HddsProtos.PipelineID pipelineId);
+
+ /**
+ * Join a new pipeline.
+ */
+ default void addGroup(HddsProtos.PipelineID pipelineId,
+ Collection<DatanodeDetails> peers) throws IOException {
+ }
+
+
+ /**
+ * Exit a pipeline.
+ */
+ default void removeGroup(HddsProtos.PipelineID pipelineId)
+ throws IOException {
+ }
+
/**
* Get pipeline report for the XceiverServer instance.
* @return list of report for each pipeline.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 1146394..a76944b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -67,6 +67,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -620,6 +621,41 @@ public final class XceiverServerRatis implements XceiverServerSpi {
return pipelineIDs;
}
+ @Override
+ public void addGroup(HddsProtos.PipelineID pipelineId,
+ Collection<DatanodeDetails> peers) throws IOException {
+ final PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineId);
+ final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId());
+ final RaftGroup group = RatisHelper.newRaftGroup(groupId, peers);
+ GroupManagementRequest request = GroupManagementRequest.newAdd(
+ clientId, server.getId(), nextCallId(), group);
+
+ RaftClientReply reply;
+ try {
+ reply = server.groupManagement(request);
+ } catch (Exception e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ processReply(reply);
+ }
+
+ @Override
+ public void removeGroup(HddsProtos.PipelineID pipelineId)
+ throws IOException {
+ GroupManagementRequest request = GroupManagementRequest.newRemove(
+ clientId, server.getId(), nextCallId(),
+ RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId()),
+ true);
+
+ RaftClientReply reply;
+ try {
+ reply = server.groupManagement(request);
+ } catch (Exception e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ processReply(reply);
+ }
+
void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
handlePipelineFailure(groupId, roleInfoProto);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java
new file mode 100644
index 0000000..1f75bc3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+
+/**
+ * Asks datanode to close a pipeline.
+ */
+public class ClosePipelineCommand
+ extends SCMCommand<ClosePipelineCommandProto> {
+
+ // ID of the pipeline that the receiving datanode must close.
+ private final PipelineID pipelineID;
+
+ // Creates a command with a freshly generated command id (via super()).
+ public ClosePipelineCommand(final PipelineID pipelineID) {
+ super();
+ this.pipelineID = pipelineID;
+ }
+
+ // Used when rebuilding the command from protobuf: keeps the original
+ // command id so command-status tracking matches the SCM-side record.
+ public ClosePipelineCommand(long cmdId, final PipelineID pipelineID) {
+ super(cmdId);
+ this.pipelineID = pipelineID;
+ }
+
+ /**
+ * Returns the type of this command.
+ *
+ * @return Type
+ */
+ @Override
+ public SCMCommandProto.Type getType() {
+ return SCMCommandProto.Type.closePipelineCommand;
+ }
+
+ /**
+ * Serializes the command id and pipeline id into the protobuf form
+ * carried in the SCM heartbeat response.
+ */
+ @Override
+ public ClosePipelineCommandProto getProto() {
+ ClosePipelineCommandProto.Builder builder =
+ ClosePipelineCommandProto.newBuilder();
+ builder.setCmdId(getId());
+ builder.setPipelineID(pipelineID.getProtobuf());
+ return builder.build();
+ }
+
+ /**
+ * Rebuilds a ClosePipelineCommand from its protobuf representation.
+ * NOTE(review): the parameter is named createPipelineProto although it is
+ * a ClosePipelineCommandProto - looks like a copy/paste from
+ * CreatePipelineCommand; consider renaming to closePipelineProto.
+ */
+ public static ClosePipelineCommand getFromProtobuf(
+ ClosePipelineCommandProto createPipelineProto) {
+ Preconditions.checkNotNull(createPipelineProto);
+ return new ClosePipelineCommand(createPipelineProto.getCmdId(),
+ PipelineID.getFromProtobuf(createPipelineProto.getPipelineID()));
+ }
+
+ // Returns the pipeline this command targets.
+ public PipelineID getPipelineID() {
+ return pipelineID;
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
new file mode 100644
index 0000000..9e22cbc
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Asks datanode to create a pipeline.
+ */
+public class CreatePipelineCommand
+ extends SCMCommand<CreatePipelineCommandProto> {
+
+ // ID assigned by SCM to the pipeline being created.
+ private final PipelineID pipelineID;
+ // Replication factor of the new pipeline (e.g. ONE or THREE).
+ private final ReplicationFactor factor;
+ // Replication type of the new pipeline (e.g. RATIS).
+ private final ReplicationType type;
+ // Datanodes that will form the pipeline; each member receives this command.
+ private final List<DatanodeDetails> nodelist;
+
+ // Creates a command with a freshly generated command id (via super()).
+ public CreatePipelineCommand(final PipelineID pipelineID,
+ final ReplicationType type, final ReplicationFactor factor,
+ final List<DatanodeDetails> datanodeList) {
+ super();
+ this.pipelineID = pipelineID;
+ this.factor = factor;
+ this.type = type;
+ this.nodelist = datanodeList;
+ }
+
+ // Used when rebuilding the command from protobuf: keeps the original
+ // command id so command-status tracking matches the SCM-side record.
+ public CreatePipelineCommand(long cmdId, final PipelineID pipelineID,
+ final ReplicationType type, final ReplicationFactor factor,
+ final List<DatanodeDetails> datanodeList) {
+ super(cmdId);
+ this.pipelineID = pipelineID;
+ this.factor = factor;
+ this.type = type;
+ this.nodelist = datanodeList;
+ }
+
+ /**
+ * Returns the type of this command.
+ *
+ * @return Type
+ */
+ @Override
+ public SCMCommandProto.Type getType() {
+ return SCMCommandProto.Type.createPipelineCommand;
+ }
+
+ /**
+ * Serializes the command id, pipeline id, replication settings and the
+ * member datanode list into the protobuf form carried in the SCM
+ * heartbeat response.
+ */
+ @Override
+ public CreatePipelineCommandProto getProto() {
+ return CreatePipelineCommandProto.newBuilder()
+ .setCmdId(getId())
+ .setPipelineID(pipelineID.getProtobuf())
+ .setFactor(factor)
+ .setType(type)
+ .addAllDatanode(nodelist.stream()
+ .map(DatanodeDetails::getProtoBufMessage)
+ .collect(Collectors.toList()))
+ .build();
+ }
+
+ /**
+ * Rebuilds a CreatePipelineCommand from its protobuf representation,
+ * converting each DatanodeDetailsProto back to DatanodeDetails.
+ */
+ public static CreatePipelineCommand getFromProtobuf(
+ CreatePipelineCommandProto createPipelineProto) {
+ Preconditions.checkNotNull(createPipelineProto);
+ return new CreatePipelineCommand(createPipelineProto.getCmdId(),
+ PipelineID.getFromProtobuf(createPipelineProto.getPipelineID()),
+ createPipelineProto.getType(), createPipelineProto.getFactor(),
+ createPipelineProto.getDatanodeList().stream()
+ .map(DatanodeDetails::getFromProtoBuf)
+ .collect(Collectors.toList()));
+ }
+
+ // Returns the pipeline this command targets.
+ public PipelineID getPipelineID() {
+ return pipelineID;
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 45a1db6..8b272c8 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -283,6 +283,8 @@ message SCMCommandProto {
closeContainerCommand = 3;
deleteContainerCommand = 4;
replicateContainerCommand = 5;
+ createPipelineCommand = 6;
+ closePipelineCommand = 7;
}
// TODO: once we start using protoc 3.x, refactor this message using "oneof"
required Type commandType = 1;
@@ -291,6 +293,8 @@ message SCMCommandProto {
optional CloseContainerCommandProto closeContainerCommandProto = 4;
optional DeleteContainerCommandProto deleteContainerCommandProto = 5;
optional ReplicateContainerCommandProto replicateContainerCommandProto = 6;
+ optional CreatePipelineCommandProto createPipelineCommandProto = 7;
+ optional ClosePipelineCommandProto closePipelineCommandProto = 8;
}
/**
@@ -360,6 +364,25 @@ message ReplicateContainerCommandProto {
}
/**
+This command asks the datanode to create a pipeline.
+*/
+message CreatePipelineCommandProto {
+ required PipelineID pipelineID = 1;
+ required ReplicationType type = 2;
+ required ReplicationFactor factor = 3;
+ repeated DatanodeDetailsProto datanode = 4;
+ required int64 cmdId = 5;
+}
+
+/**
+This command asks the datanode to close a pipeline.
+*/
+message ClosePipelineCommandProto {
+ required PipelineID pipelineID = 1;
+ required int64 cmdId = 2;
+}
+
+/**
* Protocol used from a datanode to StorageContainerManager.
*
* Please see the request and response messages for details of the RPC calls.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 845bdf1..b7a7525 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.BlockID;
@@ -196,6 +197,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// TODO: #CLUTIL Remove creation logic when all replication types and
// factors are handled by pipeline creator
pipeline = pipelineManager.createPipeline(type, factor);
+ // wait until pipeline is ready
+ pipelineManager.waitPipelineReady(pipeline.getId(), 0);
} catch (IOException e) {
LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
"get pipelines call once.", type, factor, e);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 7dde8d7..cefc185 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -253,6 +253,7 @@ public class ContainerStateManager {
// TODO: #CLUTIL remove creation logic when all replication types and
// factors are handled by pipeline creator job.
pipeline = pipelineManager.createPipeline(type, replicationFactor);
+ pipelineManager.waitPipelineReady(pipeline.getId(), 0);
} catch (IOException e) {
final List<Pipeline> pipelines = pipelineManager
.getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 43d396e..6de05fd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.events;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -97,15 +98,14 @@ public final class SCMEvents {
new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report");
/**
- * PipelineReport processed by pipeline report handler. This event is
+ * Open pipeline event sent by PipelineReportHandler. This event is
* received by HealthyPipelineSafeModeRule.
*/
- public static final TypedEvent<PipelineReportFromDatanode>
- PROCESSED_PIPELINE_REPORT = new TypedEvent<>(
- PipelineReportFromDatanode.class, "Processed_Pipeline_Report");
+ public static final TypedEvent<Pipeline>
+ OPEN_PIPELINE = new TypedEvent<>(Pipeline.class, "Open_Pipeline");
/**
- * PipelineActions are sent by Datanode. This event is received by
+ * PipelineActions are sent by Datanode to close a pipeline. It's received by
* SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated.
*/
public static final TypedEvent<PipelineActionsFromDatanode>
@@ -113,7 +113,7 @@ public final class SCMEvents {
"Pipeline_Actions");
/**
- * A Command status report will be sent by datanodes. This repoort is received
+ * A Command status report will be sent by datanodes. This report is received
* by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated.
*/
public static final TypedEvent<CommandStatusReportFromDatanode>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 6873566..4065c2f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,6 +42,7 @@ class BackgroundPipelineCreator {
private final AtomicBoolean isPipelineCreatorRunning;
private final PipelineManager pipelineManager;
private final Configuration conf;
+ private ScheduledFuture<?> periodicTask;
BackgroundPipelineCreator(PipelineManager pipelineManager,
Scheduler scheduler, Configuration conf) {
@@ -57,13 +59,16 @@ class BackgroundPipelineCreator {
/**
* Schedules a fixed interval job to create pipelines.
*/
- void startFixedIntervalPipelineCreator() {
+ synchronized void startFixedIntervalPipelineCreator() {
+ if (periodicTask != null) {
+ return;
+ }
long intervalInMillis = conf
.getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
// TODO: #CLUTIL We can start the job asap
- scheduler.scheduleWithFixedDelay(() -> {
+ periodicTask = scheduler.scheduleWithFixedDelay(() -> {
if (!shouldSchedulePipelineCreator()) {
return;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 77e037a..86ad5ee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
import java.io.IOException;
import java.util.HashMap;
@@ -39,12 +40,13 @@ public final class PipelineFactory {
private Map<ReplicationType, PipelineProvider> providers;
PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
- Configuration conf, GrpcTlsConfig tlsConfig) {
+ Configuration conf, EventPublisher eventPublisher) {
providers = new HashMap<>();
providers.put(ReplicationType.STAND_ALONE,
new SimplePipelineProvider(nodeManager));
providers.put(ReplicationType.RATIS,
- new RatisPipelineProvider(nodeManager, stateManager, conf, tlsConfig));
+ new RatisPipelineProvider(nodeManager, stateManager, conf,
+ eventPublisher));
}
@VisibleForTesting
@@ -63,6 +65,11 @@ public final class PipelineFactory {
return providers.get(type).create(factor, nodes);
}
+ public void close(ReplicationType type, Pipeline pipeline)
+ throws IOException {
+ providers.get(type).close(pipeline);
+ }
+
public void shutdown() {
providers.values().forEach(provider -> provider.shutdown());
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 9ba5f31..779008f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.ratis.grpc.GrpcTlsConfig;
import java.io.Closeable;
import java.io.IOException;
@@ -51,6 +50,9 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
ReplicationFactor factor);
List<Pipeline> getPipelines(ReplicationType type,
+ Pipeline.PipelineState state);
+
+ List<Pipeline> getPipelines(ReplicationType type,
ReplicationFactor factor, Pipeline.PipelineState state);
List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
@@ -95,5 +97,14 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
*/
void deactivatePipeline(PipelineID pipelineID) throws IOException;
- GrpcTlsConfig getGrpcTlsConfig();
+ /**
+ * Wait for a pipeline to reach the OPEN state.
+ *
+ * @param pipelineID ID of the pipeline to wait for.
+ * @param timeout wait timeout in milliseconds; if 0, the default timeout
+ * @throws IOException in case of any Exception, such as timeout
+ */
+ default void waitPipelineReady(PipelineID pipelineID, long timeout)
+ throws IOException {
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index a0ce216..c00ff78 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -33,5 +33,7 @@ public interface PipelineProvider {
Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
+ void close(Pipeline pipeline) throws IOException;
+
void shutdown();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index b8cb7b4..a7e2bf1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -19,18 +19,23 @@
package org.apache.hadoop.hdds.scm.pipeline;
import java.io.IOException;
-import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server
+ .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,10 +55,8 @@ public class PipelineReportHandler implements
private final boolean pipelineAvailabilityCheck;
public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager,
- PipelineManager pipelineManager,
- Configuration conf) {
+ PipelineManager pipelineManager, Configuration conf) {
Preconditions.checkNotNull(pipelineManager);
- Objects.requireNonNull(scmSafeModeManager);
this.scmSafeModeManager = scmSafeModeManager;
this.pipelineManager = pipelineManager;
this.conf = conf;
@@ -76,48 +79,45 @@ public class PipelineReportHandler implements
}
for (PipelineReport report : pipelineReport.getPipelineReportList()) {
try {
- processPipelineReport(report, dn);
+ processPipelineReport(report, dn, publisher);
} catch (IOException e) {
LOGGER.error("Could not process pipeline report={} from dn={} {}",
report, dn, e);
}
}
- if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
- publisher.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- pipelineReportFromDatanode);
- }
}
- private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
- throws IOException {
+ private void processPipelineReport(PipelineReport report, DatanodeDetails dn,
+ EventPublisher publisher) throws IOException {
PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID());
Pipeline pipeline;
try {
pipeline = pipelineManager.getPipeline(pipelineID);
} catch (PipelineNotFoundException e) {
- RatisPipelineUtils.destroyPipeline(dn, pipelineID, conf,
- pipelineManager.getGrpcTlsConfig());
+ final ClosePipelineCommand closeCommand =
+ new ClosePipelineCommand(pipelineID);
+ final CommandForDatanode datanodeCommand =
+ new CommandForDatanode<>(dn.getUuid(), closeCommand);
+ publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
return;
}
pipeline.reportDatanode(dn);
- if (report.getIsLeader()) {
+ // ONE replica pipeline doesn't have leader flag
+ if (report.getIsLeader() ||
+ pipeline.getFactor() == HddsProtos.ReplicationFactor.ONE) {
pipeline.setLeaderId(dn.getUuid());
}
- if ((pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED)
- && pipeline.isHealthy()) {
- pipelineManager.openPipeline(pipelineID);
- }
if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
-
-
- if (report.getIsLeader()) {
- // Pipeline reported as the leader
- pipeline.setLeaderId(dn.getUuid());
+ LOGGER.info("Pipeline {} {} reported by {}", pipeline.getFactor(),
+ pipeline.getId(), dn);
+ if (pipeline.isHealthy()) {
pipelineManager.openPipeline(pipelineID);
+ if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
+ publisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
+ }
}
}
- pipeline.reportDatanode(dn);
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index 2410b54..180d0bf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -129,9 +129,9 @@ class PipelineStateManager {
throw new IOException("Closed pipeline can not be opened");
}
if (pipeline.getPipelineState() == PipelineState.ALLOCATED) {
- pipeline = pipelineStateMap.updatePipelineState(
- pipelineId, PipelineState.OPEN);
LOG.info("Pipeline {} moved to OPEN state", pipeline.toString());
+ pipeline = pipelineStateMap
+ .updatePipelineState(pipelineId, PipelineState.OPEN);
}
return pipeline;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 94443dd..a2ee50a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -24,37 +24,28 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+ .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+ .SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
-import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.function.CheckedBiConsumer;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -69,6 +60,7 @@ public class RatisPipelineProvider implements PipelineProvider {
private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
private final Configuration conf;
+ private final EventPublisher eventPublisher;
// Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
private final int parallelismForPool = 3;
@@ -83,15 +75,14 @@ public class RatisPipelineProvider implements PipelineProvider {
private final ForkJoinPool forkJoinPool = new ForkJoinPool(
parallelismForPool, factory, null, false);
- private final GrpcTlsConfig tlsConfig;
RatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager, Configuration conf,
- GrpcTlsConfig tlsConfig) {
+ EventPublisher eventPublisher) {
this.nodeManager = nodeManager;
this.stateManager = stateManager;
this.conf = conf;
- this.tlsConfig = tlsConfig;
+ this.eventPublisher = eventPublisher;
}
@@ -153,8 +144,27 @@ public class RatisPipelineProvider implements PipelineProvider {
throw new InsufficientDatanodesException(e);
}
- Pipeline pipeline = create(factor, dns);
- initializePipeline(pipeline);
+ Pipeline pipeline = Pipeline.newBuilder()
+ .setId(PipelineID.randomId())
+ .setState(PipelineState.ALLOCATED)
+ .setType(ReplicationType.RATIS)
+ .setFactor(factor)
+ .setNodes(dns)
+ .build();
+
+ // Send command to datanodes to create pipeline
+ final CreatePipelineCommand createCommand =
+ new CreatePipelineCommand(pipeline.getId(), pipeline.getType(),
+ factor, dns);
+
+ dns.stream().forEach(node -> {
+ final CommandForDatanode datanodeCommand =
+ new CommandForDatanode<>(node.getUuid(), createCommand);
+ LOG.info("Send pipeline:{} create command to datanode {}",
+ pipeline.getId(), datanodeCommand.getDatanodeId());
+ eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+ });
+
return pipeline;
}
@@ -181,69 +191,22 @@ public class RatisPipelineProvider implements PipelineProvider {
}
}
- protected void initializePipeline(Pipeline pipeline) throws IOException {
- final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
- if (LOG.isDebugEnabled()) {
- LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
- }
- callRatisRpc(pipeline.getNodes(),
- (raftClient, peer) -> {
- RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
- if (reply == null || !reply.isSuccess()) {
- String msg = "Pipeline initialization failed for pipeline:"
- + pipeline.getId() + " node:" + peer.getId();
- LOG.error(msg);
- throw new IOException(msg);
- }
- });
- }
-
- private void callRatisRpc(List<DatanodeDetails> datanodes,
- CheckedBiConsumer< RaftClient, RaftPeer, IOException> rpc)
- throws IOException {
- if (datanodes.isEmpty()) {
- return;
- }
-
- final String rpcType = conf
- .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
- ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
- final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
- final List< IOException > exceptions =
- Collections.synchronizedList(new ArrayList<>());
- final int maxOutstandingRequests =
- HddsClientUtils.getMaxOutstandingRequests(conf);
- final TimeDuration requestTimeout =
- RatisHelper.getClientRequestTimeout(conf);
- try {
- forkJoinPool.submit(() -> {
- datanodes.parallelStream().forEach(d -> {
- final RaftPeer p = RatisHelper.toRaftPeer(d);
- try (RaftClient client = RatisHelper
- .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
- retryPolicy, maxOutstandingRequests, tlsConfig,
- requestTimeout)) {
- rpc.accept(client, p);
- } catch (IOException ioe) {
- String errMsg =
- "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid();
- LOG.error(errMsg, ioe);
- exceptions.add(new IOException(errMsg, ioe));
- }
- });
- }).get();
- } catch (ExecutionException | RejectedExecutionException ex) {
- LOG.error(ex.getClass().getName() + " exception occurred during " +
- "createPipeline", ex);
- throw new IOException(ex.getClass().getName() + " exception occurred " +
- "during createPipeline", ex);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupt exception occurred during " +
- "createPipeline", ex);
- }
- if (!exceptions.isEmpty()) {
- throw MultipleIOException.createIOException(exceptions);
- }
+ /**
+ * Removes pipeline from SCM. Sends command to destroy pipeline on all
+ * the datanodes.
+ *
+ * @param pipeline - Pipeline to be destroyed
+ * @throws IOException
+ */
+ public void close(Pipeline pipeline) {
+ final ClosePipelineCommand closeCommand =
+ new ClosePipelineCommand(pipeline.getId());
+ pipeline.getNodes().stream().forEach(node -> {
+ final CommandForDatanode datanodeCommand =
+ new CommandForDatanode<>(node.getUuid(), closeCommand);
+ LOG.info("Send pipeline:{} close command to datanode {}",
+ pipeline.getId(), datanodeCommand.getDatanodeId());
+ eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+ });
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
deleted file mode 100644
index 497e717..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utility class for Ratis pipelines. Contains methods to create and destroy
- * ratis pipelines.
- */
-public final class RatisPipelineUtils {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(RatisPipelineUtils.class);
-
- private RatisPipelineUtils() {
- }
- /**
- * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
- * the datanodes.
- *
- * @param pipeline - Pipeline to be destroyed
- * @param ozoneConf - Ozone configuration
- * @param grpcTlsConfig
- * @throws IOException
- */
- public static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
- GrpcTlsConfig grpcTlsConfig) {
- final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
- if (LOG.isDebugEnabled()) {
- LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
- }
- for (DatanodeDetails dn : pipeline.getNodes()) {
- try {
- destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
- } catch (IOException e) {
- LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
- pipeline.getId(), dn);
- }
- }
- }
-
- /**
- * Sends ratis command to destroy pipeline on the given datanode.
- *
- * @param dn - Datanode on which pipeline needs to be destroyed
- * @param pipelineID - ID of pipeline to be destroyed
- * @param ozoneConf - Ozone configuration
- * @param grpcTlsConfig - grpc tls configuration
- * @throws IOException
- */
- static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
- Configuration ozoneConf, GrpcTlsConfig grpcTlsConfig) throws IOException {
- final String rpcType = ozoneConf
- .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
- ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
- final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
- final RaftPeer p = RatisHelper.toRaftPeer(dn);
- final int maxOutstandingRequests =
- HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
- final TimeDuration requestTimeout =
- RatisHelper.getClientRequestTimeout(ozoneConf);
- try(RaftClient client = RatisHelper
- .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
- retryPolicy, maxOutstandingRequests, grpcTlsConfig,
- requestTimeout)) {
- client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
- true, p.getId());
- }
- }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 0964f6d..00a4429 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -37,7 +38,7 @@ import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.MetadataStore;
import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
import org.apache.hadoop.hdds.utils.Scheduler;
-import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,18 +82,18 @@ public class SCMPipelineManager implements PipelineManager {
private final NodeManager nodeManager;
private final SCMPipelineMetrics metrics;
private final Configuration conf;
+ private long pipelineWaitDefaultTimeout;
// Pipeline Manager MXBean
private ObjectName pmInfoBean;
- private GrpcTlsConfig grpcTlsConfig;
public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
- EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig)
+ EventPublisher eventPublisher)
throws IOException {
this.lock = new ReentrantReadWriteLock();
this.conf = conf;
this.stateManager = new PipelineStateManager(conf);
this.pipelineFactory = new PipelineFactory(nodeManager, stateManager,
- conf, grpcTlsConfig);
+ conf, eventPublisher);
// TODO: See if thread priority needs to be set for these threads
scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
this.backgroundPipelineCreator =
@@ -113,8 +114,11 @@ public class SCMPipelineManager implements PipelineManager {
this.metrics = SCMPipelineMetrics.create();
this.pmInfoBean = MBeans.register("SCMPipelineManager",
"SCMPipelineManagerInfo", this);
+ this.pipelineWaitDefaultTimeout = conf.getTimeDuration(
+ HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
+ HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
initializePipelineState();
- this.grpcTlsConfig = grpcTlsConfig;
}
public PipelineStateManager getStateManager() {
@@ -148,8 +152,8 @@ public class SCMPipelineManager implements PipelineManager {
}
@Override
- public synchronized Pipeline createPipeline(
- ReplicationType type, ReplicationFactor factor) throws IOException {
+ public synchronized Pipeline createPipeline(ReplicationType type,
+ ReplicationFactor factor) throws IOException {
lock.writeLock().lock();
try {
Pipeline pipeline = pipelineFactory.create(type, factor);
@@ -157,8 +161,11 @@ public class SCMPipelineManager implements PipelineManager {
pipeline.getProtobufMessage().toByteArray());
stateManager.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
- metrics.incNumPipelineCreated();
- metrics.createPerPipelineMetrics(pipeline);
+ metrics.incNumPipelineAllocated();
+ if (pipeline.isOpen()) {
+ metrics.incNumPipelineCreated();
+ metrics.createPerPipelineMetrics(pipeline);
+ }
return pipeline;
} catch (InsufficientDatanodesException idEx) {
throw idEx;
@@ -225,6 +232,16 @@ public class SCMPipelineManager implements PipelineManager {
}
}
+ public List<Pipeline> getPipelines(ReplicationType type,
+ Pipeline.PipelineState state) {
+ lock.readLock().lock();
+ try {
+ return stateManager.getPipelines(type, state);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
@Override
public List<Pipeline> getPipelines(ReplicationType type,
ReplicationFactor factor, Pipeline.PipelineState state) {
@@ -293,6 +310,7 @@ public class SCMPipelineManager implements PipelineManager {
lock.writeLock().lock();
try {
Pipeline pipeline = stateManager.openPipeline(pipelineId);
+ metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
} finally {
lock.writeLock().unlock();
@@ -380,6 +398,45 @@ public class SCMPipelineManager implements PipelineManager {
}
/**
+ * Wait a pipeline to be OPEN.
+ *
+ * @param pipelineID ID of the pipeline to wait for.
+ * @param timeout wait timeout, millisecond, 0 to use default value
+ * @throws IOException in case of any Exception, such as timeout
+ */
+ @Override
+ public void waitPipelineReady(PipelineID pipelineID, long timeout)
+ throws IOException {
+ long st = Time.monotonicNow();
+ if (timeout == 0) {
+ timeout = pipelineWaitDefaultTimeout;
+ }
+
+ boolean ready;
+ Pipeline pipeline;
+ do {
+ try {
+ pipeline = stateManager.getPipeline(pipelineID);
+ } catch (PipelineNotFoundException e) {
+ throw new PipelineNotFoundException(String.format(
+ "Pipeline %s cannot be found", pipelineID));
+ }
+ ready = pipeline.isOpen();
+ if (!ready) {
+ try {
+ Thread.sleep((long)100);
+ } catch (InterruptedException e) {
+ }
+ }
+ } while (!ready && Time.monotonicNow() - st < timeout);
+
+ if (!ready) {
+ throw new IOException(String.format("Pipeline %s is not ready in %d ms",
+ pipelineID, timeout));
+ }
+ }
+
+ /**
* Moves the pipeline to CLOSED state and sends close container command for
* all the containers in the pipeline.
*
@@ -408,7 +465,7 @@ public class SCMPipelineManager implements PipelineManager {
* @throws IOException
*/
private void destroyPipeline(Pipeline pipeline) throws IOException {
- RatisPipelineUtils.destroyPipeline(pipeline, conf, grpcTlsConfig);
+ pipelineFactory.close(pipeline.getType(), pipeline);
// remove the pipeline from the pipeline manager
removePipeline(pipeline.getId());
triggerPipelineCreation();
@@ -441,11 +498,6 @@ public class SCMPipelineManager implements PipelineManager {
}
@Override
- public GrpcTlsConfig getGrpcTlsConfig() {
- return grpcTlsConfig;
- }
-
- @Override
public void close() throws IOException {
if (scheduler != null) {
scheduler.close();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index b6a1445..40a6f29 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -47,6 +47,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
private MetricsRegistry registry;
+ private @Metric MutableCounterLong numPipelineAllocated;
private @Metric MutableCounterLong numPipelineCreated;
private @Metric MutableCounterLong numPipelineCreationFailed;
private @Metric MutableCounterLong numPipelineDestroyed;
@@ -84,6 +85,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
@SuppressWarnings("SuspiciousMethodCalls")
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder recordBuilder = collector.addRecord(SOURCE_NAME);
+ numPipelineAllocated.snapshot(recordBuilder, true);
numPipelineCreated.snapshot(recordBuilder, true);
numPipelineCreationFailed.snapshot(recordBuilder, true);
numPipelineDestroyed.snapshot(recordBuilder, true);
@@ -118,6 +120,14 @@ public final class SCMPipelineMetrics implements MetricsSource {
}
/**
+ * Increments number of pipeline allocation count, including succeeded
+ * and failed.
+ */
+ void incNumPipelineAllocated() {
+ numPipelineAllocated.incr();
+ }
+
+ /**
* Increments number of successful pipeline creation count.
*/
void incNumPipelineCreated() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index ab98dfa..00cb7ae 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -74,6 +74,11 @@ public class SimplePipelineProvider implements PipelineProvider {
}
@Override
+ public void close(Pipeline pipeline) throws IOException {
+
+ }
+
+ @Override
public void shutdown() {
// Do nothing.
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 2f9a66f..9b19acf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -17,27 +17,19 @@
*/
package org.apache.hadoop.hdds.scm.safemode;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
/**
* Class defining Safe mode exit criteria for Pipelines.
@@ -47,43 +39,55 @@ import com.google.common.base.Preconditions;
* through in a cluster.
*/
public class HealthyPipelineSafeModeRule
- extends SafeModeExitRule<PipelineReportFromDatanode>{
+ extends SafeModeExitRule<Pipeline>{
public static final Logger LOG =
LoggerFactory.getLogger(HealthyPipelineSafeModeRule.class);
- private final PipelineManager pipelineManager;
private int healthyPipelineThresholdCount;
private int currentHealthyPipelineCount = 0;
- private final Map<PipelineID, Boolean> processedPipelines = new HashMap<>();
private final double healthyPipelinesPercent;
HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
PipelineManager pipelineManager,
SCMSafeModeManager manager, Configuration configuration) {
super(manager, ruleName, eventQueue);
- this.pipelineManager = pipelineManager;
healthyPipelinesPercent =
configuration.getDouble(HddsConfigKeys.
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
HddsConfigKeys.
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT);
+ int minHealthyPipelines = 0;
+
+ boolean createPipelineInSafemode = configuration.getBoolean(
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
+
+ if (createPipelineInSafemode) {
+ minHealthyPipelines =
+ configuration.getInt(HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE,
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT);
+ }
+
Preconditions.checkArgument(
(healthyPipelinesPercent >= 0.0 && healthyPipelinesPercent <= 1.0),
HddsConfigKeys.
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT
+ " value should be >= 0.0 and <= 1.0");
- // As we want to wait for 3 node pipelines
- int pipelineCount =
+ // We want to wait for RATIS THREE factor write pipelines
+ int pipelineCount = pipelineManager.getPipelines(
+ HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
+ Pipeline.PipelineState.OPEN).size() +
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
- HddsProtos.ReplicationFactor.THREE).size();
+ HddsProtos.ReplicationFactor.THREE,
+ Pipeline.PipelineState.ALLOCATED).size();
// This value will be zero when pipeline count is 0.
// On a fresh installed cluster, there will be zero pipelines in the SCM
// pipeline DB.
- healthyPipelineThresholdCount =
- (int) Math.ceil(healthyPipelinesPercent * pipelineCount);
+ healthyPipelineThresholdCount = Math.max(minHealthyPipelines,
+ (int) Math.ceil(healthyPipelinesPercent * pipelineCount));
LOG.info(" Total pipeline count is {}, healthy pipeline " +
"threshold count is {}", pipelineCount, healthyPipelineThresholdCount);
@@ -99,8 +103,8 @@ public class HealthyPipelineSafeModeRule
}
@Override
- protected TypedEvent<PipelineReportFromDatanode> getEventType() {
- return SCMEvents.PROCESSED_PIPELINE_REPORT;
+ protected TypedEvent<Pipeline> getEventType() {
+ return SCMEvents.OPEN_PIPELINE;
}
@Override
@@ -112,38 +116,18 @@ public class HealthyPipelineSafeModeRule
}
@Override
- protected void process(PipelineReportFromDatanode
- pipelineReportFromDatanode) {
+ protected void process(Pipeline pipeline) {
// When SCM is in safe mode for long time, already registered
- // datanode can send pipeline report again, then pipeline handler fires
- // processed report event, we should not consider this pipeline report
- // from datanode again during threshold calculation.
- Preconditions.checkNotNull(pipelineReportFromDatanode);
-
- PipelineReportsProto pipelineReport =
- pipelineReportFromDatanode.getReport();
-
- for (PipelineReport report : pipelineReport.getPipelineReportList()) {
- PipelineID pipelineID = PipelineID.getFromProtobuf(
- report.getPipelineID());
- Pipeline pipeline;
- try {
- pipeline = pipelineManager.getPipeline(pipelineID);
- } catch (PipelineNotFoundException e) {
- continue;
- }
-
- if (!processedPipelines.containsKey(pipelineID)) {
- if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
- report.getIsLeader()) {
- // If the pipeline gets reported with a leader we mark it as healthy
- currentHealthyPipelineCount++;
- getSafeModeMetrics().incCurrentHealthyPipelinesCount();
- processedPipelines.put(pipelineID, Boolean.TRUE);
- }
- }
+ // datanode can send pipeline report again, or SCMPipelineManager will
+ // create new pipelines.
+ Preconditions.checkNotNull(pipeline);
+ if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+ pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
+ getSafeModeMetrics().incCurrentHealthyPipelinesCount();
+ currentHealthyPipelineCount++;
}
+
if (scmInSafeMode()) {
SCMSafeModeManager.getLogger().info(
"SCM in safe mode. Healthy pipelines reported count is {}, " +
@@ -154,7 +138,6 @@ public class HealthyPipelineSafeModeRule
@Override
protected void cleanup() {
- processedPipelines.clear();
}
@VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
index 841d8ff..0783d02 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
@@ -22,17 +22,10 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.
- PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.slf4j.Logger;
@@ -40,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* This rule covers whether we have at least one datanode is reported for each
@@ -47,14 +41,14 @@ import java.util.Set;
* replica available for read when we exit safe mode.
*/
public class OneReplicaPipelineSafeModeRule extends
- SafeModeExitRule<PipelineReportFromDatanode> {
+ SafeModeExitRule<Pipeline> {
private static final Logger LOG =
LoggerFactory.getLogger(OneReplicaPipelineSafeModeRule.class);
private int thresholdCount;
private Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
- private final PipelineManager pipelineManager;
+ private Set<PipelineID> oldPipelineIDSet;
private int currentReportedPipelineCount = 0;
@@ -62,7 +56,6 @@ public class OneReplicaPipelineSafeModeRule extends
PipelineManager pipelineManager,
SCMSafeModeManager safeModeManager, Configuration configuration) {
super(safeModeManager, ruleName, eventQueue);
- this.pipelineManager = pipelineManager;
double percent =
configuration.getDouble(
@@ -75,24 +68,25 @@ public class OneReplicaPipelineSafeModeRule extends
HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT +
" value should be >= 0.0 and <= 1.0");
- int totalPipelineCount =
- pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
- HddsProtos.ReplicationFactor.THREE).size();
+ oldPipelineIDSet = pipelineManager.getPipelines(
+ HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE)
+ .stream().map(p -> p.getId()).collect(Collectors.toSet());
+ int totalPipelineCount = oldPipelineIDSet.size();
thresholdCount = (int) Math.ceil(percent * totalPipelineCount);
- LOG.info(" Total pipeline count is {}, pipeline's with atleast one " +
+ LOG.info("Total pipeline count is {}, pipeline's with at least one " +
"datanode reported threshold count is {}", totalPipelineCount,
thresholdCount);
getSafeModeMetrics().setNumPipelinesWithAtleastOneReplicaReportedThreshold(
thresholdCount);
-
}
@Override
- protected TypedEvent<PipelineReportFromDatanode> getEventType() {
- return SCMEvents.PROCESSED_PIPELINE_REPORT;
+ protected TypedEvent<Pipeline> getEventType() {
+ return SCMEvents.OPEN_PIPELINE;
}
@Override
@@ -104,40 +98,26 @@ public class OneReplicaPipelineSafeModeRule extends
}
@Override
- protected void process(PipelineReportFromDatanode
- pipelineReportFromDatanode) {
- Pipeline pipeline;
- Preconditions.checkNotNull(pipelineReportFromDatanode);
- PipelineReportsProto pipelineReport =
- pipelineReportFromDatanode.getReport();
-
- for (PipelineReport report : pipelineReport.getPipelineReportList()) {
- PipelineID pipelineID = PipelineID
- .getFromProtobuf(report.getPipelineID());
- try {
- pipeline = pipelineManager.getPipeline(pipelineID);
- } catch (PipelineNotFoundException e) {
- continue;
- }
-
- if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
- !reportedPipelineIDSet.contains(pipelineID)) {
- reportedPipelineIDSet.add(pipelineID);
+ protected void process(Pipeline pipeline) {
+ Preconditions.checkNotNull(pipeline);
+ if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+ pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+ !reportedPipelineIDSet.contains(pipeline.getId())) {
+ if (oldPipelineIDSet.contains(pipeline.getId())) {
getSafeModeMetrics()
.incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
+ currentReportedPipelineCount++;
+ reportedPipelineIDSet.add(pipeline.getId());
}
}
- currentReportedPipelineCount = reportedPipelineIDSet.size();
-
if (scmInSafeMode()) {
SCMSafeModeManager.getLogger().info(
- "SCM in safe mode. Pipelines with atleast one datanode reported " +
- "count is {}, required atleast one datanode reported per " +
+ "SCM in safe mode. Pipelines with at least one datanode reported " +
+ "count is {}, required at least one datanode reported per " +
"pipeline count is {}",
currentReportedPipelineCount, thresholdCount);
}
-
}
@Override
@@ -154,5 +134,4 @@ public class OneReplicaPipelineSafeModeRule extends
public int getCurrentReportedPipelineCount() {
return currentReportedPipelineCount;
}
-
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
index a22d162..1e83bc4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
@@ -59,17 +59,17 @@ import org.slf4j.LoggerFactory;
* number of datanode registered is met or not.
*
* 3. HealthyPipelineSafeModeRule:
- * Once the pipelineReportHandler processes the
+ * Once the PipelineReportHandler processes the
* {@link SCMEvents#PIPELINE_REPORT}, it fires
- * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this
* event. This rule processes this report, and check if pipeline is healthy
* and increments current healthy pipeline count. Then validate it cutoff
* threshold for healthy pipeline is met or not.
*
* 4. OneReplicaPipelineSafeModeRule:
- * Once the pipelineReportHandler processes the
+ * Once the PipelineReportHandler processes the
* {@link SCMEvents#PIPELINE_REPORT}, it fires
- * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this
* event. This rule processes this report, and add the reported pipeline to
* reported pipeline set. Then validate it cutoff threshold for one replica
* per pipeline is met or not.
@@ -135,6 +135,13 @@ public class SCMSafeModeManager {
oneReplicaPipelineSafeModeRule);
}
emitSafeModeStatus();
+ boolean createPipelineInSafemode = conf.getBoolean(
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
+
+ if (createPipelineInSafemode) {
+ pipelineManager.startPipelineCreator();
+ }
} else {
this.safeModeMetrics = null;
exitSafeMode(eventQueue);
@@ -166,6 +173,7 @@ public class SCMSafeModeManager {
if (exitRules.get(ruleName) != null) {
validatedRules.add(ruleName);
+ LOG.info("{} rule is successfully validated", ruleName);
} else {
// This should never happen
LOG.error("No Such Exit rule {}", ruleName);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
index 44d1c94..2fbe893 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
@@ -128,7 +128,8 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> {
List<Pipeline> pipelineList = scmPipelineManager.getPipelines();
pipelineList.forEach((pipeline) -> {
try {
- if (!pipeline.isHealthy()) {
+ if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
+ pipeline.isAllocationTimeout()) {
scmPipelineManager.finalizeAndDestroyPipeline(pipeline, false);
}
} catch (IOException ex) {
@@ -141,6 +142,4 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> {
public boolean getSafeModeStatus() {
return isInSafeMode.get();
}
-
-
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 9f6077b..3dbb4cb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -164,6 +164,7 @@ public final class SCMDatanodeHeartbeatDispatcher {
}
if (heartbeat.getCommandStatusReportsCount() != 0) {
+ LOG.debug("Dispatching Command Status Report.");
for (CommandStatusReportsProto commandStatusReport : heartbeat
.getCommandStatusReportsList()) {
eventPublisher.fireEvent(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 530c0a6..901bc2c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -61,6 +61,8 @@ import org.apache.hadoop.ozone.audit.Auditor;
import org.apache.hadoop.ozone.audit.SCMAction;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -79,6 +81,12 @@ import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reregisterCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
+ .createPipelineCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
+ .closePipelineCommand;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
@@ -329,6 +337,18 @@ public class SCMDatanodeProtocolServer implements
.setReplicateContainerCommandProto(
((ReplicateContainerCommand)cmd).getProto())
.build();
+ case createPipelineCommand:
+ return builder
+ .setCommandType(createPipelineCommand)
+ .setCreatePipelineCommandProto(
+ ((CreatePipelineCommand)cmd).getProto())
+ .build();
+ case closePipelineCommand:
+ return builder
+ .setCommandType(closePipelineCommand)
+ .setClosePipelineCommandProto(
+ ((ClosePipelineCommand)cmd).getProto())
+ .build();
default:
throw new IllegalArgumentException("Scm command " +
cmd.getType().toString() + " is not implemented");
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 21127f4..16ea094 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -400,8 +400,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
pipelineManager = configurator.getPipelineManager();
} else {
pipelineManager =
- new SCMPipelineManager(conf, scmNodeManager, eventQueue,
- grpcTlsConfig);
+ new SCMPipelineManager(conf, scmNodeManager, eventQueue);
}
if (configurator.getContainerManager() != null) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 1cb8376..baeb6dc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
@@ -369,6 +371,16 @@ public final class TestUtils {
return new PipelineReportFromDatanode(dn, reportBuilder.build());
}
+ public static void openAllRatisPipelines(PipelineManager pipelineManager)
+ throws IOException {
+ // Pipeline is created by background thread
+ List<Pipeline> pipelines =
+ pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS);
+ // Trigger the processed pipeline report event
+ for (Pipeline pipeline : pipelines) {
+ pipelineManager.openPipeline(pipeline.getId());
+ }
+ }
public static PipelineActionsFromDatanode getPipelineActionFromDatanode(
DatanodeDetails dn, PipelineID... pipelineIDs) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index a012d64..aa190f4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -24,11 +24,14 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
@@ -45,9 +48,13 @@ import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
@@ -94,14 +101,18 @@ public class TestBlockManager {
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, folder.newFolder().toString());
+ conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+ conf.setTimeDuration(HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, 5,
+ TimeUnit.SECONDS);
// Override the default Node Manager in SCM with this Mock Node Manager.
nodeManager = new MockNodeManager(true, 10);
+ eventQueue = new EventQueue();
pipelineManager =
- new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+ new SCMPipelineManager(conf, nodeManager, eventQueue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
- pipelineManager.getStateManager(), conf);
+ pipelineManager.getStateManager(), conf, eventQueue);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
SCMConfigurator configurator = new SCMConfigurator();
@@ -112,12 +123,10 @@ public class TestBlockManager {
// Initialize these fields so that the tests can pass.
mapping = (SCMContainerManager) scm.getContainerManager();
blockManager = (BlockManagerImpl) scm.getScmBlockManager();
-
- eventQueue = new EventQueue();
- eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS,
- scm.getSafeModeHandler());
eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS,
scm.getSafeModeHandler());
+ DatanodeCommandHandler handler = new DatanodeCommandHandler();
+ eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, handler);
CloseContainerEventHandler closeContainerHandler =
new CloseContainerEventHandler(pipelineManager, mapping);
eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
@@ -136,6 +145,8 @@ public class TestBlockManager {
GenericTestUtils.waitFor(() -> {
return !blockManager.isScmInSafeMode();
}, 10, 1000 * 5);
+ pipelineManager.createPipeline(type, factor);
+ TestUtils.openAllRatisPipelines(pipelineManager);
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
type, factor, OzoneConsts.OZONE, new ExcludeList());
Assert.assertNotNull(block);
@@ -153,6 +164,7 @@ public class TestBlockManager {
}
} catch (IOException e) {
}
+ TestUtils.openAllRatisPipelines(pipelineManager);
ExcludeList excludeList = new ExcludeList();
excludeList
.addPipeline(pipelineManager.getPipelines(type, factor).get(0).getId());
@@ -259,6 +271,7 @@ public class TestBlockManager {
pipelineManager.createPipeline(type, factor);
pipelineManager.createPipeline(type, factor);
+ TestUtils.openAllRatisPipelines(pipelineManager);
AllocatedBlock allocatedBlock = blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, OzoneConsts.OZONE,
@@ -305,6 +318,7 @@ public class TestBlockManager {
.getNumber(); i++) {
pipelineManager.createPipeline(type, factor);
}
+ TestUtils.openAllRatisPipelines(pipelineManager);
// wait till each pipeline has the configured number of containers.
// After this each pipeline has numContainerPerOwnerInPipeline containers
@@ -359,7 +373,23 @@ public class TestBlockManager {
Assert.assertNotNull(blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, OzoneConsts.OZONE,
new ExcludeList()));
- Assert.assertEquals(1, pipelineManager.getPipelines(type, factor).size());
}
+ private class DatanodeCommandHandler implements
+ EventHandler<CommandForDatanode> {
+
+ @Override
+ public void onMessage(final CommandForDatanode command,
+ final EventPublisher publisher) {
+ final SCMCommandProto.Type commandType = command.getCommand().getType();
+ if (commandType == SCMCommandProto.Type.createPipelineCommand) {
+ CreatePipelineCommand createCommand =
+ (CreatePipelineCommand) command.getCommand();
+ try {
+ pipelineManager.openPipeline(createCommand.getPipelineID());
+ } catch (IOException e) {
+ }
+ }
+ }
+ }
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index b022fd9..4f503e4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
@@ -67,8 +68,9 @@ public class TestCloseContainerEventHandler {
configuration
.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
nodeManager = new MockNodeManager(true, 10);
+ eventQueue = new EventQueue();
pipelineManager =
- new SCMPipelineManager(configuration, nodeManager, eventQueue, null);
+ new SCMPipelineManager(configuration, nodeManager, eventQueue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), configuration);
@@ -77,10 +79,13 @@ public class TestCloseContainerEventHandler {
containerManager = new
SCMContainerManager(configuration, nodeManager,
pipelineManager, new EventQueue());
- eventQueue = new EventQueue();
+ pipelineManager.triggerPipelineCreation();
eventQueue.addHandler(CLOSE_CONTAINER,
new CloseContainerEventHandler(pipelineManager, containerManager));
eventQueue.addHandler(DATANODE_COMMAND, nodeManager);
+ // Move all pipelines created by background from ALLOCATED to OPEN state
+ Thread.sleep(2000);
+ TestUtils.openAllRatisPipelines(pipelineManager);
}
@AfterClass
@@ -116,7 +121,6 @@ public class TestCloseContainerEventHandler {
@Test
public void testCloseContainerEventWithValidContainers() throws IOException {
-
ContainerInfo container = containerManager
.allocateContainer(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE, OzoneConsts.OZONE);
@@ -134,7 +138,6 @@ public class TestCloseContainerEventHandler {
@Test
public void testCloseContainerEventWithRatis() throws IOException {
-
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(CloseContainerEventHandler.LOG);
ContainerInfo container = containerManager
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index 6436af0..fde94d7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -96,7 +96,7 @@ public class TestSCMContainerManager {
}
nodeManager = new MockNodeManager(true, 10);
pipelineManager =
- new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+ new SCMPipelineManager(conf, nodeManager, new EventQueue());
containerManager = new SCMContainerManager(conf, nodeManager,
pipelineManager, new EventQueue());
xceiverClientManager = new XceiverClientManager(conf);
@@ -147,7 +147,7 @@ public class TestSCMContainerManager {
containerInfo.getPipelineID()).getFirstNode()
.getUuid());
}
- Assert.assertTrue(pipelineList.size() > 5);
+ Assert.assertTrue(pipelineList.size() >= 1);
}
@Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 3e4508d..f786060 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -108,7 +108,7 @@ public class TestContainerPlacement {
final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
PipelineManager pipelineManager =
- new SCMPipelineManager(config, scmNodeManager, eventQueue, null);
+ new SCMPipelineManager(config, scmNodeManager, eventQueue);
return new SCMContainerManager(config, scmNodeManager, pipelineManager,
eventQueue);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index c140119..1676af1 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -74,6 +74,7 @@ public class TestDeadNodeHandler {
private SCMNodeManager nodeManager;
private ContainerManager containerManager;
private NodeReportHandler nodeReportHandler;
+ private SCMPipelineManager pipelineManager;
private DeadNodeHandler deadNodeHandler;
private EventPublisher publisher;
private EventQueue eventQueue;
@@ -88,12 +89,12 @@ public class TestDeadNodeHandler {
eventQueue = new EventQueue();
scm = HddsTestUtils.getScm(conf);
nodeManager = (SCMNodeManager) scm.getScmNodeManager();
- SCMPipelineManager manager =
+ pipelineManager =
(SCMPipelineManager)scm.getPipelineManager();
PipelineProvider mockRatisProvider =
- new MockRatisPipelineProvider(nodeManager, manager.getStateManager(),
- conf);
- manager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+ new MockRatisPipelineProvider(nodeManager,
+ pipelineManager.getStateManager(), conf);
+ pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
containerManager = scm.getContainerManager();
deadNodeHandler = new DeadNodeHandler(nodeManager,
@@ -149,6 +150,8 @@ public class TestDeadNodeHandler {
nodeManager.register(TestUtils.randomDatanodeDetails(),
TestUtils.createNodeReport(storageOne), null);
+ TestUtils.openAllRatisPipelines(pipelineManager);
+
ContainerInfo container1 =
TestUtils.allocateContainer(containerManager);
ContainerInfo container2 =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index db76d66..f4eb797 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -121,6 +121,7 @@ public class TestSCMNodeManager {
testDir.getAbsolutePath());
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
TimeUnit.MILLISECONDS);
+ conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
return conf;
}
@@ -1035,9 +1036,11 @@ public class TestSCMNodeManager {
eq.processAll(1000L);
List<SCMCommand> command =
nodemanager.processHeartbeat(datanodeDetails);
- Assert.assertEquals(1, command.size());
- Assert
- .assertEquals(command.get(0).getClass(), CloseContainerCommand.class);
+ // With dh registered, SCM will send create pipeline command to dn
+ Assert.assertTrue(command.size() >= 1);
+ Assert.assertTrue(command.get(0).getClass().equals(
+ CloseContainerCommand.class) ||
+ command.get(1).getClass().equals(CloseContainerCommand.class));
} catch (IOException e) {
e.printStackTrace();
throw e;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 342ee5b..25b0adc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
import java.io.IOException;
import java.util.List;
@@ -34,7 +36,13 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
public MockRatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager,
Configuration conf) {
- super(nodeManager, stateManager, conf, null);
+ super(nodeManager, stateManager, conf, new EventQueue());
+ }
+
+ public MockRatisPipelineProvider(NodeManager nodeManager,
+ PipelineStateManager stateManager, Configuration conf,
+ EventPublisher eventPublisher) {
+ super(nodeManager, stateManager, conf, eventPublisher);
}
protected void initializePipeline(Pipeline pipeline) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 6f0425d..065b08b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -61,11 +61,13 @@ public class TestRatisPipelineProvider {
private void createPipelineAndAssertions(
HddsProtos.ReplicationFactor factor) throws IOException {
Pipeline pipeline = provider.create(factor);
- assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+ assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+ Pipeline.PipelineState.ALLOCATED);
stateManager.addPipeline(pipeline);
Pipeline pipeline1 = provider.create(factor);
- assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE);
+ assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE,
+ Pipeline.PipelineState.ALLOCATED);
// New pipeline should not overlap with the previous created pipeline
assertTrue(
intersection(pipeline.getNodes(), pipeline1.getNodes())
@@ -77,12 +79,14 @@ public class TestRatisPipelineProvider {
public void testCreatePipelineWithFactor() throws IOException {
HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
Pipeline pipeline = provider.create(factor);
- assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+ assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+ Pipeline.PipelineState.ALLOCATED);
stateManager.addPipeline(pipeline);
factor = HddsProtos.ReplicationFactor.ONE;
Pipeline pipeline1 = provider.create(factor);
- assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE);
+ assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE,
+ Pipeline.PipelineState.ALLOCATED);
stateManager.addPipeline(pipeline1);
// New pipeline should overlap with the previous created pipeline,
// and one datanode should overlap between the two types.
@@ -113,11 +117,13 @@ public class TestRatisPipelineProvider {
HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
Pipeline pipeline =
provider.create(factor, createListOfNodes(factor.getNumber()));
- assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+ assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+ Pipeline.PipelineState.OPEN);
factor = HddsProtos.ReplicationFactor.ONE;
pipeline = provider.create(factor, createListOfNodes(factor.getNumber()));
- assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+ assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+ Pipeline.PipelineState.OPEN);
}
@Test
@@ -141,7 +147,8 @@ public class TestRatisPipelineProvider {
// only 2 healthy DNs left that are not part of any pipeline
Pipeline pipeline = provider.create(factor);
- assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+ assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+ Pipeline.PipelineState.ALLOCATED);
List<DatanodeDetails> nodes = pipeline.getNodes();
@@ -156,8 +163,9 @@ public class TestRatisPipelineProvider {
private static void assertPipelineProperties(
Pipeline pipeline, HddsProtos.ReplicationFactor expectedFactor,
- HddsProtos.ReplicationType expectedReplicationType) {
- assertEquals(Pipeline.PipelineState.OPEN, pipeline.getPipelineState());
+ HddsProtos.ReplicationType expectedReplicationType,
+ Pipeline.PipelineState expectedState) {
+ assertEquals(expectedState, pipeline.getPipelineState());
assertEquals(expectedReplicationType, pipeline.getType());
assertEquals(expectedFactor, pipeline.getFactor());
assertEquals(expectedFactor.getNumber(), pipeline.getNodes().size());
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index f6d9b0e..6ea1bfe 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -22,10 +22,6 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.
- StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.
- StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -34,7 +30,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
@@ -68,10 +63,11 @@ public class TestHealthyPipelineSafeModeRule {
// enable pipeline check
config.setBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+ config.setBoolean(
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
- nodeManager, eventQueue, null);
+ nodeManager, eventQueue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), config);
@@ -88,10 +84,8 @@ public class TestHealthyPipelineSafeModeRule {
} finally {
FileUtil.fullyDelete(new File(storageDir));
}
-
}
-
@Test
public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception {
@@ -113,10 +107,11 @@ public class TestHealthyPipelineSafeModeRule {
// enable pipeline check
config.setBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+ config.setBoolean(
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
- nodeManager, eventQueue, null);
+ nodeManager, eventQueue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
@@ -162,7 +157,6 @@ public class TestHealthyPipelineSafeModeRule {
} finally {
FileUtil.fullyDelete(new File(storageDir));
}
-
}
@@ -188,10 +182,11 @@ public class TestHealthyPipelineSafeModeRule {
// enable pipeline check
config.setBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+ config.setBoolean(
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
- nodeManager, eventQueue, null);
+ nodeManager, eventQueue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), config);
@@ -217,7 +212,7 @@ public class TestHealthyPipelineSafeModeRule {
scmSafeModeManager.getHealthyPipelineSafeModeRule();
- // No datanodes have sent pipelinereport from datanode
+ // No pipeline event have sent to SCMSafemodeManager
Assert.assertFalse(healthyPipelineSafeModeRule.validate());
@@ -225,12 +220,12 @@ public class TestHealthyPipelineSafeModeRule {
GenericTestUtils.LogCapturer.captureLogs(LoggerFactory.getLogger(
SCMSafeModeManager.class));
- // fire event with pipeline report with ratis type and factor 1
+ // fire event with pipeline create status with ratis type and factor 1
// pipeline, validate() should return false
firePipelineEvent(pipeline1, eventQueue);
GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
- "reported count is 0"),
+ "reported count is 1"),
1000, 5000);
Assert.assertFalse(healthyPipelineSafeModeRule.validate());
@@ -246,20 +241,7 @@ public class TestHealthyPipelineSafeModeRule {
}
-
private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) {
- PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
- .newBuilder();
-
- reportBuilder.addPipelineReport(PipelineReport.newBuilder()
- .setPipelineID(pipeline.getId().getProtobuf())
- .setIsLeader(Boolean.TRUE));
-
- // Here no need to fire event from 3 nodes, as already pipeline is in
- // open state, but doing it.
- eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
- pipeline.getNodes().get(0), reportBuilder.build()));
+ eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
}
-
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index 7a09977..0fa5eae 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -20,10 +20,6 @@ package org.apache.hadoop.hdds.scm.safemode;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -32,7 +28,6 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
@@ -63,6 +58,8 @@ public class TestOneReplicaPipelineSafeModeRule {
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
ozoneConfiguration.set(HddsConfigKeys.OZONE_METADATA_DIRS,
folder.newFolder().toString());
+ ozoneConfiguration.setBoolean(
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
List<ContainerInfo> containers = new ArrayList<>();
containers.addAll(HddsTestUtils.getContainerInfo(1));
@@ -71,7 +68,7 @@ public class TestOneReplicaPipelineSafeModeRule {
eventQueue = new EventQueue();
pipelineManager =
new SCMPipelineManager(ozoneConfiguration, mockNodeManager,
- eventQueue, null);
+ eventQueue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(mockNodeManager,
@@ -123,7 +120,6 @@ public class TestOneReplicaPipelineSafeModeRule {
firePipelineEvent(pipelines.get(pipelineFactorThreeCount - 1));
GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
-
}
@@ -170,11 +166,8 @@ public class TestOneReplicaPipelineSafeModeRule {
firePipelineEvent(pipelines.get(pipelineCountThree - 1));
GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
-
}
-
-
private void createPipelines(int count,
HddsProtos.ReplicationFactor factor) throws Exception {
for (int i = 0; i < count; i++) {
@@ -184,27 +177,6 @@ public class TestOneReplicaPipelineSafeModeRule {
}
private void firePipelineEvent(Pipeline pipeline) {
- PipelineReportsProto.Builder reportBuilder =
- PipelineReportsProto.newBuilder();
-
- reportBuilder.addPipelineReport(PipelineReport.newBuilder()
- .setPipelineID(pipeline.getId().getProtobuf())
- .setIsLeader(Boolean.TRUE));
-
- if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
- eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
- pipeline.getNodes().get(0), reportBuilder.build()));
- eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
- pipeline.getNodes().get(1), reportBuilder.build()));
- eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
- pipeline.getNodes().get(2), reportBuilder.build()));
- } else {
- eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
- pipeline.getNodes().get(0), reportBuilder.build()));
- }
+ eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
}
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 1e608b3..b5839bc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -43,7 +41,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
@@ -73,6 +70,8 @@ public class TestSCMSafeModeManager {
public static void setUp() {
queue = new EventQueue();
config = new OzoneConfiguration();
+ config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+ false);
}
@Test
@@ -177,7 +176,7 @@ public class TestSCMSafeModeManager {
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT, healthyPercent);
conf.setDouble(HddsConfigKeys.
HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT, oneReplicaPercent);
-
+ conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
return conf;
}
@@ -199,7 +198,7 @@ public class TestSCMSafeModeManager {
0.9);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager = new SCMPipelineManager(conf,
- mockNodeManager, queue, null);
+ mockNodeManager, queue);
scmSafeModeManager = new SCMSafeModeManager(
conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForHealthyPipelinePercent");
@@ -217,7 +216,7 @@ public class TestSCMSafeModeManager {
200);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager = new SCMPipelineManager(conf,
- mockNodeManager, queue, null);
+ mockNodeManager, queue);
scmSafeModeManager = new SCMSafeModeManager(
conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
@@ -234,7 +233,7 @@ public class TestSCMSafeModeManager {
conf.setDouble(HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT, -1.0);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager = new SCMPipelineManager(conf,
- mockNodeManager, queue, null);
+ mockNodeManager, queue);
scmSafeModeManager = new SCMSafeModeManager(
conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForSafeModePercent");
@@ -258,7 +257,7 @@ public class TestSCMSafeModeManager {
MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount);
SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
- mockNodeManager, queue, null);
+ mockNodeManager, queue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(mockNodeManager,
pipelineManager.getStateManager(), config);
@@ -302,12 +301,12 @@ public class TestSCMSafeModeManager {
// we shall a get an event when datanode is registered. In that case,
// validate will return true, and add this to validatedRules.
if (Math.max(healthyPipelinePercent, oneReplicaThresholdCount) == 0) {
- firePipelineEvent(pipelines.get(0));
+ firePipelineEvent(pipelineManager, pipelines.get(0));
}
for (int i = 0; i < Math.max(healthyPipelineThresholdCount,
- oneReplicaThresholdCount); i++) {
- firePipelineEvent(pipelines.get(i));
+ Math.min(oneReplicaThresholdCount, pipelines.size())); i++) {
+ firePipelineEvent(pipelineManager, pipelines.get(i));
if (i < healthyPipelineThresholdCount) {
checkHealthy(i + 1);
@@ -352,16 +351,11 @@ public class TestSCMSafeModeManager {
1000, 5000);
}
- private void firePipelineEvent(Pipeline pipeline) throws Exception {
- PipelineReportsProto.Builder reportBuilder =
- PipelineReportsProto.newBuilder();
-
- reportBuilder.addPipelineReport(PipelineReport.newBuilder()
- .setPipelineID(pipeline.getId().getProtobuf())
- .setIsLeader(Boolean.TRUE));
- queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- new PipelineReportFromDatanode(pipeline.getNodes().get(0),
- reportBuilder.build()));
+ private void firePipelineEvent(SCMPipelineManager pipelineManager,
+ Pipeline pipeline) throws Exception {
+ pipelineManager.openPipeline(pipeline.getId());
+ queue.fireEvent(SCMEvents.OPEN_PIPELINE,
+ pipelineManager.getPipeline(pipeline.getId()));
}
@@ -480,7 +474,7 @@ public class TestSCMSafeModeManager {
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
- nodeManager, queue, null);
+ nodeManager, queue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
@@ -491,11 +485,6 @@ public class TestSCMSafeModeManager {
Pipeline pipeline = pipelineManager.createPipeline(
HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
- PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
- .newBuilder();
- reportBuilder.addPipelineReport(PipelineReport.newBuilder()
- .setPipelineID(pipeline.getId().getProtobuf())
- .setIsLeader(Boolean.TRUE));
scmSafeModeManager = new SCMSafeModeManager(
config, containers, pipelineManager, queue);
@@ -504,10 +493,9 @@ public class TestSCMSafeModeManager {
HddsTestUtils.createNodeRegistrationContainerReport(containers));
assertTrue(scmSafeModeManager.getInSafeMode());
- // Trigger the processed pipeline report event
- queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- new PipelineReportFromDatanode(pipeline.getNodes().get(0),
- reportBuilder.build()));
+
+
+ firePipelineEvent(pipelineManager, pipeline);
GenericTestUtils.waitFor(() -> {
return !scmSafeModeManager.getInSafeMode();
diff --git a/hadoop-ozone/dist/src/main/compose/testlib.sh b/hadoop-ozone/dist/src/main/compose/testlib.sh
index b20dca8..684f2f5 100755
--- a/hadoop-ozone/dist/src/main/compose/testlib.sh
+++ b/hadoop-ozone/dist/src/main/compose/testlib.sh
@@ -80,6 +80,40 @@ wait_for_datanodes(){
return 1
}
+## @description wait until safemode exit (or 90 seconds)
+## @param the docker-compose file
+wait_for_safemode_exit(){
+ local compose_file=$1
+
+ #Reset the timer
+ SECONDS=0
+
+ #Don't give it up until 90 seconds
+ while [[ $SECONDS -lt 90 ]]; do
+
+ #This line checks the safemode status in scm
+ local command="ozone scmcli safemode status"
+ if [[ "${SECURITY_ENABLED}" == 'true' ]]; then
+ status=`docker-compose -f "${compose_file}" exec -T scm bash -c "kinit -k HTTP/scm@EXAMPLE.COM -t /etc/security/keytabs/HTTP.keytab && $command"`
+ else
+ status=`docker-compose -f "${compose_file}" exec -T scm bash -c "$command"`
+ fi
+
+ echo $status
+ if [[ "$status" ]]; then
+ if [[ ${status} == "SCM is out of safe mode." ]]; then
+ #Safemode exits. Let's return from the function.
+ echo "Safe mode is off"
+ return
+ fi
+ fi
+
+ sleep 2
+ done
+ echo "WARNING! Safemode is still on. Please check the docker-compose files"
+ return 1
+}
+
## @description Starts a docker-compose based test environment
## @param number of datanodes to start and wait for (default: 3)
start_docker_env(){
@@ -90,6 +124,7 @@ start_docker_env(){
docker-compose -f "$COMPOSE_FILE" --no-ansi down
docker-compose -f "$COMPOSE_FILE" --no-ansi up -d --scale datanode="${datanode_count}" \
&& wait_for_datanodes "$COMPOSE_FILE" "${datanode_count}" \
+ && wait_for_safemode_exit "$COMPOSE_FILE" \
&& sleep 10
if [[ $? -gt 0 ]]; then
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
index fe612a0..b4f8c37 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
@@ -73,7 +73,7 @@ public class TestContainerStateManagerIntegration {
numContainerPerOwnerInPipeline =
conf.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
- cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
cluster.waitForClusterToBeReady();
cluster.waitTobeOutOfSafeMode();
xceiverClientManager = new XceiverClientManager(conf);
@@ -165,7 +165,9 @@ public class TestContainerStateManagerIntegration {
}
}
- cluster.restartStorageContainerManager(true);
+ // Restart SCM will not trigger container report to satisfy the safe mode
+ // exit rule.
+ cluster.restartStorageContainerManager(false);
List<ContainerInfo> result = cluster.getStorageContainerManager()
.getContainerManager().listContainer(null, 100);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/metrics/TestSCMContainerManagerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/metrics/TestSCMContainerManagerMetrics.java
index f2c31d1..26c8c01 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/metrics/TestSCMContainerManagerMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/metrics/TestSCMContainerManagerMetrics.java
@@ -40,6 +40,8 @@ import java.io.IOException;
import java.util.HashMap;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.fail;
@@ -56,6 +58,7 @@ public class TestSCMContainerManagerMetrics {
public void setup() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HDDS_CONTAINER_REPORT_INTERVAL, "3000s");
+ conf.setBoolean(HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
cluster.waitForClusterToBeReady();
scm = cluster.getStorageContainerManager();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index c583559..21fa7bd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -81,7 +81,7 @@ public class TestPipelineClose {
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000,
TimeUnit.MILLISECONDS);
- pipelineDestroyTimeoutInMillis = 5000;
+ pipelineDestroyTimeoutInMillis = 1000;
conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS);
cluster.waitForClusterToBeReady();
@@ -169,7 +169,7 @@ public class TestPipelineClose {
new PipelineActionHandler(pipelineManager, conf);
pipelineActionHandler
.onMessage(pipelineActionsFromDatanode, new EventQueue());
- Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2));
+ Thread.sleep(5000);
OzoneContainer ozoneContainer =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
new file mode 100644
index 0000000..8b8c64f
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test for RatisPipelineProvider.
+ */
+public class TestRatisPipelineProvider {
+
+ private NodeManager nodeManager;
+ private PipelineProvider provider;
+ private PipelineStateManager stateManager;
+
+ @Before
+ public void init() throws Exception {
+ nodeManager = new MockNodeManager(true, 10);
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+ stateManager = new PipelineStateManager(conf);
+ provider = new MockRatisPipelineProvider(nodeManager,
+ stateManager, new OzoneConfiguration());
+ }
+
+ private void createPipelineAndAssertions(
+ HddsProtos.ReplicationFactor factor) throws IOException {
+ Pipeline pipeline = provider.create(factor);
+ stateManager.addPipeline(pipeline);
+ Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+ Assert.assertEquals(pipeline.getFactor(), factor);
+ Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+ pipeline.getPipelineState());
+ Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+ Pipeline pipeline1 = provider.create(factor);
+ stateManager.addPipeline(pipeline1);
+ // New pipeline should not overlap with the previous created pipeline
+ Assert.assertTrue(
+ CollectionUtils.intersection(pipeline.getNodes(), pipeline1.getNodes())
+ .isEmpty());
+ Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
+ Assert.assertEquals(pipeline1.getFactor(), factor);
+ Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+ pipeline1.getPipelineState());
+ Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
+ }
+
+ @Test
+ public void testCreatePipelineWithFactor() throws IOException {
+ HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+ Pipeline pipeline = provider.create(factor);
+ stateManager.addPipeline(pipeline);
+ Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+ Assert.assertEquals(pipeline.getFactor(), factor);
+ Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+ pipeline.getPipelineState());
+ Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+
+ factor = HddsProtos.ReplicationFactor.ONE;
+ Pipeline pipeline1 = provider.create(factor);
+ stateManager.addPipeline(pipeline1);
+ // New pipeline should overlap with the previous created pipeline,
+ // and one datanode should overlap between the two types.
+ Assert.assertEquals(
+ CollectionUtils.intersection(pipeline.getNodes(),
+ pipeline1.getNodes()).size(), 1);
+ Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
+ Assert.assertEquals(pipeline1.getFactor(), factor);
+ Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+ pipeline1.getPipelineState());
+ Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
+ }
+
+ @Test
+ public void testCreatePipelineWithFactorThree() throws IOException {
+ createPipelineAndAssertions(HddsProtos.ReplicationFactor.THREE);
+ }
+
+ @Test
+ public void testCreatePipelineWithFactorOne() throws IOException {
+ createPipelineAndAssertions(HddsProtos.ReplicationFactor.ONE);
+ }
+
+ private List<DatanodeDetails> createListOfNodes(int nodeCount) {
+ List<DatanodeDetails> nodes = new ArrayList<>();
+ for (int i = 0; i < nodeCount; i++) {
+ nodes.add(TestUtils.randomDatanodeDetails());
+ }
+ return nodes;
+ }
+
+ @Test
+ public void testCreatePipelineWithNodes() {
+ HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+ Pipeline pipeline =
+ provider.create(factor, createListOfNodes(factor.getNumber()));
+ Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+ Assert.assertEquals(pipeline.getFactor(), factor);
+ Assert.assertEquals(
+ pipeline.getPipelineState(), Pipeline.PipelineState.OPEN);
+ Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+
+ factor = HddsProtos.ReplicationFactor.ONE;
+ pipeline = provider.create(factor, createListOfNodes(factor.getNumber()));
+ Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+ Assert.assertEquals(pipeline.getFactor(), factor);
+ Assert.assertEquals(pipeline.getPipelineState(),
+ Pipeline.PipelineState.OPEN);
+ Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+ }
+
+ @Test
+ public void testCreatePipelinesDnExclude() throws IOException {
+
+ // We need 9 Healthy DNs in MockNodeManager.
+ NodeManager mockNodeManager = new MockNodeManager(true, 12);
+ PipelineStateManager stateManagerMock =
+ new PipelineStateManager(new OzoneConfiguration());
+ PipelineProvider providerMock = new MockRatisPipelineProvider(
+ mockNodeManager, stateManagerMock, new OzoneConfiguration());
+
+ // Use up first 3 DNs for an open pipeline.
+ List<DatanodeDetails> openPiplineDns = mockNodeManager.getAllNodes()
+ .subList(0, 3);
+ HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+
+ Pipeline openPipeline = Pipeline.newBuilder()
+ .setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(factor)
+ .setNodes(openPiplineDns)
+ .setState(Pipeline.PipelineState.OPEN)
+ .setId(PipelineID.randomId())
+ .build();
+
+ stateManagerMock.addPipeline(openPipeline);
+
+ // Use up next 3 DNs also for an open pipeline.
+ List<DatanodeDetails> moreOpenPiplineDns = mockNodeManager.getAllNodes()
+ .subList(3, 6);
+ Pipeline anotherOpenPipeline = Pipeline.newBuilder()
+ .setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(factor)
+ .setNodes(moreOpenPiplineDns)
+ .setState(Pipeline.PipelineState.OPEN)
+ .setId(PipelineID.randomId())
+ .build();
+ stateManagerMock.addPipeline(anotherOpenPipeline);
+
+ // Use up next 3 DNs also for a closed pipeline.
+ List<DatanodeDetails> closedPiplineDns = mockNodeManager.getAllNodes()
+ .subList(6, 9);
+ Pipeline anotherClosedPipeline = Pipeline.newBuilder()
+ .setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(factor)
+ .setNodes(closedPiplineDns)
+ .setState(Pipeline.PipelineState.CLOSED)
+ .setId(PipelineID.randomId())
+ .build();
+ stateManagerMock.addPipeline(anotherClosedPipeline);
+
+ Pipeline pipeline = providerMock.create(factor);
+ Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+ Assert.assertEquals(pipeline.getFactor(), factor);
+ Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+ pipeline.getPipelineState());
+ Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+ List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
+
+ // Pipeline nodes cannot be from open pipelines.
+ Assert.assertTrue(
+ pipelineNodes.parallelStream().filter(dn ->
+ (openPiplineDns.contains(dn) || moreOpenPiplineDns.contains(dn)))
+ .count() == 0);
+
+ // Since we have only 9 Healthy DNs, at least 1 pipeline node should have
+ // been from the closed pipeline DN list.
+ Assert.assertTrue(pipelineNodes.parallelStream().filter(
+ closedPiplineDns::contains).count() > 0);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 105d2e2..9e2cfa9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -62,6 +62,7 @@ public class TestSCMPipelineManager {
testDir = GenericTestUtils
.getTestDir(TestSCMPipelineManager.class.getSimpleName());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
boolean folderExisted = testDir.exists() || testDir.mkdirs();
if (!folderExisted) {
throw new IOException("Unable to create test directory path");
@@ -77,7 +78,7 @@ public class TestSCMPipelineManager {
@Test
public void testPipelineReload() throws IOException {
SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+ new SCMPipelineManager(conf, nodeManager, new EventQueue());
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
@@ -94,7 +95,7 @@ public class TestSCMPipelineManager {
// new pipeline manager should be able to load the pipelines from the db
pipelineManager =
- new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+ new SCMPipelineManager(conf, nodeManager, new EventQueue());
mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
@@ -117,7 +118,7 @@ public class TestSCMPipelineManager {
@Test
public void testRemovePipeline() throws IOException {
SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+ new SCMPipelineManager(conf, nodeManager, new EventQueue());
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
@@ -135,7 +136,71 @@ public class TestSCMPipelineManager {
// new pipeline manager should not be able to load removed pipelines
pipelineManager =
- new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+ new SCMPipelineManager(conf, nodeManager, new EventQueue());
+ try {
+ pipelineManager.getPipeline(pipeline.getId());
+ Assert.fail("Pipeline should not have been retrieved");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().contains("not found"));
+ }
+
+ // clean up
+ pipelineManager.close();
+ }
+
+ @Test
+ public void testPipelineReport() throws IOException {
+ EventQueue eventQueue = new EventQueue();
+ SCMPipelineManager pipelineManager =
+ new SCMPipelineManager(conf, nodeManager, eventQueue);
+ PipelineProvider mockRatisProvider =
+ new MockRatisPipelineProvider(nodeManager,
+ pipelineManager.getStateManager(), conf);
+ pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+ mockRatisProvider);
+
+ SCMSafeModeManager scmSafeModeManager =
+ new SCMSafeModeManager(conf, new ArrayList<>(), pipelineManager,
+ eventQueue);
+
+ // create a pipeline in allocated state with no dns yet reported
+ Pipeline pipeline = pipelineManager
+ .createPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+
+ Assert
+ .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
+ Assert
+ .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isOpen());
+
+ // pipeline is not healthy until all dns report
+ PipelineReportHandler pipelineReportHandler =
+ new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+ List<DatanodeDetails> nodes = pipeline.getNodes();
+ Assert.assertFalse(
+ pipelineManager.getPipeline(pipeline.getId()).isHealthy());
+ // get pipeline report from each dn in the pipeline
+ nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
+ pipelineReportHandler, false, eventQueue));
+ sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
+ pipelineReportHandler, true, eventQueue);
+
+ // pipeline is healthy when all dns report
+ Assert
+ .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
+ // pipeline should now move to open state
+ Assert
+ .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
+
+ // close the pipeline
+ pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+
+ // pipeline report for destroyed pipeline should be ignored
+ nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
+ pipelineReportHandler, false, eventQueue));
+ sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
+ pipelineReportHandler, true, eventQueue);
+
try {
pipelineManager.getPipeline(pipeline.getId());
Assert.fail("Pipeline should not have been retrieved");
@@ -152,7 +217,7 @@ public class TestSCMPipelineManager {
MockNodeManager nodeManagerMock = new MockNodeManager(true,
20);
SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManagerMock, new EventQueue(), null);
+ new SCMPipelineManager(conf, nodeManagerMock, new EventQueue());
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManagerMock,
pipelineManager.getStateManager(), conf);
@@ -161,9 +226,9 @@ public class TestSCMPipelineManager {
MetricsRecordBuilder metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
- long numPipelineCreated = getLongCounter("NumPipelineCreated",
+ long numPipelineAllocated = getLongCounter("NumPipelineAllocated",
metrics);
- Assert.assertTrue(numPipelineCreated == 0);
+ Assert.assertTrue(numPipelineAllocated == 0);
// 3 DNs are unhealthy.
// Create 5 pipelines (Use up 15 Datanodes)
@@ -176,8 +241,8 @@ public class TestSCMPipelineManager {
metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
- numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
- Assert.assertTrue(numPipelineCreated == 5);
+ numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+ Assert.assertTrue(numPipelineAllocated == 5);
long numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
@@ -196,8 +261,8 @@ public class TestSCMPipelineManager {
metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
- numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
- Assert.assertTrue(numPipelineCreated == 5);
+ numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+ Assert.assertTrue(numPipelineAllocated == 5);
numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
@@ -210,7 +275,7 @@ public class TestSCMPipelineManager {
@Test
public void testActivateDeactivatePipeline() throws IOException {
final SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+ new SCMPipelineManager(conf, nodeManager, new EventQueue());
final PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
@@ -257,7 +322,7 @@ public class TestSCMPipelineManager {
public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
EventQueue eventQueue = new EventQueue();
SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager, eventQueue, null);
+ new SCMPipelineManager(conf, nodeManager, eventQueue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
@@ -270,7 +335,7 @@ public class TestSCMPipelineManager {
pipelineManager.close();
// new pipeline manager loads the pipelines from the db in ALLOCATED state
pipelineManager =
- new SCMPipelineManager(conf, nodeManager, eventQueue, null);
+ new SCMPipelineManager(conf, nodeManager, eventQueue);
mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 59cef37..0102246 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -262,7 +262,7 @@ public interface MiniOzoneCluster {
// Use relative smaller number of handlers for testing
protected int numOfOmHandlers = 20;
protected int numOfScmHandlers = 20;
- protected int numOfDatanodes = 1;
+ protected int numOfDatanodes = 3;
protected boolean startDataNodes = true;
protected CertificateClient certClient;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 2813711..2ad6c12 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
@@ -67,7 +65,6 @@ import java.nio.file.Paths;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
@@ -98,7 +95,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
private final List<HddsDatanodeService> hddsDatanodes;
// Timeout for the cluster to be ready
- private int waitForClusterToBeReadyTimeout = 60000; // 1 min
+ private int waitForClusterToBeReadyTimeout = 120000; // 2 min
private CertificateClient caClient;
/**
@@ -147,32 +144,17 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
final int healthy = scm.getNodeCount(HEALTHY);
- boolean isReady = healthy == hddsDatanodes.size();
- boolean printIsReadyMsg = true;
- List<Pipeline> pipelines = scm.getPipelineManager().getPipelines();
- if (!pipelines.isEmpty()) {
- List<Pipeline> raftPipelines = pipelines.stream().filter(p ->
- p.getType() == HddsProtos.ReplicationType.RATIS).collect(
- Collectors.toList());
- if (!raftPipelines.isEmpty()) {
- List<Pipeline> notOpenPipelines = raftPipelines.stream().filter(p ->
- p.getPipelineState() != Pipeline.PipelineState.OPEN &&
- p.getPipelineState() != Pipeline.PipelineState.CLOSED)
- .collect(Collectors.toList());
- if (notOpenPipelines.size() > 0) {
- LOG.info("Waiting for {} number of pipelines out of {}, to report "
- + "a leader.", notOpenPipelines.size(), raftPipelines.size());
- isReady = false;
- printIsReadyMsg = false;
- }
- }
- }
- if (printIsReadyMsg) {
- LOG.info("{}. Got {} of {} DN Heartbeats.",
- isReady ? "Cluster is ready" : "Waiting for cluster to be ready",
- healthy, hddsDatanodes.size());
- }
- return isReady;
+ final boolean isNodeReady = healthy == hddsDatanodes.size();
+ final boolean exitSafeMode = !scm.isInSafeMode();
+
+ LOG.info("{}. Got {} of {} DN Heartbeats.",
+ isNodeReady? "Nodes are ready" : "Waiting for nodes to be ready",
+ healthy, hddsDatanodes.size());
+ LOG.info(exitSafeMode? "Cluster exits safe mode" :
+ "Waiting for cluster to exit safe mode",
+ healthy, hddsDatanodes.size());
+
+ return isNodeReady && exitSafeMode;
}, 1000, waitForClusterToBeReadyTimeout);
}
@@ -660,7 +642,6 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
if (hbInterval.isPresent()) {
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
hbInterval.get(), TimeUnit.MILLISECONDS);
-
} else {
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
DEFAULT_HB_INTERVAL_MS,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
index cd975cf..eadb520 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
@@ -49,7 +49,7 @@ public class TestContainerOperations {
ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
ozoneConf.setStorageSize(OZONE_SCM_CONTAINER_SIZE, 5, StorageUnit.GB);
- cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(1).build();
+ cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
storageClient = new ContainerOperationClient(ozoneConf);
cluster.waitForClusterToBeReady();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
index 76eee6a..548f9b6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
@@ -61,7 +61,7 @@ public class TestContainerStateMachineIdempotency {
ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
cluster =
- MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
+ MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(3).build();
cluster.waitForClusterToBeReady();
storageContainerLocationClient =
cluster.getStorageContainerLocationClient();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index efc2736..99b4083 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -282,11 +282,11 @@ public class TestMiniOzoneCluster {
* Test that a DN can register with SCM even if it was started before the SCM.
* @throws Exception
*/
- @Test (timeout = 300_000)
+ @Test (timeout = 60000)
public void testDNstartAfterSCM() throws Exception {
- // Start a cluster with 1 DN
+ // Start a cluster with 3 DN
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(1)
+ .setNumDatanodes(3)
.build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 86dd75a..706f880 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -19,8 +19,12 @@ package org.apache.hadoop.ozone;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
@@ -328,10 +332,12 @@ public class TestStorageContainerManager {
100, TimeUnit.MILLISECONDS);
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
numKeys);
+ conf.setBoolean(HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
.setHbInterval(1000)
.setHbProcessorInterval(3000)
+ .setNumDatanodes(1)
.build();
cluster.waitForClusterToBeReady();
@@ -458,7 +464,7 @@ public class TestStorageContainerManager {
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
//This will set the cluster id in the version file
MiniOzoneCluster cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
+ MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
cluster.waitForClusterToBeReady();
try {
// This will initialize SCM
@@ -570,6 +576,7 @@ public class TestStorageContainerManager {
100, TimeUnit.MILLISECONDS);
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
numKeys);
+ conf.setBoolean(HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
.setHbInterval(1000)
@@ -597,7 +604,7 @@ public class TestStorageContainerManager {
scm.getContainerManager().updateContainerState(selectedContainer
.containerID(), HddsProtos.LifeCycleEvent.FINALIZE);
- cluster.restartStorageContainerManager(true);
+ cluster.restartStorageContainerManager(false);
scm = cluster.getStorageContainerManager();
EventPublisher publisher = mock(EventPublisher.class);
ReplicationManager replicationManager = scm.getReplicationManager();
@@ -607,8 +614,7 @@ public class TestStorageContainerManager {
modifiersField.setAccessible(true);
modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);
f.set(replicationManager, publisher);
- scm.getReplicationManager().start();
- Thread.sleep(2000);
+ Thread.sleep(10000);
UUID dnUuid = cluster.getHddsDatanodes().iterator().next()
.getDatanodeDetails().getUuid();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
index 623b11d..e6ed498 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
@@ -45,6 +45,8 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.
HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.
HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
@@ -81,6 +83,7 @@ public class TestBCSID {
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.setQuietMode(false);
+ conf.setBoolean(HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
cluster =
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
.build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 82c4910..e4b34fb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -52,6 +52,8 @@ import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_SCM_SAFEMODE_MIN_PIPELINE;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
@@ -93,6 +95,7 @@ public class TestContainerStateMachine {
OzoneManager.setTestSecureOmFlag(true);
conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
// conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
+ conf.setInt(HDDS_SCM_SAFEMODE_MIN_PIPELINE, 0);
cluster =
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
.setHbInterval(200)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 0fb15d0..e6302e3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -73,6 +73,8 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.
HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.
HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_PIPELINE_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
ContainerDataProto.State.UNHEALTHY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
@@ -118,6 +120,8 @@ public class TestContainerStateMachineFailures {
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
+ TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
@@ -129,7 +133,7 @@ public class TestContainerStateMachineFailures {
conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
conf.setQuietMode(false);
cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
+ MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
@@ -576,6 +580,7 @@ public class TestContainerStateMachineFailures {
Assert.assertTrue(!dispatcher.getMissingContainerSet().isEmpty());
Assert
.assertTrue(dispatcher.getMissingContainerSet().contains(containerID));
+
// write a new key
key = objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey("ratis", 1024, ReplicationType.RATIS, ReplicationFactor.ONE,
@@ -599,7 +604,7 @@ public class TestContainerStateMachineFailures {
byte[] blockCommitSequenceIdKey =
DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX);
- // modify the bcsid for the container in the ROCKS DB tereby inducing
+ // modify the bcsid for the container in the ROCKS DB thereby inducing
// corruption
db.getStore().put(blockCommitSequenceIdKey, Longs.toByteArray(0));
db.decrementReference();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
index 9768943..8310183 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -73,7 +73,7 @@ public class TestContainerReplication {
@Before
public void setup() throws Exception {
conf = newOzoneConfiguration();
- cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(2)
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3)
.setRandomContainerPort(true).build();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index c7b7992..ffe5b6f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -112,7 +112,7 @@ public class TestBlockDeletion {
3, TimeUnit.SECONDS);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(1)
+ .setNumDatanodes(3)
.setHbInterval(200)
.build();
cluster.waitForClusterToBeReady();
@@ -143,7 +143,7 @@ public class TestBlockDeletion {
String keyName = UUID.randomUUID().toString();
OzoneOutputStream out = bucket.createKey(keyName, value.getBytes().length,
- ReplicationType.RATIS, ReplicationFactor.ONE, new HashMap<>());
+ ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>());
for (int i = 0; i < 100; i++) {
out.write(value.getBytes());
}
@@ -152,7 +152,7 @@ public class TestBlockDeletion {
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
.setBucketName(bucketName).setKeyName(keyName).setDataSize(0)
.setType(HddsProtos.ReplicationType.RATIS)
- .setFactor(HddsProtos.ReplicationFactor.ONE)
+ .setFactor(HddsProtos.ReplicationFactor.THREE)
.setRefreshPipeline(true)
.build();
List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
index 5c7f2c1..3320f94 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import java.util.HashMap;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -58,6 +59,7 @@ public class TestCloseContainerHandler {
//setup a cluster (1G free space is enough for a unit test)
conf = new OzoneConfiguration();
conf.set(OZONE_SCM_CONTAINER_SIZE, "1GB");
+ conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(1).build();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
index 1cbf69e..4df6f69 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -66,6 +67,7 @@ public class TestDeleteContainerHandler {
public static void setup() throws Exception {
conf = new OzoneConfiguration();
conf.set(OZONE_SCM_CONTAINER_SIZE, "1GB");
+ conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(1).build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
index 7fb9825..1d04330 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
@@ -85,6 +85,8 @@ public class TestDataScrubber {
ozoneConfig.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "1s");
ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+ ozoneConfig.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+ false);
cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1)
.build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
index 732fb34..e1e0cfa 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
@@ -67,7 +67,7 @@ public class TestKeyPurging {
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(1)
+ .setNumDatanodes(3)
.setHbInterval(200)
.build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java
index 3614a05..1c1f034 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java
@@ -341,7 +341,7 @@ public class TestScmSafeMode {
builder = MiniOzoneCluster.newBuilder(conf)
.setHbInterval(1000)
.setHbProcessorInterval(500)
- .setNumDatanodes(1);
+ .setNumDatanodes(3);
cluster = builder.build();
StorageContainerManager scm = cluster.getStorageContainerManager();
assertFalse(scm.isInSafeMode());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
index 48ce4a6..9d187ff 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
@@ -61,7 +61,7 @@ public class TestContainerSmallFile {
ozoneConfig = new OzoneConfiguration();
ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
- cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1)
+ cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(3)
.build();
cluster.waitForClusterToBeReady();
storageContainerLocationClient = cluster
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
index b19020f..2f8c755 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
@@ -65,7 +65,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
cluster =
- MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
+ MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(3).build();
cluster.waitForClusterToBeReady();
storageContainerLocationClient =
cluster.getStorageContainerLocationClient();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
index e700a0e..59a28f7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
@@ -57,7 +57,7 @@ import static org.junit.Assert.fail;
public class TestSCMMXBean {
public static final Log LOG = LogFactory.getLog(TestSCMMXBean.class);
- private static int numOfDatanodes = 1;
+ private static int numOfDatanodes = 3;
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static StorageContainerManager scm;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
index 65a6357..4aa1eae 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.scm.node;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
@@ -49,7 +50,8 @@ public class TestSCMNodeMetrics {
@Before
public void setup() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
- cluster = MiniOzoneCluster.newBuilder(conf).build();
+ conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
cluster.waitForClusterToBeReady();
}
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsRenameDir.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsRenameDir.java
index 1d58465..2b95cbd 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsRenameDir.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsRenameDir.java
@@ -52,7 +52,7 @@ public class TestOzoneFsRenameDir {
public void init() throws Exception {
conf = new OzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(1)
+ .setNumDatanodes(3)
.build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
index 112674a..b6a8f18 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
@@ -65,7 +65,7 @@ public class TestContainerMapper {
conf.set(OZONE_OM_DB_DIRS, dbPath);
conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, "100MB");
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(1)
+ .setNumDatanodes(3)
.setScmId(SCM_ID)
.build();
cluster.waitForClusterToBeReady();
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org