You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2019/10/13 06:53:05 UTC
[hadoop-ozone] 01/03: HDDS-2034. sync RATIS pipeline creation and
destroy through heartbeat commands.
This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch HDDS-2034
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit f7b9ad365cd811b785db5a738241f139832b1fc2
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Mon Sep 23 20:24:19 2019 +0800
HDDS-2034. sync RATIS pipeline creation and destroy through heartbeat commands.
---
.../org/apache/hadoop/hdds/HddsConfigKeys.java | 12 +-
.../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 7 +
.../common/src/main/resources/ozone-default.xml | 30 ++-
hadoop-hdds/container-service/pom.xml | 5 +
.../common/statemachine/DatanodeStateMachine.java | 14 ++
.../common/statemachine/StateContext.java | 10 +
.../CloseContainerCommandHandler.java | 11 +-
.../ClosePipelineCommandHandler.java | 166 +++++++++++++++
.../commandhandler/CommandHandler.java | 2 +-
.../CreatePipelineCommandHandler.java | 228 +++++++++++++++++++++
.../states/endpoint/HeartbeatEndpointTask.java | 22 ++
.../protocol/commands/ClosePipelineCommand.java | 73 +++++++
.../protocol/commands/CreatePipelineCommand.java | 100 +++++++++
.../commands/CreatePipelineCommandStatus.java | 97 +++++++++
.../proto/StorageContainerDatanodeProtocol.proto | 31 +++
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 29 +++
.../scm/command/CommandStatusReportHandler.java | 24 ++-
.../apache/hadoop/hdds/scm/events/SCMEvents.java | 22 +-
.../hadoop/hdds/scm/pipeline/PipelineFactory.java | 13 +-
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 11 +-
.../hadoop/hdds/scm/pipeline/PipelineProvider.java | 2 +
.../hdds/scm/pipeline/PipelineReportHandler.java | 32 +--
.../hdds/scm/pipeline/PipelineStateManager.java | 2 +-
.../hdds/scm/pipeline/RatisPipelineProvider.java | 124 ++++-------
.../hdds/scm/pipeline/RatisPipelineUtils.java | 101 ---------
.../hdds/scm/pipeline/SCMPipelineManager.java | 119 +++++++++--
.../hdds/scm/pipeline/SCMPipelineMetrics.java | 11 +
.../hdds/scm/pipeline/SimplePipelineProvider.java | 5 +
.../scm/safemode/HealthyPipelineSafeModeRule.java | 97 ++++-----
.../safemode/OneReplicaPipelineSafeModeRule.java | 64 ++----
.../hdds/scm/safemode/SCMSafeModeManager.java | 13 +-
.../hadoop/hdds/scm/safemode/SafeModeHandler.java | 5 +-
.../scm/server/SCMDatanodeHeartbeatDispatcher.java | 1 +
.../hdds/scm/server/SCMDatanodeProtocolServer.java | 20 ++
.../hdds/scm/server/StorageContainerManager.java | 4 +-
.../java/org/apache/hadoop/hdds/scm/TestUtils.java | 34 +++
.../container/TestCloseContainerEventHandler.java | 3 +
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 11 +-
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 9 +-
.../scm/pipeline/MockRatisPipelineProvider.java | 3 +-
.../safemode/TestHealthyPipelineSafeModeRule.java | 39 +---
.../TestOneReplicaPipelineSafeModeRule.java | 33 +--
.../hdds/scm/safemode/TestSCMSafeModeManager.java | 36 ++--
.../TestContainerStateManagerIntegration.java | 4 +-
.../hdds/scm/pipeline/TestPipelineClose.java | 2 +-
.../scm/pipeline/TestRatisPipelineProvider.java | 15 +-
.../hdds/scm/pipeline/TestSCMPipelineManager.java | 46 ++---
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 16 +-
48 files changed, 1265 insertions(+), 493 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 99972ae..5e161b3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -81,7 +81,12 @@ public final class HddsConfigKeys {
public static final String HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK =
"hdds.scm.safemode.pipeline-availability.check";
public static final boolean
- HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = false;
+ HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = true;
+
+ public static final String HDDS_SCM_SAFEMODE_PIPELINE_CREATION =
+ "hdds.scm.safemode.pipeline.creation";
+ public static final boolean
+ HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT = true;
// % of containers which should have at least one reported replica
// before SCM comes out of safe mode.
@@ -89,13 +94,16 @@ public final class HddsConfigKeys {
"hdds.scm.safemode.threshold.pct";
public static final double HDDS_SCM_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.99;
-
// percentage of healthy pipelines, where all 3 datanodes are reported in the
// pipeline.
public static final String HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT =
"hdds.scm.safemode.healthy.pipelie.pct";
public static final double
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT = 0.10;
+ // minimum number of healthy RATIS pipelines (factor ONE or THREE)
+ public static final String HDDS_SCM_SAFEMODE_MIN_PIPELINE =
+ "hdds.scm.safemode.min.pipeline";
+ public static final int HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT = 1;
public static final String HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT =
"hdds.scm.safemode.atleast.one.node.reported.pipeline.pct";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 1627569..9d4b728 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -51,6 +51,7 @@ public final class Pipeline {
private Map<DatanodeDetails, Long> nodeStatus;
// nodes with ordered distance to client
private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
+ private final long creationTime;
/**
* The immutable properties of pipeline object is used in
@@ -65,6 +66,7 @@ public final class Pipeline {
this.factor = factor;
this.state = state;
this.nodeStatus = nodeStatus;
+ this.creationTime = System.currentTimeMillis();
}
/**
@@ -135,6 +137,11 @@ public final class Pipeline {
return state == PipelineState.OPEN;
}
+ public boolean isAllocationTimeout() {
+ //TODO: define a system property to control the timeout value
+ return false;
+ }
+
public void setNodesInOrder(List<DatanodeDetails> nodes) {
nodesInOrder.set(nodes);
}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a038047..1d592fa 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -311,15 +311,6 @@
defined with postfix (ns,ms,s,m,h,d)</description>
</property>
<property>
- <name>hdds.command.status.report.interval</name>
- <value>60000ms</value>
- <tag>OZONE, CONTAINER, MANAGEMENT</tag>
- <description>Time interval of the datanode to send status of command
- execution. Each datanode periodically the execution status of commands
- received from SCM to SCM. Unit could be defined with postfix
- (ns,ms,s,m,h,d)</description>
- </property>
- <property>
<name>hdds.pipeline.report.interval</name>
<value>60000ms</value>
<tag>OZONE, PIPELINE, MANAGEMENT</tag>
@@ -1315,7 +1306,7 @@
<property>
<name>hdds.scm.safemode.pipeline-availability.check</name>
- <value>false</value>
+ <value>true</value>
<tag>HDDS,SCM,OPERATION</tag>
<description>
Boolean value to enable pipeline availability check during SCM safe mode.
@@ -1401,6 +1392,25 @@
</property>
<property>
+ <name>hdds.scm.safemode.pipeline.creation</name>
+ <value>true</value>
+ <tag>HDDS,SCM,OPERATION</tag>
+ <description>
+ Boolean value to enable background pipeline creation in SCM safe mode.
+ </description>
+ </property>
+
+ <property>
+ <name>hdds.scm.safemode.min.pipeline</name>
+ <value>1</value>
+ <tag>HDDS,SCM,OPERATION</tag>
+ <description>
+ Minimum number of RATIS pipelines required to exit SCM safe mode. Considered
+ only when "hdds.scm.safemode.pipeline.creation" is true.
+ </description>
+ </property>
+
+ <property>
<name>hdds.lock.max.concurrency</name>
<value>100</value>
<tag>HDDS</tag>
diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml
index 2f89fa2..bb85f3c 100644
--- a/hadoop-hdds/container-service/pom.xml
+++ b/hadoop-hdds/container-service/pom.xml
@@ -38,6 +38,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>hadoop-hdds-server-framework</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdds-client</artifactId>
+ <version>${hdds.version}</version>
+ </dependency>
+ <dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index c9eb702..926f19c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -39,8 +39,12 @@ import org.apache.hadoop.ozone.container.common.report.ReportManager;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CloseContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+ .ClosePipelineCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CommandDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+ .CreatePipelineCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.DeleteBlocksCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.DeleteContainerCommandHandler;
@@ -126,6 +130,8 @@ public class DatanodeStateMachine implements Closeable {
conf))
.addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
.addHandler(new DeleteContainerCommandHandler())
+ .addHandler(new ClosePipelineCommandHandler())
+ .addHandler(new CreatePipelineCommandHandler())
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
@@ -486,4 +492,12 @@ public class DatanodeStateMachine implements Closeable {
public ReplicationSupervisor getSupervisor() {
return supervisor;
}
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public CertificateClient getCertificateClient() {
+ return dnCertClient;
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 2c01f3a..17e5502 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.ozone.container.common.states.datanode
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands
.DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
+import org.apache.hadoop.ozone.protocol.commands
+ .CreatePipelineCommandStatus.CreatePipelineCommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import static java.lang.Math.min;
@@ -464,6 +466,14 @@ public class StateContext {
.setType(cmd.getType())
.build());
}
+ if (cmd.getType() == SCMCommandProto.Type.createPipelineCommand) {
+ addCmdStatus(cmd.getId(),
+ CreatePipelineCommandStatusBuilder.newBuilder()
+ .setCmdId(cmd.getId())
+ .setStatus(Status.PENDING)
+ .setType(cmd.getType())
+ .build());
+ }
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 881fea0..f7b80cd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Handler for close container command received from SCM.
@@ -48,7 +49,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
private static final Logger LOG =
LoggerFactory.getLogger(CloseContainerCommandHandler.class);
- private int invocationCount;
+ private AtomicLong invocationCount = new AtomicLong(0);
private long totalTime;
/**
@@ -69,7 +70,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
public void handle(SCMCommand command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
LOG.debug("Processing Close Container command.");
- invocationCount++;
+ invocationCount.incrementAndGet();
final long startTime = Time.monotonicNow();
final DatanodeDetails datanodeDetails = context.getParent()
.getDatanodeDetails();
@@ -159,7 +160,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
*/
@Override
public int getInvocationCount() {
- return invocationCount;
+ return (int)invocationCount.get();
}
/**
@@ -169,8 +170,8 @@ public class CloseContainerCommandHandler implements CommandHandler {
*/
@Override
public long getAverageRunTime() {
- if (invocationCount > 0) {
- return totalTime / invocationCount;
+ if (invocationCount.get() > 0) {
+ return totalTime / invocationCount.get();
}
return 0;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
new file mode 100644
index 0000000..bd48839
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client
+ .CertificateClient;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Handler for close pipeline command received from SCM.
+ */
+public class ClosePipelineCommandHandler implements CommandHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ClosePipelineCommandHandler.class);
+
+ private AtomicLong invocationCount = new AtomicLong(0);
+ private long totalTime;
+
+ /**
+ * Constructs a closePipelineCommand handler.
+ */
+ public ClosePipelineCommandHandler() {
+ }
+
+ /**
+ * Handles a given SCM command.
+ *
+ * @param command - SCM Command
+ * @param ozoneContainer - Ozone Container.
+ * @param context - Current Context.
+ * @param connectionManager - The SCMs that we are talking to.
+ */
+ @Override
+ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+ StateContext context, SCMConnectionManager connectionManager) {
+ invocationCount.incrementAndGet();
+ final long startTime = Time.monotonicNow();
+ final DatanodeDetails datanode = context.getParent()
+ .getDatanodeDetails();
+ final ClosePipelineCommandProto closeCommand =
+ ((ClosePipelineCommand)command).getProto();
+ final PipelineID pipelineID = PipelineID.getFromProtobuf(
+ closeCommand.getPipelineID());
+
+ try {
+ destroyPipeline(datanode, pipelineID, context);
+ LOG.info("Close Pipeline #{} command on datanode #{}.", pipelineID,
+ datanode.getUuidString());
+ } catch (IOException e) {
+ LOG.error("Can't close pipeline #{}", pipelineID, e);
+ } finally {
+ long endTime = Time.monotonicNow();
+ totalTime += endTime - startTime;
+ }
+
+ }
+
+ /**
+ * Returns the command type that this command handler handles.
+ *
+ * @return Type
+ */
+ @Override
+ public SCMCommandProto.Type getCommandType() {
+ return SCMCommandProto.Type.closePipelineCommand;
+ }
+
+ /**
+ * Returns number of times this handler has been invoked.
+ *
+ * @return int
+ */
+ @Override
+ public int getInvocationCount() {
+ return (int)invocationCount.get();
+ }
+
+ /**
+ * Returns the average time this function takes to run.
+ *
+ * @return long
+ */
+ @Override
+ public long getAverageRunTime() {
+ if (invocationCount.get() > 0) {
+ return totalTime / invocationCount.get();
+ }
+ return 0;
+ }
+
+ /**
+ * Destroy pipeline on this datanode.
+ *
+ * @param dn - Datanode on which pipeline needs to be destroyed
+ * @param pipelineID - ID of pipeline to be destroyed
+ * @param context - Ozone datanode context
+ * @throws IOException if removing the Ratis group on this datanode fails
+ */
+ void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
+ StateContext context) throws IOException {
+ final Configuration ozoneConf = context.getParent().getConf();
+ final String rpcType = ozoneConf
+ .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+ ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+ final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
+ final RaftPeer p = RatisHelper.toRaftPeer(dn);
+ final int maxOutstandingRequests =
+ HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
+ final CertificateClient dnCertClient =
+ context.getParent().getCertificateClient();
+ final GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfigForDN(
+ new SecurityConfig(ozoneConf), dnCertClient);
+ final TimeDuration requestTimeout =
+ RatisHelper.getClientRequestTimeout(ozoneConf);
+ try(RaftClient client = RatisHelper
+ .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
+ retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) {
+ client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
+ true, p.getId());
+ }
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 1ea0ea8..dca02f6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -68,7 +68,7 @@ public interface CommandHandler {
default void updateCommandStatus(StateContext context, SCMCommand command,
Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
- log.debug("{} with Id:{} not found.", command.getType(),
+ log.warn("{} with Id:{} not found.", command.getType(),
command.getId());
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
new file mode 100644
index 0000000..e6d20e7
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client
+ .CertificateClient;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommandStatus;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Handler for create pipeline command received from SCM.
+ */
+public class CreatePipelineCommandHandler implements CommandHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
+
+ private AtomicLong invocationCount = new AtomicLong(0);
+ private long totalTime;
+
+ /**
+ * Constructs a createPipelineCommand handler.
+ */
+ public CreatePipelineCommandHandler() {
+ }
+
+ /**
+ * Handles a given SCM command.
+ *
+ * @param command - SCM Command
+ * @param ozoneContainer - Ozone Container.
+ * @param context - Current Context.
+ * @param connectionManager - The SCMs that we are talking to.
+ */
+ @Override
+ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+ StateContext context, SCMConnectionManager connectionManager) {
+ invocationCount.incrementAndGet();
+ final long startTime = Time.monotonicNow();
+ final DatanodeDetails dn = context.getParent()
+ .getDatanodeDetails();
+ final CreatePipelineCommandProto createCommand =
+ ((CreatePipelineCommand)command).getProto();
+ final PipelineID pipelineID = PipelineID.getFromProtobuf(
+ createCommand.getPipelineID());
+ Collection<DatanodeDetails> peers =
+ createCommand.getDatanodeList().stream()
+ .map(DatanodeDetails::getFromProtoBuf)
+ .collect(Collectors.toList());
+
+ final CreatePipelineACKProto createPipelineACK =
+ CreatePipelineACKProto.newBuilder()
+ .setPipelineID(createCommand.getPipelineID())
+ .setDatanodeUUID(dn.getUuidString()).build();
+ boolean success = false;
+ try {
+ createPipeline(dn, pipelineID, peers, context);
+ success = true;
+ LOG.info("Create Pipeline {} {} #{} command succeed on datanode {}.",
+ createCommand.getType(), createCommand.getFactor(), pipelineID,
+ dn.getUuidString());
+ } catch (NotLeaderException e) {
+ LOG.debug("Follower cannot create pipeline #{}.", pipelineID);
+ } catch (IOException e) {
+ LOG.error("Can't create pipeline {} {} #{}", createCommand.getType(),
+ createCommand.getFactor(), pipelineID, e);
+ } finally {
+ final boolean cmdExecuted = success;
+ Consumer<CommandStatus> statusUpdater = (cmdStatus) -> {
+ cmdStatus.setStatus(cmdExecuted);
+ ((CreatePipelineCommandStatus)cmdStatus)
+ .setCreatePipelineAck(createPipelineACK);
+ };
+ updateCommandStatus(context, command, statusUpdater, LOG);
+ long endTime = Time.monotonicNow();
+ totalTime += endTime - startTime;
+ }
+ }
+
+ /**
+ * Returns the command type that this command handler handles.
+ *
+ * @return Type
+ */
+ @Override
+ public SCMCommandProto.Type getCommandType() {
+ return SCMCommandProto.Type.createPipelineCommand;
+ }
+
+ /**
+ * Returns number of times this handler has been invoked.
+ *
+ * @return int
+ */
+ @Override
+ public int getInvocationCount() {
+ return (int)invocationCount.get();
+ }
+
+ /**
+ * Returns the average time this function takes to run.
+ *
+ * @return long
+ */
+ @Override
+ public long getAverageRunTime() {
+ if (invocationCount.get() > 0) {
+ return totalTime / invocationCount.get();
+ }
+ return 0;
+ }
+
+ /**
+ * Sends ratis command to create pipeline on this datanode.
+ *
+ * @param dn - this datanode
+ * @param pipelineId - pipeline ID
+ * @param peers - datanodes of the pipeline
+ * @param context - Ozone datanode context
+ * @throws IOException if adding the Ratis group on this datanode fails
+ */
+ private void createPipeline(DatanodeDetails dn, PipelineID pipelineId,
+ Collection<DatanodeDetails> peers, StateContext context)
+ throws IOException {
+ final Configuration ozoneConf = context.getParent().getConf();
+ final RaftGroup group = RatisHelper.newRaftGroup(
+ RaftGroupId.valueOf(pipelineId.getId()), peers);
+ LOG.debug("creating pipeline:#{} with {}", pipelineId, group);
+
+ final String rpcType = ozoneConf
+ .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+ ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+ final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
+ final List< IOException > exceptions =
+ Collections.synchronizedList(new ArrayList<>());
+ final int maxOutstandingRequests =
+ HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
+ final CertificateClient dnCertClient =
+ context.getParent().getCertificateClient();
+ final GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfigForDN(
+ new SecurityConfig(ozoneConf), dnCertClient);
+ final TimeDuration requestTimeout =
+ RatisHelper.getClientRequestTimeout(ozoneConf);
+ try {
+ final RaftPeer peer = RatisHelper.toRaftPeer(dn);
+ try (RaftClient client = RatisHelper
+ .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), peer,
+ retryPolicy, maxOutstandingRequests, tlsConfig,
+ requestTimeout)) {
+
+ RaftClientReply reply = client.groupAdd(group, peer.getId());
+ if (reply == null || !reply.isSuccess()) {
+ String msg = "Pipeline initialization failed for pipeline:"
+ + pipelineId.getId() + " node:" + peer.getId();
+ throw new IOException(msg);
+ }
+ } catch (IOException ioe) {
+ String errMsg =
+ "Failed invoke Ratis rpc for " + dn.getUuid();
+ exceptions.add(new IOException(errMsg, ioe));
+ }
+ } catch (RejectedExecutionException ex) {
+ throw new IOException(ex.getClass().getName() + " exception occurred " +
+ "during createPipeline", ex);
+ }
+
+ if (!exceptions.isEmpty()) {
+ throw MultipleIOException.createIOException(exceptions);
+ }
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index c50f457..a55d0d6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine.EndPointStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -309,6 +311,26 @@ public class HeartbeatEndpointTask
}
this.context.addCommand(deleteContainerCommand);
break;
+ case createPipelineCommand:
+ CreatePipelineCommand createPipelineCommand =
+ CreatePipelineCommand.getFromProtobuf(
+ commandResponseProto.getCreatePipelineCommandProto());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received SCM create pipeline request {}",
+ createPipelineCommand.getPipelineID());
+ }
+ this.context.addCommand(createPipelineCommand);
+ break;
+ case closePipelineCommand:
+ ClosePipelineCommand closePipelineCommand =
+ ClosePipelineCommand.getFromProtobuf(
+ commandResponseProto.getClosePipelineCommandProto());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received SCM close pipeline request {}",
+ closePipelineCommand.getPipelineID());
+ }
+ this.context.addCommand(closePipelineCommand);
+ break;
default:
throw new IllegalArgumentException("Unknown response : "
+ commandResponseProto.getCommandType().name());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java
new file mode 100644
index 0000000..1f75bc3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+
+/**
+ * Asks datanode to close a pipeline.
+ * <p>
+ * The command holds the ID of the pipeline to close; its protobuf form
+ * additionally carries the command id returned by {@code getId()}.
+ */
+public class ClosePipelineCommand
+    extends SCMCommand<ClosePipelineCommandProto> {
+
+  private final PipelineID pipelineID;
+
+  /**
+   * Creates a close command using the no-arg {@code SCMCommand}
+   * constructor (which presumably assigns a fresh command id - confirm
+   * against SCMCommand).
+   *
+   * @param pipelineID ID of the pipeline to close
+   */
+  public ClosePipelineCommand(final PipelineID pipelineID) {
+    super();
+    this.pipelineID = pipelineID;
+  }
+
+  /**
+   * Creates a close command with an explicit command id, as done when the
+   * command is rebuilt from its protobuf form.
+   *
+   * @param cmdId command id passed to the {@code SCMCommand} constructor
+   * @param pipelineID ID of the pipeline to close
+   */
+  public ClosePipelineCommand(long cmdId, final PipelineID pipelineID) {
+    super(cmdId);
+    this.pipelineID = pipelineID;
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.closePipelineCommand;
+  }
+
+  /**
+   * Serializes this command: command id plus pipeline ID.
+   *
+   * @return the protobuf message for this command
+   */
+  @Override
+  public ClosePipelineCommandProto getProto() {
+    ClosePipelineCommandProto.Builder builder =
+        ClosePipelineCommandProto.newBuilder();
+    builder.setCmdId(getId());
+    builder.setPipelineID(pipelineID.getProtobuf());
+    return builder.build();
+  }
+
+  /**
+   * Rebuilds a command from its protobuf form, preserving the command id.
+   *
+   * @param closePipelineProto protobuf message to convert, must not be null
+   * @return the equivalent ClosePipelineCommand
+   */
+  public static ClosePipelineCommand getFromProtobuf(
+      ClosePipelineCommandProto closePipelineProto) {
+    Preconditions.checkNotNull(closePipelineProto);
+    return new ClosePipelineCommand(closePipelineProto.getCmdId(),
+        PipelineID.getFromProtobuf(closePipelineProto.getPipelineID()));
+  }
+
+  public PipelineID getPipelineID() {
+    return pipelineID;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
new file mode 100644
index 0000000..9e22cbc
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Asks datanode to create a pipeline.
+ * <p>
+ * Holds the pipeline ID, replication type and factor, and the list of
+ * datanodes that is serialized into the command's protobuf form.
+ */
+public class CreatePipelineCommand
+    extends SCMCommand<CreatePipelineCommandProto> {
+
+  private final PipelineID pipelineID;
+  private final ReplicationFactor factor;
+  private final ReplicationType type;
+  private final List<DatanodeDetails> nodelist;
+
+  /**
+   * Builds a command through the no-arg {@code SCMCommand} constructor.
+   */
+  public CreatePipelineCommand(final PipelineID pipelineID,
+      final ReplicationType type, final ReplicationFactor factor,
+      final List<DatanodeDetails> datanodeList) {
+    super();
+    this.pipelineID = pipelineID;
+    this.type = type;
+    this.factor = factor;
+    this.nodelist = datanodeList;
+  }
+
+  /**
+   * Builds a command with an explicit command id, as done when the command
+   * is rebuilt from its protobuf form.
+   */
+  public CreatePipelineCommand(long cmdId, final PipelineID pipelineID,
+      final ReplicationType type, final ReplicationFactor factor,
+      final List<DatanodeDetails> datanodeList) {
+    super(cmdId);
+    this.pipelineID = pipelineID;
+    this.type = type;
+    this.factor = factor;
+    this.nodelist = datanodeList;
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.createPipelineCommand;
+  }
+
+  /**
+   * Serializes the command id, pipeline ID, factor, type and every datanode
+   * of the node list, in list order.
+   */
+  @Override
+  public CreatePipelineCommandProto getProto() {
+    final CreatePipelineCommandProto.Builder builder =
+        CreatePipelineCommandProto.newBuilder();
+    builder.setCmdId(getId());
+    builder.setPipelineID(pipelineID.getProtobuf());
+    builder.setFactor(factor);
+    builder.setType(type);
+    for (DatanodeDetails node : nodelist) {
+      builder.addDatanode(node.getProtoBufMessage());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Rebuilds a command from its protobuf form, preserving the command id.
+   *
+   * @param createPipelineProto protobuf message to convert, must not be null
+   * @return the equivalent CreatePipelineCommand
+   */
+  public static CreatePipelineCommand getFromProtobuf(
+      CreatePipelineCommandProto createPipelineProto) {
+    Preconditions.checkNotNull(createPipelineProto);
+    final long cmdId = createPipelineProto.getCmdId();
+    final PipelineID id =
+        PipelineID.getFromProtobuf(createPipelineProto.getPipelineID());
+    final ReplicationType replicationType = createPipelineProto.getType();
+    final ReplicationFactor replicationFactor =
+        createPipelineProto.getFactor();
+    final List<DatanodeDetails> datanodes =
+        createPipelineProto.getDatanodeList().stream()
+            .map(DatanodeDetails::getFromProtoBuf)
+            .collect(Collectors.toList());
+    return new CreatePipelineCommand(cmdId, id, replicationType,
+        replicationFactor, datanodes);
+  }
+
+  public PipelineID getPipelineID() {
+    return pipelineID;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommandStatus.java
new file mode 100644
index 0000000..b5e7d6c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommandStatus.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+/**
+ * Command status to report about pipeline creation.
+ * <p>
+ * Extends the generic command status with an optional
+ * {@link CreatePipelineACKProto}; the ack is written to the protobuf form
+ * only when it is non-null.
+ */
+public class CreatePipelineCommandStatus extends CommandStatus {
+
+  private CreatePipelineACKProto createPipelineAck;
+
+  /**
+   * @param type command type
+   * @param cmdId id of the command this status belongs to
+   * @param status execution status of the command
+   * @param msg optional message, may be null
+   * @param ack optional create-pipeline ack, may be null
+   */
+  public CreatePipelineCommandStatus(Type type, Long cmdId, Status status,
+      String msg, CreatePipelineACKProto ack) {
+    super(type, cmdId, status, msg);
+    this.createPipelineAck = ack;
+  }
+
+  public void setCreatePipelineAck(CreatePipelineACKProto ack) {
+    createPipelineAck = ack;
+  }
+
+  /**
+   * Builds a status object from its protobuf form.
+   *
+   * @param cmdStatusProto protobuf command status
+   * @return a CreatePipelineCommandStatus; its ack is null when the
+   *         protobuf message carries none
+   */
+  @Override
+  public CommandStatus getFromProtoBuf(
+      StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatusProto) {
+    // The ack is an optional field. Reading it when absent yields the
+    // default instance, whose required fields are unset; storing that
+    // instance would make getProtoBufMessage() fail in build(). Keep null
+    // instead so the field is simply omitted on re-serialization.
+    final CreatePipelineACKProto ack = cmdStatusProto.hasCreatePipelineAck()
+        ? cmdStatusProto.getCreatePipelineAck() : null;
+    return CreatePipelineCommandStatusBuilder.newBuilder()
+        .setCreatePipelineAck(ack)
+        .setCmdId(cmdStatusProto.getCmdId())
+        .setStatus(cmdStatusProto.getStatus())
+        .setType(cmdStatusProto.getType())
+        .setMsg(cmdStatusProto.getMsg())
+        .build();
+  }
+
+  /**
+   * Serializes this status; the ack and the message are included only when
+   * they are non-null.
+   */
+  @Override
+  public StorageContainerDatanodeProtocolProtos.CommandStatus
+      getProtoBufMessage() {
+    StorageContainerDatanodeProtocolProtos.CommandStatus.Builder builder =
+        StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder()
+            .setCmdId(this.getCmdId())
+            .setStatus(this.getStatus())
+            .setType(this.getType());
+    if (createPipelineAck != null) {
+      builder.setCreatePipelineAck(createPipelineAck);
+    }
+    if (this.getMsg() != null) {
+      builder.setMsg(this.getMsg());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Builder for CreatePipelineCommandStatus.
+   */
+  public static final class CreatePipelineCommandStatusBuilder
+      extends CommandStatusBuilder {
+    private CreatePipelineACKProto createPipelineAck = null;
+
+    public static CreatePipelineCommandStatusBuilder newBuilder() {
+      return new CreatePipelineCommandStatusBuilder();
+    }
+
+    public CreatePipelineCommandStatusBuilder setCreatePipelineAck(
+        CreatePipelineACKProto ack) {
+      this.createPipelineAck = ack;
+      return this;
+    }
+
+    @Override
+    public CommandStatus build() {
+      return new CreatePipelineCommandStatus(getType(), getCmdId(),
+          getStatus(), getMsg(), createPipelineAck);
+    }
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 1d09dfa..90124c6 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -177,6 +177,7 @@ message CommandStatus {
required SCMCommandProto.Type type = 3;
optional string msg = 4;
optional ContainerBlocksDeletionACKProto blockDeletionAck = 5;
+ optional CreatePipelineACKProto createPipelineAck = 6;
}
message ContainerActionsProto {
@@ -243,6 +244,8 @@ message SCMCommandProto {
closeContainerCommand = 3;
deleteContainerCommand = 4;
replicateContainerCommand = 5;
+ createPipelineCommand = 6;
+ closePipelineCommand = 7;
}
// TODO: once we start using protoc 3.x, refactor this message using "oneof"
required Type commandType = 1;
@@ -251,6 +254,8 @@ message SCMCommandProto {
optional CloseContainerCommandProto closeContainerCommandProto = 4;
optional DeleteContainerCommandProto deleteContainerCommandProto = 5;
optional ReplicateContainerCommandProto replicateContainerCommandProto = 6;
+ optional CreatePipelineCommandProto createPipelineCommandProto = 7;
+ optional ClosePipelineCommandProto closePipelineCommandProto = 8;
}
/**
@@ -320,6 +325,32 @@ message ReplicateContainerCommandProto {
}
/**
+This command asks the datanode to create a pipeline.
+*/
+message CreatePipelineCommandProto {
+ // ID of the pipeline to create.
+ required PipelineID pipelineID = 1;
+ // Replication type and factor of the pipeline.
+ required ReplicationType type = 2;
+ required ReplicationFactor factor = 3;
+ // Datanodes listed for the pipeline (CreatePipelineCommand's node list).
+ repeated DatanodeDetailsProto datanode = 4;
+ // ID of the SCM command carrying this request.
+ required int64 cmdId = 5;
+}
+
+// ACK message a datanode sends to SCM to report the result of a
+// pipeline creation.
+message CreatePipelineACKProto {
+ // ID of the pipeline the ack refers to.
+ required PipelineID pipelineID = 1;
+ // UUID of a datanode (presumably the one sending the ack - confirm).
+ required string datanodeUUID = 2;
+}
+
+/**
+This command asks the datanode to close a pipeline.
+*/
+message ClosePipelineCommandProto {
+ // ID of the pipeline to close.
+ required PipelineID pipelineID = 1;
+ // ID of the SCM command carrying this request.
+ required int64 cmdId = 2;
+}
+
+/**
* Protocol used from a datanode to StorageContainerManager.
*
* Please see the request and response messages for details of the RPC calls.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index a0a7222..1390c52 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -24,8 +24,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
+
+import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -79,6 +82,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
private ObjectName mxBean;
private SafeModePrecheck safeModePrecheck;
+ private final long pipelineCreateWaitTimeout;
/**
* Constructor.
@@ -117,6 +121,22 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
scm.getScmNodeManager(), scm.getEventQueue(), svcInterval,
serviceTimeout, conf);
safeModePrecheck = new SafeModePrecheck(conf);
+
+ long heartbeatInterval = conf.getTimeDuration(
+     HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL,
+     HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL_DEFAULT,
+     TimeUnit.MILLISECONDS);
+
+ long commandStatusReportInterval = conf.getTimeDuration(
+     HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL,
+     HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT,
+     TimeUnit.MILLISECONDS);
+
+ // The timeout below divides by the heartbeat interval, so reject a
+ // non-positive value with a clear message instead of an
+ // ArithmeticException.
+ Preconditions.checkState(heartbeatInterval > 0,
+     "Heartbeat interval must be positive but is %s ms",
+     heartbeatInterval);
+ // The check fails when the heartbeat interval is LARGER than the
+ // command status report interval; the message states exactly that.
+ Preconditions.checkState(heartbeatInterval <= commandStatusReportInterval,
+     "Heartbeat interval (%s ms) is larger than command status report " +
+     "interval (%s ms)", heartbeatInterval, commandStatusReportInterval);
+ // Round the command status report interval up to a whole number of
+ // heartbeat intervals, then add a fixed 5000 ms (presumably slack for
+ // processing and network delay - confirm).
+ pipelineCreateWaitTimeout =
+     ((commandStatusReportInterval + heartbeatInterval - 1)
+         / heartbeatInterval) * heartbeatInterval + 5000L;
}
/**
@@ -188,6 +208,15 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// TODO: #CLUTIL Remove creation logic when all replication types and
// factors are handled by pipeline creator
pipeline = pipelineManager.createPipeline(type, factor);
+ // Wait until the pipeline is ready, polling once a second for at most
+ // pipelineCreateWaitTimeout milliseconds.
+ // NOTE(review): this polls the Pipeline instance returned by
+ // createPipeline(); confirm that instance is updated in place when the
+ // pipeline opens, otherwise the loop always runs to the timeout.
+ final long deadline =
+     System.currentTimeMillis() + pipelineCreateWaitTimeout;
+ while (!pipeline.isOpen() && System.currentTimeMillis() < deadline) {
+   try {
+     Thread.sleep(1000);
+   } catch (InterruptedException e) {
+     // Do not swallow the interrupt: restore the flag for callers and
+     // stop waiting. Without the break, every later sleep() would throw
+     // immediately and the loop would spin until the deadline.
+     Thread.currentThread().interrupt();
+     break;
+   }
+ }
} catch (IOException e) {
LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
"get pipelines call once.", type, factor, e);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
index d1479f7..aad5573 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hdds.scm.command;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -55,13 +53,21 @@ public class CommandStatusReportHandler implements
cmdStatusList.forEach(cmdStatus -> {
LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus
.getCmdId(), cmdStatus.getType());
- if (cmdStatus.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
+ switch (cmdStatus.getType()) {
+ case deleteBlocksCommand:
if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
new DeleteBlockStatus(cmdStatus));
}
- } else {
- LOGGER.debug("CommandStatus of type:{} not handled in " +
+ break;
+ case createPipelineCommand:
+ if (cmdStatus.getStatus() != CommandStatus.Status.PENDING) {
+ publisher.fireEvent(SCMEvents.CREATE_PIPELINE_STATUS,
+ new CreatePipelineStatus(cmdStatus));
+ }
+ break;
+ default:
+ LOGGER.warn("CommandStatus of type:{} not handled in " +
"CommandStatusReportHandler.", cmdStatus.getType());
}
});
@@ -101,4 +107,12 @@ public class CommandStatusReportHandler implements
}
}
+ /**
+ * Wrapper event for CreatePipeline Command Status.
+ * <p>
+ * Wraps the CommandStatus reported for a createPipelineCommand; this is the
+ * payload type of the SCMEvents.CREATE_PIPELINE_STATUS event.
+ */
+ public static class CreatePipelineStatus extends CommandStatusEvent {
+ public CreatePipelineStatus(CommandStatus cmdStatus) {
+ super(cmdStatus);
+ }
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 43d396e..dd476a7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.events;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -40,6 +41,8 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.NodeReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
.NodeRegistrationContainerReport;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
+ .CreatePipelineStatus;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -97,15 +100,14 @@ public final class SCMEvents {
new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report");
/**
- * PipelineReport processed by pipeline report handler. This event is
+ * Open pipeline event sent by ScmPipelineManager. This event is
* received by HealthyPipelineSafeModeRule.
*/
- public static final TypedEvent<PipelineReportFromDatanode>
- PROCESSED_PIPELINE_REPORT = new TypedEvent<>(
- PipelineReportFromDatanode.class, "Processed_Pipeline_Report");
+ public static final TypedEvent<Pipeline>
+ OPEN_PIPELINE = new TypedEvent<>(Pipeline.class, "Open_Pipeline");
/**
- * PipelineActions are sent by Datanode. This event is received by
+ * PipelineActions are sent by Datanode to close a pipeline. It's received by
* SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated.
*/
public static final TypedEvent<PipelineActionsFromDatanode>
@@ -113,7 +115,7 @@ public final class SCMEvents {
"Pipeline_Actions");
/**
- * A Command status report will be sent by datanodes. This repoort is received
+ * A Command status report will be sent by datanodes. This report is received
* by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated.
*/
public static final TypedEvent<CommandStatusReportFromDatanode>
@@ -197,6 +199,14 @@ public final class SCMEvents {
new TypedEvent<>(SafeModeStatus.class);
/**
+ * This event is triggered by CommandStatusReportHandler whenever a
+ * status for CreatePipeline SCMCommand is received.
+ */
+ public static final TypedEvent<CreatePipelineStatus>
+ CREATE_PIPELINE_STATUS =
+ new TypedEvent<>(CreatePipelineStatus.class, "Create_Pipeline_Status");
+
+ /**
* Private Ctor. Never Constructed.
*/
private SCMEvents() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 77e037a..86ad5ee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
import java.io.IOException;
import java.util.HashMap;
@@ -39,12 +40,13 @@ public final class PipelineFactory {
private Map<ReplicationType, PipelineProvider> providers;
PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
- Configuration conf, GrpcTlsConfig tlsConfig) {
+ Configuration conf, EventPublisher eventPublisher) {
providers = new HashMap<>();
providers.put(ReplicationType.STAND_ALONE,
new SimplePipelineProvider(nodeManager));
providers.put(ReplicationType.RATIS,
- new RatisPipelineProvider(nodeManager, stateManager, conf, tlsConfig));
+ new RatisPipelineProvider(nodeManager, stateManager, conf,
+ eventPublisher));
}
@VisibleForTesting
@@ -63,6 +65,11 @@ public final class PipelineFactory {
return providers.get(type).create(factor, nodes);
}
+ /**
+ * Closes the given pipeline through the provider registered for the
+ * given replication type.
+ * NOTE(review): there is no null check on the lookup; a type without a
+ * registered provider would fail with a NullPointerException.
+ *
+ * @param type replication type used to select the provider
+ * @param pipeline pipeline to close
+ * @throws IOException if the provider's close fails
+ */
+ public void close(ReplicationType type, Pipeline pipeline)
+ throws IOException {
+ providers.get(type).close(pipeline);
+ }
+
public void shutdown() {
providers.values().forEach(provider -> provider.shutdown());
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 9ba5f31..e477860 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -21,8 +21,9 @@ package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.hadoop.hdds.server.events.EventHandler;
import java.io.Closeable;
import java.io.IOException;
@@ -33,7 +34,8 @@ import java.util.NavigableSet;
/**
* Interface which exposes the api for pipeline management.
*/
-public interface PipelineManager extends Closeable, PipelineManagerMXBean {
+public interface PipelineManager extends Closeable, PipelineManagerMXBean,
+ EventHandler<CommandStatusReportHandler.CreatePipelineStatus> {
Pipeline createPipeline(ReplicationType type, ReplicationFactor factor)
throws IOException;
@@ -51,6 +53,9 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
ReplicationFactor factor);
List<Pipeline> getPipelines(ReplicationType type,
+ Pipeline.PipelineState state);
+
+ List<Pipeline> getPipelines(ReplicationType type,
ReplicationFactor factor, Pipeline.PipelineState state);
List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
@@ -94,6 +99,4 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
* @throws IOException in case of any Exception
*/
void deactivatePipeline(PipelineID pipelineID) throws IOException;
-
- GrpcTlsConfig getGrpcTlsConfig();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index a0ce216..c00ff78 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -33,5 +33,7 @@ public interface PipelineProvider {
Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
+ void close(Pipeline pipeline) throws IOException;
+
void shutdown();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index 2b11da9..6b9a839 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -26,17 +26,18 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Objects;
/**
* Handles Pipeline Reports from datanode.
@@ -52,17 +53,14 @@ public class PipelineReportHandler implements
private final boolean pipelineAvailabilityCheck;
public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager,
- PipelineManager pipelineManager,
- Configuration conf) {
+ PipelineManager pipelineManager, Configuration conf) {
Preconditions.checkNotNull(pipelineManager);
- Objects.requireNonNull(scmSafeModeManager);
this.scmSafeModeManager = scmSafeModeManager;
this.pipelineManager = pipelineManager;
this.conf = conf;
this.pipelineAvailabilityCheck = conf.getBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK,
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT);
-
}
@Override
@@ -77,28 +75,26 @@ public class PipelineReportHandler implements
LOGGER.trace("Processing pipeline report for dn: {}", dn);
for (PipelineReport report : pipelineReport.getPipelineReportList()) {
try {
- processPipelineReport(report, dn);
+ processPipelineReport(report, dn, publisher);
} catch (IOException e) {
LOGGER.error("Could not process pipeline report={} from dn={} {}",
report, dn, e);
}
}
- if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
- publisher.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- pipelineReportFromDatanode);
- }
-
}
- private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
- throws IOException {
+ private void processPipelineReport(PipelineReport report, DatanodeDetails dn,
+ EventPublisher publisher) throws IOException {
PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID());
Pipeline pipeline;
try {
pipeline = pipelineManager.getPipeline(pipelineID);
} catch (PipelineNotFoundException e) {
- RatisPipelineUtils.destroyPipeline(dn, pipelineID, conf,
- pipelineManager.getGrpcTlsConfig());
+ final ClosePipelineCommand closeCommand =
+ new ClosePipelineCommand(pipelineID);
+ final CommandForDatanode datanodeCommand =
+ new CommandForDatanode<>(dn.getUuid(), closeCommand);
+ publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
return;
}
@@ -108,6 +104,10 @@ public class PipelineReportHandler implements
if (pipeline.isHealthy()) {
// if all the dns have reported, pipeline can be moved to OPEN state
pipelineManager.openPipeline(pipelineID);
+ } else {
+ if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
+ publisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
+ }
}
} else {
// In OPEN state case just report the datanode
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index 7615057..93fbbd1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -131,9 +131,9 @@ class PipelineStateManager {
throw new IOException("Closed pipeline can not be opened");
}
if (pipeline.getPipelineState() == PipelineState.ALLOCATED) {
+ LOG.info("Pipeline {} moved to OPEN state", pipeline.toString());
pipeline = pipelineStateMap
.updatePipelineState(pipelineId, PipelineState.OPEN);
- LOG.info("Pipeline {} moved to OPEN state", pipeline.toString());
}
return pipeline;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 9409728..b9aff86 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -24,37 +24,29 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+ .SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
-import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.function.CheckedBiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -69,6 +61,7 @@ public class RatisPipelineProvider implements PipelineProvider {
private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
private final Configuration conf;
+ private final EventPublisher eventPublisher;
// Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
private final int parallelismForPool = 3;
@@ -83,15 +76,14 @@ public class RatisPipelineProvider implements PipelineProvider {
private final ForkJoinPool forkJoinPool = new ForkJoinPool(
parallelismForPool, factory, null, false);
- private final GrpcTlsConfig tlsConfig;
RatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager, Configuration conf,
- GrpcTlsConfig tlsConfig) {
+ EventPublisher eventPublisher) {
this.nodeManager = nodeManager;
this.stateManager = stateManager;
this.conf = conf;
- this.tlsConfig = tlsConfig;
+ this.eventPublisher = eventPublisher;
}
@@ -155,12 +147,25 @@ public class RatisPipelineProvider implements PipelineProvider {
Pipeline pipeline = Pipeline.newBuilder()
.setId(PipelineID.randomId())
- .setState(PipelineState.OPEN)
+ .setState(PipelineState.ALLOCATED)
.setType(ReplicationType.RATIS)
.setFactor(factor)
.setNodes(dns)
.build();
- initializePipeline(pipeline);
+
+ // Send command to datanode to create pipeline
+ final CreatePipelineCommand createCommand =
+ new CreatePipelineCommand(pipeline.getId(), pipeline.getType(),
+ factor, dns);
+
+ dns.stream().forEach(node -> {
+ final CommandForDatanode datanodeCommand =
+ new CommandForDatanode<>(node.getUuid(), createCommand);
+ LOG.info("Send pipeline:{} create command to datanode {}",
+ pipeline.getId(), datanodeCommand.getDatanodeId());
+ eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+ });
+
return pipeline;
}
@@ -188,67 +193,24 @@ public class RatisPipelineProvider implements PipelineProvider {
}
}
- protected void initializePipeline(Pipeline pipeline) throws IOException {
+ /**
+ * Sends a close-pipeline command to every datanode of the pipeline.
+ * Removing the pipeline from SCM is left to the caller.
+ *
+ * @param pipeline - Pipeline to be destroyed
+ * @throws IOException
+ */
+ public void close(Pipeline pipeline) {
final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
- LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
- callRatisRpc(pipeline.getNodes(),
- (raftClient, peer) -> {
- RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
- if (reply == null || !reply.isSuccess()) {
- String msg = "Pipeline initialization failed for pipeline:"
- + pipeline.getId() + " node:" + peer.getId();
- LOG.error(msg);
- throw new IOException(msg);
- }
- });
- }
-
- private void callRatisRpc(List<DatanodeDetails> datanodes,
- CheckedBiConsumer< RaftClient, RaftPeer, IOException> rpc)
- throws IOException {
- if (datanodes.isEmpty()) {
- return;
- }
-
- final String rpcType = conf
- .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
- ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
- final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
- final List< IOException > exceptions =
- Collections.synchronizedList(new ArrayList<>());
- final int maxOutstandingRequests =
- HddsClientUtils.getMaxOutstandingRequests(conf);
- final TimeDuration requestTimeout =
- RatisHelper.getClientRequestTimeout(conf);
- try {
- forkJoinPool.submit(() -> {
- datanodes.parallelStream().forEach(d -> {
- final RaftPeer p = RatisHelper.toRaftPeer(d);
- try (RaftClient client = RatisHelper
- .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
- retryPolicy, maxOutstandingRequests, tlsConfig,
- requestTimeout)) {
- rpc.accept(client, p);
- } catch (IOException ioe) {
- String errMsg =
- "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid();
- LOG.error(errMsg, ioe);
- exceptions.add(new IOException(errMsg, ioe));
- }
- });
- }).get();
- } catch (ExecutionException | RejectedExecutionException ex) {
- LOG.error(ex.getClass().getName() + " exception occurred during " +
- "createPipeline", ex);
- throw new IOException(ex.getClass().getName() + " exception occurred " +
- "during createPipeline", ex);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupt exception occurred during " +
- "createPipeline", ex);
- }
- if (!exceptions.isEmpty()) {
- throw MultipleIOException.createIOException(exceptions);
- }
+ LOG.debug("Destroy pipeline:{} with {} ", pipeline.getId(), group);
+ final ClosePipelineCommand closeCommand =
+ new ClosePipelineCommand(pipeline.getId());
+ pipeline.getNodes().stream().forEach(node -> {
+ final CommandForDatanode datanodeCommand =
+ new CommandForDatanode<>(node.getUuid(), closeCommand);
+ LOG.info("Send pipeline:{} close command to datanode {}",
+ pipeline.getId(), datanodeCommand.getDatanodeId());
+ eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+ });
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
deleted file mode 100644
index 777a0b0..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utility class for Ratis pipelines. Contains methods to create and destroy
- * ratis pipelines.
- */
-public final class RatisPipelineUtils {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(RatisPipelineUtils.class);
-
- private RatisPipelineUtils() {
- }
- /**
- * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
- * the datanodes.
- *
- * @param pipeline - Pipeline to be destroyed
- * @param ozoneConf - Ozone configuration
- * @param grpcTlsConfig
- * @throws IOException
- */
- static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
- GrpcTlsConfig grpcTlsConfig) {
- final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
- LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
- for (DatanodeDetails dn : pipeline.getNodes()) {
- try {
- destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
- } catch (IOException e) {
- LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
- pipeline.getId(), dn);
- }
- }
- }
-
- /**
- * Sends ratis command to destroy pipeline on the given datanode.
- *
- * @param dn - Datanode on which pipeline needs to be destroyed
- * @param pipelineID - ID of pipeline to be destroyed
- * @param ozoneConf - Ozone configuration
- * @param grpcTlsConfig - grpc tls configuration
- * @throws IOException
- */
- static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
- Configuration ozoneConf, GrpcTlsConfig grpcTlsConfig) throws IOException {
- final String rpcType = ozoneConf
- .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
- ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
- final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
- final RaftPeer p = RatisHelper.toRaftPeer(dn);
- final int maxOutstandingRequests =
- HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
- final TimeDuration requestTimeout =
- RatisHelper.getClientRequestTimeout(ozoneConf);
- try(RaftClient client = RatisHelper
- .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
- retryPolicy, maxOutstandingRequests, grpcTlsConfig,
- requestTimeout)) {
- client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
- true, p.getId());
- }
- }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 0964f6d..405baa8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -21,23 +21,30 @@ package org.apache.hadoop.hdds.scm.pipeline;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
+ .CreatePipelineStatus;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.MetadataStore;
import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
import org.apache.hadoop.hdds.utils.Scheduler;
-import org.apache.ratis.grpc.GrpcTlsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,18 +88,20 @@ public class SCMPipelineManager implements PipelineManager {
private final NodeManager nodeManager;
private final SCMPipelineMetrics metrics;
private final Configuration conf;
+ private final StorageContainerManager scm;
+ private boolean pipelineAvailabilityCheck;
+ private boolean createPipelineInSafemode;
// Pipeline Manager MXBean
private ObjectName pmInfoBean;
- private GrpcTlsConfig grpcTlsConfig;
public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
- EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig)
+ EventPublisher eventPublisher, StorageContainerManager scm)
throws IOException {
this.lock = new ReentrantReadWriteLock();
this.conf = conf;
this.stateManager = new PipelineStateManager(conf);
this.pipelineFactory = new PipelineFactory(nodeManager, stateManager,
- conf, grpcTlsConfig);
+ conf, eventPublisher);
// TODO: See if thread priority needs to be set for these threads
scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
this.backgroundPipelineCreator =
@@ -113,8 +122,14 @@ public class SCMPipelineManager implements PipelineManager {
this.metrics = SCMPipelineMetrics.create();
this.pmInfoBean = MBeans.register("SCMPipelineManager",
"SCMPipelineManagerInfo", this);
+ this.scm = scm;
+ this.pipelineAvailabilityCheck = conf.getBoolean(
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK,
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT);
+ this.createPipelineInSafemode = conf.getBoolean(
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
initializePipelineState();
- this.grpcTlsConfig = grpcTlsConfig;
}
public PipelineStateManager getStateManager() {
@@ -130,6 +145,9 @@ public class SCMPipelineManager implements PipelineManager {
private void initializePipelineState() throws IOException {
if (pipelineStore.isEmpty()) {
LOG.info("No pipeline exists in current db");
+ if (pipelineAvailabilityCheck && createPipelineInSafemode) {
+ startPipelineCreator();
+ }
return;
}
List<Map.Entry<byte[], byte[]>> pipelines =
@@ -148,8 +166,8 @@ public class SCMPipelineManager implements PipelineManager {
}
@Override
- public synchronized Pipeline createPipeline(
- ReplicationType type, ReplicationFactor factor) throws IOException {
+ public synchronized Pipeline createPipeline(ReplicationType type,
+ ReplicationFactor factor) throws IOException {
lock.writeLock().lock();
try {
Pipeline pipeline = pipelineFactory.create(type, factor);
@@ -157,8 +175,11 @@ public class SCMPipelineManager implements PipelineManager {
pipeline.getProtobufMessage().toByteArray());
stateManager.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
- metrics.incNumPipelineCreated();
- metrics.createPerPipelineMetrics(pipeline);
+ metrics.incNumPipelineAllocated();
+ if (pipeline.isOpen()) {
+ metrics.incNumPipelineCreated();
+ metrics.createPerPipelineMetrics(pipeline);
+ }
return pipeline;
} catch (InsufficientDatanodesException idEx) {
throw idEx;
@@ -225,6 +246,16 @@ public class SCMPipelineManager implements PipelineManager {
}
}
+ public List<Pipeline> getPipelines(ReplicationType type,
+ Pipeline.PipelineState state) {
+ lock.readLock().lock();
+ try {
+ return stateManager.getPipelines(type, state);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
@Override
public List<Pipeline> getPipelines(ReplicationType type,
ReplicationFactor factor, Pipeline.PipelineState state) {
@@ -293,7 +324,9 @@ public class SCMPipelineManager implements PipelineManager {
lock.writeLock().lock();
try {
Pipeline pipeline = stateManager.openPipeline(pipelineId);
- metrics.createPerPipelineMetrics(pipeline);
+ if (pipelineAvailabilityCheck && scm != null && scm.isInSafeMode()) {
+ eventPublisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
+ }
} finally {
lock.writeLock().unlock();
}
@@ -408,7 +441,7 @@ public class SCMPipelineManager implements PipelineManager {
* @throws IOException
*/
private void destroyPipeline(Pipeline pipeline) throws IOException {
- RatisPipelineUtils.destroyPipeline(pipeline, conf, grpcTlsConfig);
+ pipelineFactory.close(pipeline.getType(), pipeline);
// remove the pipeline from the pipeline manager
removePipeline(pipeline.getId());
triggerPipelineCreation();
@@ -441,11 +474,6 @@ public class SCMPipelineManager implements PipelineManager {
}
@Override
- public GrpcTlsConfig getGrpcTlsConfig() {
- return grpcTlsConfig;
- }
-
- @Override
public void close() throws IOException {
if (scheduler != null) {
scheduler.close();
@@ -466,4 +494,63 @@ public class SCMPipelineManager implements PipelineManager {
// shutdown pipeline provider.
pipelineFactory.shutdown();
}
+
+ @Override
+ public void onMessage(CreatePipelineStatus response,
+ EventPublisher publisher) {
+ CommandStatus result = response.getCmdStatus();
+ CreatePipelineACKProto ack = result.getCreatePipelineAck();
+ PipelineID pipelineID = PipelineID.getFromProtobuf(ack.getPipelineID());
+ CommandStatus.Status status = result.getStatus();
+ LOG.info("receive pipeline {} create status {}", pipelineID, status);
+ Pipeline pipeline;
+ try {
+ pipeline = stateManager.getPipeline(pipelineID);
+ } catch (PipelineNotFoundException e) {
+ LOG.warn("Pipeline {} cannot be found", pipelineID);
+ return;
+ }
+
+ switch (status) {
+ case EXECUTED:
+ DatanodeDetails dn = nodeManager.getNodeByUuid(ack.getDatanodeUUID());
+ if (dn == null) {
+ LOG.warn("Datanode {} for Pipeline {} cannot be found",
+ ack.getDatanodeUUID(), pipelineID);
+ return;
+ }
+ try {
+ pipeline.reportDatanode(dn);
+ } catch (IOException e) {
+ LOG.warn("Update {} for Pipeline {} failed for {}",
+ dn.getUuidString(), pipelineID, e.getMessage());
+ return;
+ }
+ // Open when all datanodes have reported. NOTE(review): confirm || vs &&
+ if (pipeline.isHealthy() ||
+ pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
+ try {
+ openPipeline(pipelineID);
+ } catch (IOException e) {
+ LOG.warn("Fail to open Pipeline {} for {}", pipelineID,
+ e.getMessage());
+ return;
+ }
+ metrics.incNumPipelineCreated();
+ metrics.createPerPipelineMetrics(pipeline);
+ }
+ return;
+ case FAILED:
+ metrics.incNumPipelineCreationFailed();
+ try {
+ finalizeAndDestroyPipeline(pipeline, false);
+ } catch (IOException e) {
+ LOG.warn("Fail to close Pipeline {} for {}", pipelineID,
+ e.getMessage());
+ }
+ return;
+ default:
+ LOG.error("Unknown or unexpected status {}", status);
+ }
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index d0f7f6e..fa91572 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -46,6 +46,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
private MetricsRegistry registry;
+ private @Metric MutableCounterLong numPipelineAllocated;
private @Metric MutableCounterLong numPipelineCreated;
private @Metric MutableCounterLong numPipelineCreationFailed;
private @Metric MutableCounterLong numPipelineDestroyed;
@@ -83,6 +84,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
@SuppressWarnings("SuspiciousMethodCalls")
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder recordBuilder = collector.addRecord(SOURCE_NAME);
+ numPipelineAllocated.snapshot(recordBuilder, true);
numPipelineCreated.snapshot(recordBuilder, true);
numPipelineCreationFailed.snapshot(recordBuilder, true);
numPipelineDestroyed.snapshot(recordBuilder, true);
@@ -94,6 +96,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
}
void createPerPipelineMetrics(Pipeline pipeline) {
+ System.out.println("add pipeline " + pipeline.getId() + " to metrics map");
numBlocksAllocated.put(pipeline.getId(), new MutableCounterLong(Interns
.info(getBlockAllocationMetricName(pipeline),
"Number of blocks allocated in pipeline " + pipeline.getId()), 0L));
@@ -117,6 +120,14 @@ public final class SCMPipelineMetrics implements MetricsSource {
}
/**
+ * Increments the count of allocated pipelines, whether or not their
+ * creation later succeeds.
+ */
+ void incNumPipelineAllocated() {
+ numPipelineAllocated.incr();
+ }
+
+ /**
* Increments number of successful pipeline creation count.
*/
void incNumPipelineCreated() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index 54e2141..a772a97 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -74,6 +74,11 @@ public class SimplePipelineProvider implements PipelineProvider {
}
@Override
+ public void close(Pipeline pipeline) throws IOException {
+
+ }
+
+ @Override
public void shutdown() {
// Do nothing.
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 7a00d76..5304270 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -20,17 +20,10 @@ package org.apache.hadoop.hdds.scm.safemode;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
-
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -38,9 +31,6 @@ import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
-import java.util.Set;
-
/**
* Class defining Safe mode exit criteria for Pipelines.
*
@@ -49,43 +39,52 @@ import java.util.Set;
* through in a cluster.
*/
public class HealthyPipelineSafeModeRule
- extends SafeModeExitRule<PipelineReportFromDatanode>{
+ extends SafeModeExitRule<Pipeline>{
public static final Logger LOG =
LoggerFactory.getLogger(HealthyPipelineSafeModeRule.class);
- private final PipelineManager pipelineManager;
private final int healthyPipelineThresholdCount;
private int currentHealthyPipelineCount = 0;
- private final Set<DatanodeDetails> processedDatanodeDetails =
- new HashSet<>();
HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
PipelineManager pipelineManager,
SCMSafeModeManager manager, Configuration configuration) {
super(manager, ruleName, eventQueue);
- this.pipelineManager = pipelineManager;
double healthyPipelinesPercent =
configuration.getDouble(HddsConfigKeys.
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
HddsConfigKeys.
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT);
+ int minHealthyPipelines = 0;
+
+ boolean createPipelineInSafemode = configuration.getBoolean(
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
+
+ if (createPipelineInSafemode) {
+ minHealthyPipelines =
+ configuration.getInt(HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE,
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT);
+ }
+
Preconditions.checkArgument(
(healthyPipelinesPercent >= 0.0 && healthyPipelinesPercent <= 1.0),
HddsConfigKeys.
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT
+ " value should be >= 0.0 and <= 1.0");
- // As we want to wait for 3 node pipelines
- int pipelineCount =
+ // Wait for RATIS write pipelines of any factor (ONE or THREE)
+ int pipelineCount = pipelineManager.getPipelines(
+ HddsProtos.ReplicationType.RATIS, Pipeline.PipelineState.OPEN).size() +
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
- HddsProtos.ReplicationFactor.THREE).size();
+ Pipeline.PipelineState.ALLOCATED).size();
// This value will be zero when pipeline count is 0.
// On a fresh installed cluster, there will be zero pipelines in the SCM
// pipeline DB.
- healthyPipelineThresholdCount =
- (int) Math.ceil(healthyPipelinesPercent * pipelineCount);
+ healthyPipelineThresholdCount = Math.max(minHealthyPipelines,
+ (int) Math.ceil(healthyPipelinesPercent * pipelineCount));
LOG.info(" Total pipeline count is {}, healthy pipeline " +
"threshold count is {}", pipelineCount, healthyPipelineThresholdCount);
@@ -95,67 +94,41 @@ public class HealthyPipelineSafeModeRule
}
@Override
- protected TypedEvent<PipelineReportFromDatanode> getEventType() {
- return SCMEvents.PROCESSED_PIPELINE_REPORT;
+ protected TypedEvent<Pipeline> getEventType() {
+ return SCMEvents.OPEN_PIPELINE;
}
@Override
protected boolean validate() {
if (currentHealthyPipelineCount >= healthyPipelineThresholdCount) {
+ LOG.info("{} rule satisfied", this.getClass().getSimpleName());
return true;
}
return false;
}
@Override
- protected void process(PipelineReportFromDatanode
- pipelineReportFromDatanode) {
+ protected void process(Pipeline pipeline) {
// When SCM is in safe mode for long time, already registered
- // datanode can send pipeline report again, then pipeline handler fires
- // processed report event, we should not consider this pipeline report
- // from datanode again during threshold calculation.
- Preconditions.checkNotNull(pipelineReportFromDatanode);
- DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails();
- if (!processedDatanodeDetails.contains(
- pipelineReportFromDatanode.getDatanodeDetails())) {
-
- Pipeline pipeline;
- PipelineReportsProto pipelineReport =
- pipelineReportFromDatanode.getReport();
-
- for (PipelineReport report : pipelineReport.getPipelineReportList()) {
- PipelineID pipelineID = PipelineID
- .getFromProtobuf(report.getPipelineID());
- try {
- pipeline = pipelineManager.getPipeline(pipelineID);
- } catch (PipelineNotFoundException e) {
- continue;
- }
-
- if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
- pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
- // If the pipeline is open state mean, all 3 datanodes are reported
- // for this pipeline.
- currentHealthyPipelineCount++;
- getSafeModeMetrics().incCurrentHealthyPipelinesCount();
- }
- }
- if (scmInSafeMode()) {
- SCMSafeModeManager.getLogger().info(
- "SCM in safe mode. Healthy pipelines reported count is {}, " +
- "required healthy pipeline reported count is {}",
- currentHealthyPipelineCount, healthyPipelineThresholdCount);
- }
-
- processedDatanodeDetails.add(dnDetails);
+ // datanode can send pipeline report again, or SCMPipelineManager will
+ // create new pipelines.
+ Preconditions.checkNotNull(pipeline);
+ if (pipeline.getType() == HddsProtos.ReplicationType.RATIS) {
+ currentHealthyPipelineCount++;
+ getSafeModeMetrics().incCurrentHealthyPipelinesCount();
}
+ if (scmInSafeMode()) {
+ SCMSafeModeManager.getLogger().info(
+ "SCM in safe mode. Healthy pipelines reported count is {}, " +
+ "required healthy pipeline reported count is {}",
+ currentHealthyPipelineCount, healthyPipelineThresholdCount);
+ }
}
@Override
protected void cleanup() {
- processedDatanodeDetails.clear();
}
@VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
index 841d8ff..34ba35c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
@@ -22,17 +22,10 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.
- PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.slf4j.Logger;
@@ -47,14 +40,13 @@ import java.util.Set;
* replica available for read when we exit safe mode.
*/
public class OneReplicaPipelineSafeModeRule extends
- SafeModeExitRule<PipelineReportFromDatanode> {
+ SafeModeExitRule<Pipeline> {
private static final Logger LOG =
LoggerFactory.getLogger(OneReplicaPipelineSafeModeRule.class);
private int thresholdCount;
private Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
- private final PipelineManager pipelineManager;
private int currentReportedPipelineCount = 0;
@@ -62,7 +54,6 @@ public class OneReplicaPipelineSafeModeRule extends
PipelineManager pipelineManager,
SCMSafeModeManager safeModeManager, Configuration configuration) {
super(safeModeManager, ruleName, eventQueue);
- this.pipelineManager = pipelineManager;
double percent =
configuration.getDouble(
@@ -75,69 +66,59 @@ public class OneReplicaPipelineSafeModeRule extends
HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT +
" value should be >= 0.0 and <= 1.0");
+ // Exclude CLOSED pipeline
int totalPipelineCount =
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
- HddsProtos.ReplicationFactor.THREE).size();
+ HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
+ .size() +
+ pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE,
+ Pipeline.PipelineState.ALLOCATED).size();
thresholdCount = (int) Math.ceil(percent * totalPipelineCount);
- LOG.info(" Total pipeline count is {}, pipeline's with atleast one " +
+ LOG.info("Total pipeline count is {}, pipeline's with at least one " +
"datanode reported threshold count is {}", totalPipelineCount,
thresholdCount);
getSafeModeMetrics().setNumPipelinesWithAtleastOneReplicaReportedThreshold(
thresholdCount);
-
}
@Override
- protected TypedEvent<PipelineReportFromDatanode> getEventType() {
- return SCMEvents.PROCESSED_PIPELINE_REPORT;
+ protected TypedEvent<Pipeline> getEventType() {
+ return SCMEvents.OPEN_PIPELINE;
}
@Override
protected boolean validate() {
if (currentReportedPipelineCount >= thresholdCount) {
+ LOG.info("{} rule satisfied", this.getClass().getSimpleName());
return true;
}
return false;
}
@Override
- protected void process(PipelineReportFromDatanode
- pipelineReportFromDatanode) {
- Pipeline pipeline;
- Preconditions.checkNotNull(pipelineReportFromDatanode);
- PipelineReportsProto pipelineReport =
- pipelineReportFromDatanode.getReport();
-
- for (PipelineReport report : pipelineReport.getPipelineReportList()) {
- PipelineID pipelineID = PipelineID
- .getFromProtobuf(report.getPipelineID());
- try {
- pipeline = pipelineManager.getPipeline(pipelineID);
- } catch (PipelineNotFoundException e) {
- continue;
- }
-
- if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
- !reportedPipelineIDSet.contains(pipelineID)) {
- reportedPipelineIDSet.add(pipelineID);
- getSafeModeMetrics()
- .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
- }
+ protected void process(Pipeline pipeline) {
+ Preconditions.checkNotNull(pipeline);
+ if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+ pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+ !reportedPipelineIDSet.contains(pipeline.getId())) {
+ reportedPipelineIDSet.add(pipeline.getId());
+ getSafeModeMetrics()
+ .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
}
currentReportedPipelineCount = reportedPipelineIDSet.size();
if (scmInSafeMode()) {
SCMSafeModeManager.getLogger().info(
- "SCM in safe mode. Pipelines with atleast one datanode reported " +
- "count is {}, required atleast one datanode reported per " +
+ "SCM in safe mode. Pipelines with at least one datanode reported " +
+ "count is {}, required at least one datanode reported per " +
"pipeline count is {}",
currentReportedPipelineCount, thresholdCount);
}
-
}
@Override
@@ -154,5 +135,4 @@ public class OneReplicaPipelineSafeModeRule extends
public int getCurrentReportedPipelineCount() {
return currentReportedPipelineCount;
}
-
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
index a22d162..1bad4cb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
@@ -59,17 +59,17 @@ import org.slf4j.LoggerFactory;
* number of datanode registered is met or not.
*
* 3. HealthyPipelineSafeModeRule:
- * Once the pipelineReportHandler processes the
- * {@link SCMEvents#PIPELINE_REPORT}, it fires
- * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * Once the SCMPipelineManager processes the
+ * {@link SCMEvents#CREATE_PIPELINE_STATUS}, it fires
+ * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this
* event. This rule processes this report, and check if pipeline is healthy
* and increments current healthy pipeline count. Then validate it cutoff
* threshold for healthy pipeline is met or not.
*
* 4. OneReplicaPipelineSafeModeRule:
- * Once the pipelineReportHandler processes the
- * {@link SCMEvents#PIPELINE_REPORT}, it fires
- * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * Once the SCMPipelineManager processes the
+ * {@link SCMEvents#CREATE_PIPELINE_STATUS}, it fires
+ * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this
* event. This rule processes this report, and add the reported pipeline to
* reported pipeline set. Then validate it cutoff threshold for one replica
* per pipeline is met or not.
@@ -166,6 +166,7 @@ public class SCMSafeModeManager {
if (exitRules.get(ruleName) != null) {
validatedRules.add(ruleName);
+ LOG.info("{} rule is successfully validated", ruleName);
} else {
// This should never happen
LOG.error("No Such Exit rule {}", ruleName);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
index b9e5333..6128266 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
@@ -129,7 +129,8 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> {
List<Pipeline> pipelineList = scmPipelineManager.getPipelines();
pipelineList.forEach((pipeline) -> {
try {
- if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
+ if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
+ pipeline.isAllocationTimeout()) {
scmPipelineManager.finalizeAndDestroyPipeline(pipeline, false);
}
} catch (IOException ex) {
@@ -142,6 +143,4 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> {
public boolean getSafeModeStatus() {
return isInSafeMode.get();
}
-
-
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 9f6077b..3dbb4cb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -164,6 +164,7 @@ public final class SCMDatanodeHeartbeatDispatcher {
}
if (heartbeat.getCommandStatusReportsCount() != 0) {
+ LOG.debug("Dispatching Command Status Report.");
for (CommandStatusReportsProto commandStatusReport : heartbeat
.getCommandStatusReportsList()) {
eventPublisher.fireEvent(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 6dd9dab..11c1a44 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -57,6 +57,12 @@ import static org.apache.hadoop.hdds.protocol.proto
.Type.closeContainerCommand;
import static org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto
+ .Type.closePipelineCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto
+ .Type.createPipelineCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto
.Type.deleteBlocksCommand;
import static org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto
@@ -90,6 +96,8 @@ import org.apache.hadoop.ozone.audit.Auditor;
import org.apache.hadoop.ozone.audit.SCMAction;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -350,6 +358,18 @@ public class SCMDatanodeProtocolServer implements
.setReplicateContainerCommandProto(
((ReplicateContainerCommand)cmd).getProto())
.build();
+ case createPipelineCommand:
+ return builder
+ .setCommandType(createPipelineCommand)
+ .setCreatePipelineCommandProto(
+ ((CreatePipelineCommand)cmd).getProto())
+ .build();
+ case closePipelineCommand:
+ return builder
+ .setCommandType(closePipelineCommand)
+ .setClosePipelineCommandProto(
+ ((ClosePipelineCommand)cmd).getProto())
+ .build();
default:
throw new IllegalArgumentException("Scm command " +
cmd.getType().toString() + " is not implemented");
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index c25b3a0..7c2cffa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -356,6 +356,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS, safeModeHandler);
+ eventQueue.addHandler(SCMEvents.CREATE_PIPELINE_STATUS, pipelineManager);
registerMXBean();
registerMetricsSource(this);
}
@@ -402,8 +403,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
pipelineManager = configurator.getPipelineManager();
} else {
pipelineManager =
- new SCMPipelineManager(conf, scmNodeManager, eventQueue,
- grpcTlsConfig);
+ new SCMPipelineManager(conf, scmNodeManager, eventQueue, this);
}
if (configurator.getContainerManager() != null) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 37321d7..46e8a51 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
@@ -41,6 +43,12 @@ import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
+ .CreatePipelineStatus;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
@@ -360,6 +368,32 @@ public final class TestUtils {
return new PipelineReportFromDatanode(dn, reportBuilder.build());
}
+ public static CreatePipelineStatus getPipelineCreateStatusFromDatanode(
+ DatanodeDetails dn, PipelineID pipelineID) {
+ CreatePipelineACKProto ack = CreatePipelineACKProto.newBuilder()
+ .setPipelineID(pipelineID.getProtobuf())
+ .setDatanodeUUID(dn.getUuidString())
+ .build();
+ CommandStatus status = CommandStatus.newBuilder()
+ .setCreatePipelineAck(ack)
+ .setStatus(CommandStatus.Status.EXECUTED)
+ .setCmdId(0L)
+ .setType(Type.createPipelineCommand)
+ .build();
+ return new CreatePipelineStatus(status);
+ }
+
+ public static void openAllRatisPipelines(PipelineManager pipelineManager)
+ throws IOException {
+ // Pipeline is created by background thread
+ List<Pipeline> pipelines =
+ pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS);
+ // Open each pipeline directly through the pipeline manager
+ for (Pipeline pipeline : pipelines) {
+ pipelineManager.openPipeline(pipeline.getId());
+ }
+ }
+
public static PipelineActionsFromDatanode getPipelineActionFromDatanode(
DatanodeDetails dn, PipelineID... pipelineIDs) {
PipelineActionsProto.Builder actionsProtoBuilder =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index a8364a4..ba3d4f4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
@@ -80,6 +81,8 @@ public class TestCloseContainerEventHandler {
eventQueue.addHandler(CLOSE_CONTAINER,
new CloseContainerEventHandler(pipelineManager, containerManager));
eventQueue.addHandler(DATANODE_COMMAND, nodeManager);
+ // Move pipelines created in the background from ALLOCATED to OPEN state
+ TestUtils.openAllRatisPipelines(pipelineManager);
}
@AfterClass
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 7657b54..14c24e0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -73,6 +73,7 @@ public class TestDeadNodeHandler {
private SCMNodeManager nodeManager;
private ContainerManager containerManager;
private NodeReportHandler nodeReportHandler;
+ private SCMPipelineManager pipelineManager;
private DeadNodeHandler deadNodeHandler;
private EventPublisher publisher;
private EventQueue eventQueue;
@@ -87,12 +88,12 @@ public class TestDeadNodeHandler {
eventQueue = new EventQueue();
scm = HddsTestUtils.getScm(conf);
nodeManager = (SCMNodeManager) scm.getScmNodeManager();
- SCMPipelineManager manager =
+ pipelineManager =
(SCMPipelineManager)scm.getPipelineManager();
PipelineProvider mockRatisProvider =
- new MockRatisPipelineProvider(nodeManager, manager.getStateManager(),
- conf);
- manager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+ new MockRatisPipelineProvider(nodeManager,
+ pipelineManager.getStateManager(), conf);
+ pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
containerManager = scm.getContainerManager();
deadNodeHandler = new DeadNodeHandler(nodeManager,
@@ -147,6 +148,8 @@ public class TestDeadNodeHandler {
nodeManager.register(TestUtils.randomDatanodeDetails(),
TestUtils.createNodeReport(storageOne), null);
+ TestUtils.openAllRatisPipelines(pipelineManager);
+
ContainerInfo container1 =
TestUtils.allocateContainer(containerManager);
ContainerInfo container2 =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index d028851..dd61102 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -121,6 +121,7 @@ public class TestSCMNodeManager {
testDir.getAbsolutePath());
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
TimeUnit.MILLISECONDS);
+ conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
return conf;
}
@@ -1035,9 +1036,11 @@ public class TestSCMNodeManager {
eq.processAll(1000L);
List<SCMCommand> command =
nodemanager.processHeartbeat(datanodeDetails);
- Assert.assertEquals(1, command.size());
- Assert
- .assertEquals(command.get(0).getClass(), CloseContainerCommand.class);
+ // With dn registered, SCM will send create pipeline command to dn
+ Assert.assertTrue(command.size() >= 1);
+ Assert.assertTrue(command.get(0).getClass().equals(
+ CloseContainerCommand.class) ||
+ command.get(1).getClass().equals(CloseContainerCommand.class));
} catch (IOException e) {
e.printStackTrace();
throw e;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 01c53ba..28a3484 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.server.events.EventQueue;
import java.io.IOException;
@@ -31,7 +32,7 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
public MockRatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager,
Configuration conf) {
- super(nodeManager, stateManager, conf, null);
+ super(nodeManager, stateManager, conf, new EventQueue());
}
protected void initializePipeline(Pipeline pipeline) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index 94c3039..639fc9a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -22,10 +22,6 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.
- StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.
- StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -34,7 +30,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
@@ -68,7 +63,8 @@ public class TestHealthyPipelineSafeModeRule {
// enable pipeline check
config.setBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+ config.setBoolean(
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
nodeManager, eventQueue, null);
@@ -88,10 +84,8 @@ public class TestHealthyPipelineSafeModeRule {
} finally {
FileUtil.fullyDelete(new File(storageDir));
}
-
}
-
@Test
public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception {
@@ -113,7 +107,8 @@ public class TestHealthyPipelineSafeModeRule {
// enable pipeline check
config.setBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+ config.setBoolean(
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
nodeManager, eventQueue, null);
@@ -162,7 +157,6 @@ public class TestHealthyPipelineSafeModeRule {
} finally {
FileUtil.fullyDelete(new File(storageDir));
}
-
}
@@ -188,7 +182,8 @@ public class TestHealthyPipelineSafeModeRule {
// enable pipeline check
config.setBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+ config.setBoolean(
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
nodeManager, eventQueue, null);
@@ -217,7 +212,7 @@ public class TestHealthyPipelineSafeModeRule {
scmSafeModeManager.getHealthyPipelineSafeModeRule();
- // No datanodes have sent pipelinereport from datanode
+ // No pipeline event has been sent to SCMSafeModeManager
Assert.assertFalse(healthyPipelineSafeModeRule.validate());
@@ -225,14 +220,14 @@ public class TestHealthyPipelineSafeModeRule {
GenericTestUtils.LogCapturer.captureLogs(LoggerFactory.getLogger(
SCMSafeModeManager.class));
- // fire event with pipeline report with ratis type and factor 1
+ // fire event with pipeline create status with ratis type and factor 1
// pipeline, validate() should return false
firePipelineEvent(pipeline1, eventQueue);
GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
- "reported count is 0"),
+ "reported count is 1"),
1000, 5000);
- Assert.assertFalse(healthyPipelineSafeModeRule.validate());
+ Assert.assertTrue(healthyPipelineSafeModeRule.validate());
firePipelineEvent(pipeline2, eventQueue);
firePipelineEvent(pipeline3, eventQueue);
@@ -246,19 +241,7 @@ public class TestHealthyPipelineSafeModeRule {
}
-
private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) {
- PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
- .newBuilder();
-
- reportBuilder.addPipelineReport(PipelineReport.newBuilder()
- .setPipelineID(pipeline.getId().getProtobuf()));
-
- // Here no need to fire event from 3 nodes, as already pipeline is in
- // open state, but doing it.
- eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
- pipeline.getNodes().get(0), reportBuilder.build()));
+ eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
}
-
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index ca54d05..7f8f0db 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -20,10 +20,6 @@ package org.apache.hadoop.hdds.scm.safemode;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -32,7 +28,6 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
@@ -63,6 +58,8 @@ public class TestOneReplicaPipelineSafeModeRule {
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
ozoneConfiguration.set(HddsConfigKeys.OZONE_METADATA_DIRS,
folder.newFolder().toString());
+ ozoneConfiguration.setBoolean(
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
List<ContainerInfo> containers = new ArrayList<>();
containers.addAll(HddsTestUtils.getContainerInfo(1));
@@ -123,7 +120,6 @@ public class TestOneReplicaPipelineSafeModeRule {
firePipelineEvent(pipelines.get(pipelineFactorThreeCount - 1));
GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
-
}
@@ -170,11 +166,8 @@ public class TestOneReplicaPipelineSafeModeRule {
firePipelineEvent(pipelines.get(pipelineCountThree - 1));
GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
-
}
-
-
private void createPipelines(int count,
HddsProtos.ReplicationFactor factor) throws Exception {
for (int i = 0; i < count; i++) {
@@ -184,26 +177,6 @@ public class TestOneReplicaPipelineSafeModeRule {
}
private void firePipelineEvent(Pipeline pipeline) {
- PipelineReportsProto.Builder reportBuilder =
- PipelineReportsProto.newBuilder();
-
- reportBuilder.addPipelineReport(PipelineReport.newBuilder()
- .setPipelineID(pipeline.getId().getProtobuf()));
-
- if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
- eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
- pipeline.getNodes().get(0), reportBuilder.build()));
- eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
- pipeline.getNodes().get(1), reportBuilder.build()));
- eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
- pipeline.getNodes().get(2), reportBuilder.build()));
- } else {
- eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
- pipeline.getNodes().get(0), reportBuilder.build()));
- }
+ eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
}
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index ba92035..0de0a73 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -31,8 +31,6 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -42,7 +40,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
@@ -175,7 +172,7 @@ public class TestSCMSafeModeManager {
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT, healthyPercent);
conf.setDouble(HddsConfigKeys.
HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT, oneReplicaPercent);
-
+ conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
return conf;
}
@@ -300,12 +297,12 @@ public class TestSCMSafeModeManager {
// we shall a get an event when datanode is registered. In that case,
// validate will return true, and add this to validatedRules.
if (Math.max(healthyPipelinePercent, oneReplicaThresholdCount) == 0) {
- firePipelineEvent(pipelines.get(0));
+ firePipelineEvent(pipelineManager, pipelines.get(0));
}
for (int i = 0; i < Math.max(healthyPipelineThresholdCount,
- oneReplicaThresholdCount); i++) {
- firePipelineEvent(pipelines.get(i));
+ Math.min(oneReplicaThresholdCount, pipelines.size())); i++) {
+ firePipelineEvent(pipelineManager, pipelines.get(i));
if (i < healthyPipelineThresholdCount) {
checkHealthy(i + 1);
@@ -350,15 +347,11 @@ public class TestSCMSafeModeManager {
1000, 5000);
}
- private void firePipelineEvent(Pipeline pipeline) throws Exception {
- PipelineReportsProto.Builder reportBuilder =
- PipelineReportsProto.newBuilder();
-
- reportBuilder.addPipelineReport(PipelineReport.newBuilder()
- .setPipelineID(pipeline.getId().getProtobuf()));
- queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- new PipelineReportFromDatanode(pipeline.getNodes().get(0),
- reportBuilder.build()));
+ private void firePipelineEvent(SCMPipelineManager pipelineManager,
+ Pipeline pipeline) throws Exception {
+ pipelineManager.openPipeline(pipeline.getId());
+ queue.fireEvent(SCMEvents.OPEN_PIPELINE,
+ pipelineManager.getPipeline(pipeline.getId()));
}
@@ -488,10 +481,6 @@ public class TestSCMSafeModeManager {
Pipeline pipeline = pipelineManager.createPipeline(
HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
- PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
- .newBuilder();
- reportBuilder.addPipelineReport(PipelineReport.newBuilder()
- .setPipelineID(pipeline.getId().getProtobuf()));
scmSafeModeManager = new SCMSafeModeManager(
config, containers, pipelineManager, queue);
@@ -500,10 +489,9 @@ public class TestSCMSafeModeManager {
HddsTestUtils.createNodeRegistrationContainerReport(containers));
assertTrue(scmSafeModeManager.getInSafeMode());
- // Trigger the processed pipeline report event
- queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
- new PipelineReportFromDatanode(pipeline.getNodes().get(0),
- reportBuilder.build()));
+
+
+ firePipelineEvent(pipelineManager, pipeline);
GenericTestUtils.waitFor(() -> {
return !scmSafeModeManager.getInSafeMode();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
index e4f1a37..378a1a6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
@@ -164,7 +164,9 @@ public class TestContainerStateManagerIntegration {
}
}
- cluster.restartStorageContainerManager(true);
+ // Restarting SCM will not trigger a container report to satisfy the safe
+ // mode exit rule.
+ cluster.restartStorageContainerManager(false);
List<ContainerInfo> result = cluster.getStorageContainerManager()
.getContainerManager().listContainer(null, 100);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index c583559..210dbb2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -81,7 +81,7 @@ public class TestPipelineClose {
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000,
TimeUnit.MILLISECONDS);
- pipelineDestroyTimeoutInMillis = 5000;
+ pipelineDestroyTimeoutInMillis = 10000;
conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS);
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 00144e4..dedc56a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -45,7 +46,9 @@ public class TestRatisPipelineProvider {
@Before
public void init() throws Exception {
nodeManager = new MockNodeManager(true, 10);
- stateManager = new PipelineStateManager(new OzoneConfiguration());
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+ stateManager = new PipelineStateManager(conf);
provider = new MockRatisPipelineProvider(nodeManager,
stateManager, new OzoneConfiguration());
}
@@ -57,7 +60,7 @@ public class TestRatisPipelineProvider {
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getPipelineState(),
- Pipeline.PipelineState.OPEN);
+ Pipeline.PipelineState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
Pipeline pipeline1 = provider.create(factor);
stateManager.addPipeline(pipeline1);
@@ -68,7 +71,7 @@ public class TestRatisPipelineProvider {
Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline1.getFactor(), factor);
Assert.assertEquals(pipeline1.getPipelineState(),
- Pipeline.PipelineState.OPEN);
+ Pipeline.PipelineState.ALLOCATED);
Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
}
@@ -80,7 +83,7 @@ public class TestRatisPipelineProvider {
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getPipelineState(),
- Pipeline.PipelineState.OPEN);
+ Pipeline.PipelineState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
factor = HddsProtos.ReplicationFactor.ONE;
@@ -94,7 +97,7 @@ public class TestRatisPipelineProvider {
Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline1.getFactor(), factor);
Assert.assertEquals(pipeline1.getPipelineState(),
- Pipeline.PipelineState.OPEN);
+ Pipeline.PipelineState.ALLOCATED);
Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
}
@@ -183,7 +186,7 @@ public class TestRatisPipelineProvider {
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getPipelineState(),
- Pipeline.PipelineState.OPEN);
+ Pipeline.PipelineState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 2a486b1..2e1fe9c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -28,10 +28,10 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
+ .CreatePipelineStatus;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
@@ -42,7 +42,6 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -61,6 +60,8 @@ public class TestSCMPipelineManager {
testDir = GenericTestUtils
.getTestDir(TestSCMPipelineManager.class.getSimpleName());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ conf.set(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK,
+ "false");
boolean folderExisted = testDir.exists() || testDir.mkdirs();
if (!folderExisted) {
throw new IOException("Unable to create test directory path");
@@ -157,30 +158,24 @@ public class TestSCMPipelineManager {
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
- SCMSafeModeManager scmSafeModeManager =
- new SCMSafeModeManager(new OzoneConfiguration(),
- new ArrayList<>(), pipelineManager, eventQueue);
-
// create a pipeline in allocated state with no dns yet reported
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
+
Assert
.assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
Assert
- .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
+ .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isOpen());
- // get pipeline report from each dn in the pipeline
- PipelineReportHandler pipelineReportHandler =
- new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+ // get pipeline create status from each dn in the pipeline
for (DatanodeDetails dn: pipeline.getNodes()) {
- PipelineReportFromDatanode pipelineReportFromDatanode =
- TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());
// pipeline is not healthy until all dns report
Assert.assertFalse(
pipelineManager.getPipeline(pipeline.getId()).isHealthy());
- pipelineReportHandler
- .onMessage(pipelineReportFromDatanode, new EventQueue());
+ CreatePipelineStatus response =
+ TestUtils.getPipelineCreateStatusFromDatanode(dn, pipeline.getId());
+ pipelineManager.onMessage(response, eventQueue);
}
// pipeline is healthy when all dns report
@@ -194,11 +189,10 @@ public class TestSCMPipelineManager {
pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
for (DatanodeDetails dn: pipeline.getNodes()) {
- PipelineReportFromDatanode pipelineReportFromDatanode =
- TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());
- // pipeline report for destroyed pipeline should be ignored
- pipelineReportHandler
- .onMessage(pipelineReportFromDatanode, new EventQueue());
+ CreatePipelineStatus response =
+ TestUtils.getPipelineCreateStatusFromDatanode(dn, pipeline.getId());
+ // pipeline create status for destroyed pipeline should be ignored
+ pipelineManager.onMessage(response, new EventQueue());
}
try {
@@ -226,9 +220,9 @@ public class TestSCMPipelineManager {
MetricsRecordBuilder metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
- long numPipelineCreated = getLongCounter("NumPipelineCreated",
+ long numPipelineAllocated = getLongCounter("NumPipelineAllocated",
metrics);
- Assert.assertTrue(numPipelineCreated == 0);
+ Assert.assertTrue(numPipelineAllocated == 0);
// 3 DNs are unhealthy.
// Create 5 pipelines (Use up 15 Datanodes)
@@ -241,8 +235,8 @@ public class TestSCMPipelineManager {
metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
- numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
- Assert.assertTrue(numPipelineCreated == 5);
+ numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+ Assert.assertTrue(numPipelineAllocated == 5);
long numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
@@ -261,8 +255,8 @@ public class TestSCMPipelineManager {
metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
- numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
- Assert.assertTrue(numPipelineCreated == 5);
+ numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+ Assert.assertTrue(numPipelineAllocated == 5);
numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index ac76482..9845abb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -65,6 +65,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
.HEALTHY;
@@ -143,11 +144,16 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
final int healthy = scm.getNodeCount(HEALTHY);
- final boolean isReady = healthy == hddsDatanodes.size();
+ final boolean isNodeReady = healthy == hddsDatanodes.size();
+ final boolean exitSafeMode = !scm.isInSafeMode();
+
LOG.info("{}. Got {} of {} DN Heartbeats.",
- isReady? "Cluster is ready" : "Waiting for cluster to be ready",
+ isNodeReady? "Nodes are ready" : "Waiting for nodes to be ready",
+ healthy, hddsDatanodes.size());
+ LOG.info(exitSafeMode? "Cluster exits safe mode" :
+ "Waiting for cluster to exit safe mode",
healthy, hddsDatanodes.size());
- return isReady;
+ return isNodeReady && exitSafeMode;
}, 1000, waitForClusterToBeReadyTimeout);
}
@@ -615,11 +621,15 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
if (hbInterval.isPresent()) {
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
hbInterval.get(), TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL,
+ hbInterval.get(), TimeUnit.MILLISECONDS);
} else {
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
DEFAULT_HB_INTERVAL_MS,
TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL,
+ DEFAULT_HB_INTERVAL_MS, TimeUnit.MILLISECONDS);
}
if (hbProcessorInterval.isPresent()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: hdfs-commits-help@hadoop.apache.org