Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2018/09/06 21:36:21 UTC
[1/2] hadoop git commit: HDDS-297. Add pipeline actions in Ozone.
Contributed by Mukul Kumar Singh and Shashikant Banerjee
Repository: hadoop
Updated Branches:
refs/heads/trunk fa2945e7a -> b3161c4dd
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
new file mode 100644
index 0000000..7e3969c
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerMapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+ .ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
+ .ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
+ .ReplicationType.RATIS;
+
+/**
+ * Test Node failure detection and handling in Ratis.
+ */
+public class TestNodeFailure {
+
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration conf;
+ private static ContainerWithPipeline ratisContainer1;
+ private static ContainerWithPipeline ratisContainer2;
+ private static ContainerMapping mapping;
+ private static long timeForFailure;
+
+ /**
+ * Create a MiniOzoneCluster for testing.
+ *
+ * @throws Exception
+ */
+ @BeforeClass
+ public static void init() throws Exception {
+ conf = new OzoneConfiguration();
+ conf.setTimeDuration(OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
+ 10, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
+ 10, TimeUnit.SECONDS);
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(6)
+ .setHbInterval(1000)
+ .setHbProcessorInterval(1000)
+ .build();
+ cluster.waitForClusterToBeReady();
+ StorageContainerManager scm = cluster.getStorageContainerManager();
+ mapping = (ContainerMapping)scm.getScmContainerManager();
+ ratisContainer1 = mapping.allocateContainer(RATIS, THREE, "testOwner");
+ ratisContainer2 = mapping.allocateContainer(RATIS, THREE, "testOwner");
+ // At this stage, there should be two pipelines, each with one open
+ // container. Try closing both pipelines, one with a closed container
+ // and the other with an open container.
+ timeForFailure = conf.getTimeDuration(
+ OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
+ OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
+ .getDuration(), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Shutdown the MiniOzoneCluster.
+ */
+ @AfterClass
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testPipelineFail() throws InterruptedException, IOException,
+ TimeoutException {
+ Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(),
+ HddsProtos.LifeCycleState.OPEN);
+ Pipeline pipelineToFail = ratisContainer1.getPipeline();
+ DatanodeDetails dnToFail = pipelineToFail.getMachines().get(0);
+ cluster.shutdownHddsDatanode(dnToFail);
+
+ // wait for sufficient time for the callback to be triggered
+ Thread.sleep(3 * timeForFailure);
+
+ Assert.assertEquals(HddsProtos.LifeCycleState.CLOSED,
+ ratisContainer1.getPipeline().getLifeCycleState());
+ Assert.assertEquals(HddsProtos.LifeCycleState.OPEN,
+ ratisContainer2.getPipeline().getLifeCycleState());
+ Assert.assertNull(
+ mapping.getPipelineSelector().getPipeline(pipelineToFail.getId()));
+ // Now restart the datanode and make sure that a new pipeline is created.
+ cluster.restartHddsDatanode(dnToFail);
+ ContainerWithPipeline ratisContainer3 =
+ mapping.allocateContainer(RATIS, THREE, "testOwner");
+// Assert that the new container is not created on the ratisContainer2 pipeline.
+ Assert.assertNotEquals(ratisContainer3.getPipeline().getId(),
+ ratisContainer2.getPipeline().getId());
+ }
+}
\ No newline at end of file
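A side note on the fixed sleep above: instead of Thread.sleep(3 * timeForFailure), the wait could be made condition-based with Hadoop's org.apache.hadoop.test.GenericTestUtils (a sketch, assuming the same test fields as in TestNodeFailure):

    // Sketch: poll until the pipeline closes, failing with a
    // TimeoutException if it never does within the deadline.
    GenericTestUtils.waitFor(
        () -> ratisContainer1.getPipeline().getLifeCycleState()
            == HddsProtos.LifeCycleState.CLOSED,
        500,                            // re-check every 500 ms
        (int) (3 * timeForFailure));    // overall deadline in ms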
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index eb15396..0f8f925 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -112,8 +112,7 @@ public class TestPipelineClose {
pipelineSelector.finalizePipeline(ratisContainer1.getPipeline());
Pipeline pipeline1 = pipelineSelector
- .getPipeline(ratisContainer1.getPipeline().getId(),
- ratisContainer1.getContainerInfo().getReplicationType());
+ .getPipeline(ratisContainer1.getPipeline().getId());
Assert.assertNull(pipeline1);
Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(),
HddsProtos.LifeCycleState.CLOSED);
@@ -140,8 +139,7 @@ public class TestPipelineClose {
Assert.assertEquals(ratisContainer2.getPipeline().getLifeCycleState(),
HddsProtos.LifeCycleState.CLOSING);
Pipeline pipeline2 = pipelineSelector
- .getPipeline(ratisContainer2.getPipeline().getId(),
- ratisContainer2.getContainerInfo().getReplicationType());
+ .getPipeline(ratisContainer2.getPipeline().getId());
Assert.assertEquals(pipeline2.getLifeCycleState(),
HddsProtos.LifeCycleState.CLOSING);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index ae6a91e..e11cf9b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -156,6 +157,13 @@ public interface MiniOzoneCluster {
TimeoutException;
/**
+ * Restart a particular HddsDatanode.
+ *
+ * @param dn HddsDatanode in the MiniOzoneCluster
+ */
+ void restartHddsDatanode(DatanodeDetails dn) throws InterruptedException,
+ TimeoutException, IOException;
+ /**
* Shutdown a particular HddsDatanode.
*
* @param i index of HddsDatanode in the MiniOzoneCluster
@@ -163,6 +171,13 @@ public interface MiniOzoneCluster {
void shutdownHddsDatanode(int i);
/**
+ * Shutdown a particular HddsDatanode.
+ *
+ * @param dn HddsDatanode in the MiniOzoneCluster
+ */
+ void shutdownHddsDatanode(DatanodeDetails dn) throws IOException;
+
+ /**
* Shutdown the MiniOzoneCluster.
*/
void shutdown();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index e06e2f6..7b9bb0e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -157,6 +157,16 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
return hddsDatanodes;
}
+ private int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException {
+ for (HddsDatanodeService service : hddsDatanodes) {
+ if (service.getDatanodeDetails().equals(dn)) {
+ return hddsDatanodes.indexOf(service);
+ }
+ }
+ throw new IOException(
+ "Not able to find datanode with datanode Id " + dn.getUuid());
+ }
+
@Override
public OzoneClient getClient() throws IOException {
return OzoneClientFactory.getClient(conf);
@@ -243,11 +253,22 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
}
@Override
+ public void restartHddsDatanode(DatanodeDetails dn)
+ throws InterruptedException, TimeoutException, IOException {
+ restartHddsDatanode(getHddsDatanodeIndex(dn));
+ }
+
+ @Override
public void shutdownHddsDatanode(int i) {
hddsDatanodes.get(i).stop();
}
@Override
+ public void shutdownHddsDatanode(DatanodeDetails dn) throws IOException {
+ shutdownHddsDatanode(getHddsDatanodeIndex(dn));
+ }
+
+ @Override
public void shutdown() {
try {
LOG.info("Shutting down the Mini Ozone Cluster");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
index 8b324b5..b53e683 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
@@ -156,7 +156,8 @@ public class TestCSMMetrics {
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
final ContainerDispatcher dispatcher = new TestContainerDispatcher();
- return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher);
+ return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher,
+ null);
}
static void initXceiverServerRatis(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index ebcc930..3abc8f8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -138,7 +138,8 @@ public class TestContainerServer {
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
final ContainerDispatcher dispatcher = new TestContainerDispatcher();
- return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher);
+ return XceiverServerRatis
+ .newXceiverServerRatis(dn, conf, dispatcher, null);
}
static void initXceiverServerRatis(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
index 448742e..8c83fd3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
@@ -69,7 +69,7 @@ public class TestContainerStateMachine {
new ArrayBlockingQueue<>(1024),
new ThreadPoolExecutor.CallerRunsPolicy());
private ContainerStateMachine stateMachine =
- new ContainerStateMachine(new TestContainerDispatcher(), executor);
+ new ContainerStateMachine(new TestContainerDispatcher(), executor, null);
@Test
[2/2] hadoop git commit: HDDS-297. Add pipeline actions in Ozone.
Contributed by Mukul Kumar Singh and Shashikant Banerjee
Posted by sz...@apache.org.
HDDS-297. Add pipeline actions in Ozone. Contributed by Mukul Kumar Singh and Shashikant Banerjee
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b3161c4d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b3161c4d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b3161c4d
Branch: refs/heads/trunk
Commit: b3161c4dd9367c68b30528a63c03756eaa32aaf9
Parents: fa2945e
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Thu Sep 6 14:35:07 2018 -0700
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Thu Sep 6 14:35:07 2018 -0700
----------------------------------------------------------------------
.../apache/hadoop/hdds/scm/XceiverClient.java | 8 +-
.../hadoop/hdds/scm/XceiverClientGrpc.java | 9 +-
.../hadoop/hdds/scm/XceiverClientRatis.java | 36 +++-
.../scm/client/ContainerOperationClient.java | 2 +-
.../org/apache/hadoop/hdds/HddsConfigKeys.java | 5 +
.../apache/hadoop/hdds/scm/ScmConfigKeys.java | 11 +
.../hadoop/hdds/scm/XceiverClientSpi.java | 10 +-
.../apache/hadoop/ozone/OzoneConfigKeys.java | 12 ++
.../main/java/org/apache/ratis/RatisHelper.java | 22 +-
.../common/src/main/resources/ozone-default.xml | 27 +++
.../common/statemachine/StateContext.java | 45 ++++
.../states/endpoint/HeartbeatEndpointTask.java | 28 +++
.../server/ratis/ContainerStateMachine.java | 17 +-
.../server/ratis/XceiverServerRatis.java | 205 ++++++++++++++-----
.../container/ozoneimpl/OzoneContainer.java | 2 +-
.../StorageContainerDatanodeProtocol.proto | 26 +++
.../hdds/scm/container/ContainerMapping.java | 27 ++-
.../scm/container/ContainerStateManager.java | 5 +-
.../hadoop/hdds/scm/container/Mapping.java | 14 ++
.../hadoop/hdds/scm/events/SCMEvents.java | 24 ++-
.../hadoop/hdds/scm/node/StaleNodeHandler.java | 16 +-
.../hdds/scm/pipelines/Node2PipelineMap.java | 34 +--
.../pipelines/PipelineActionEventHandler.java | 60 ++++++
.../scm/pipelines/PipelineCloseHandler.java | 38 ++++
.../hdds/scm/pipelines/PipelineManager.java | 10 +-
.../hdds/scm/pipelines/PipelineSelector.java | 46 ++---
.../scm/pipelines/ratis/RatisManagerImpl.java | 14 +-
.../standalone/StandaloneManagerImpl.java | 7 +-
.../server/SCMDatanodeHeartbeatDispatcher.java | 23 +++
.../scm/server/StorageContainerManager.java | 13 +-
.../hdds/scm/pipeline/TestNode2PipelineMap.java | 6 +-
.../hdds/scm/pipeline/TestNodeFailure.java | 126 ++++++++++++
.../hdds/scm/pipeline/TestPipelineClose.java | 6 +-
.../apache/hadoop/ozone/MiniOzoneCluster.java | 15 ++
.../hadoop/ozone/MiniOzoneClusterImpl.java | 21 ++
.../transport/server/ratis/TestCSMMetrics.java | 3 +-
.../container/server/TestContainerServer.java | 3 +-
.../server/TestContainerStateMachine.java | 2 +-
38 files changed, 815 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
index 5022618..5f2fe26 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
@@ -186,15 +186,17 @@ public class XceiverClient extends XceiverClientSpi {
/**
* Create a pipeline.
- *
- * @param ignored - pipeline to be created.
*/
@Override
- public void createPipeline(Pipeline ignored)
+ public void createPipeline()
throws IOException {
// For stand alone pipeline, there is no notion called setup pipeline.
}
+ public void destroyPipeline() {
+ // For stand alone pipeline, there is no notion called destroy pipeline.
+ }
+
/**
* Returns pipeline Type.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 1622ddb..3cdbc7c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -216,15 +216,16 @@ public class XceiverClientGrpc extends XceiverClientSpi {
/**
* Create a pipeline.
- *
- * @param ignored - pipeline to be created.
*/
@Override
- public void createPipeline(Pipeline ignored)
- throws IOException {
+ public void createPipeline() {
// For stand alone pipeline, there is no notion called setup pipeline.
}
+ public void destroyPipeline() {
+ // For stand alone pipeline, there is no notion called destroy pipeline.
+ }
+
/**
* Returns pipeline Type.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 2cb319f..499f94d 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -88,13 +88,27 @@ public final class XceiverClientRatis extends XceiverClientSpi {
/**
* {@inheritDoc}
*/
- public void createPipeline(Pipeline pipeline)
+ public void createPipeline()
throws IOException {
RaftGroupId groupId = pipeline.getId().getRaftGroupID();
RaftGroup group = RatisHelper.newRaftGroup(groupId, pipeline.getMachines());
LOG.debug("initializing pipeline:{} with nodes:{}",
pipeline.getId(), group.getPeers());
- reinitialize(pipeline.getMachines(), group);
+ reinitialize(pipeline.getMachines(), RatisHelper.emptyRaftGroup(), group);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void destroyPipeline()
+ throws IOException {
+ RaftGroupId groupId = pipeline.getId().getRaftGroupID();
+ RaftGroup currentGroup =
+ RatisHelper.newRaftGroup(groupId, pipeline.getMachines());
+ LOG.debug("destroying pipeline:{} with nodes:{}",
+ pipeline.getId(), currentGroup.getPeers());
+ reinitialize(pipeline.getMachines(), currentGroup,
+ RatisHelper.emptyRaftGroup());
}
/**
@@ -107,8 +121,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
return HddsProtos.ReplicationType.RATIS;
}
- private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup group)
- throws IOException {
+ private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup oldGroup,
+ RaftGroup newGroup) throws IOException {
if (datanodes.isEmpty()) {
return;
}
@@ -116,7 +130,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
IOException exception = null;
for (DatanodeDetails d : datanodes) {
try {
- reinitialize(d, group);
+ reinitialize(d, oldGroup, newGroup);
} catch (IOException ioe) {
if (exception == null) {
exception = new IOException(
@@ -135,14 +149,18 @@ public final class XceiverClientRatis extends XceiverClientSpi {
* Adds a new peer to the Ratis ring.
*
* @param datanode - new datanode
- * @param group - Raft group
+ * @param oldGroup - previous Raft group
+ * @param newGroup - new Raft group
* @throws IOException - on Failure.
*/
- private void reinitialize(DatanodeDetails datanode, RaftGroup group)
+ private void reinitialize(DatanodeDetails datanode, RaftGroup oldGroup,
+ RaftGroup newGroup)
throws IOException {
final RaftPeer p = RatisHelper.toRaftPeer(datanode);
- try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) {
- client.reinitialize(group, p.getId());
+ try (RaftClient client = oldGroup == RatisHelper.emptyRaftGroup() ?
+ RatisHelper.newRaftClient(rpcType, p) :
+ RatisHelper.newRaftClient(rpcType, p, oldGroup)) {
+ client.reinitialize(newGroup, p.getId());
} catch (IOException ioe) {
LOG.error("Failed to reinitialize RaftPeer:{} datanode: {} ",
p, datanode, ioe);
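With the pipeline now held by the client itself, the two public entry points are mirror images over reinitialize; conceptually (a sketch, client acquisition elided):

    // createPipeline: move the members from the empty group to the group.
    client.createPipeline();    // reinitialize(machines, emptyGroup, group)
    // destroyPipeline: move the members from the group back to empty.
    client.destroyPipeline();   // reinitialize(machines, group, emptyGroup)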
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index 8c8cb95..fed589c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -180,7 +180,7 @@ public class ContainerOperationClient implements ScmClient {
// ObjectStageChangeRequestProto.Op.create,
// ObjectStageChangeRequestProto.Stage.begin);
- client.createPipeline(pipeline);
+ client.createPipeline();
//storageContainerLocationClient.notifyObjectStageChange(
// ObjectStageChangeRequestProto.Type.pipeline,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 3bda29f..4dc7e0a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -56,6 +56,11 @@ public final class HddsConfigKeys {
public static final int HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT =
20;
+ public static final String HDDS_PIPELINE_ACTION_MAX_LIMIT =
+ "hdds.pipeline.action.max.limit";
+ public static final int HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT =
+ 20;
+
// Configuration to allow volume choosing policy.
public static final String HDDS_DATANODE_VOLUME_CHOOSING_POLICY =
"hdds.datanode.volume.choosing.policy";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 62d9ef5..22ba714 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.util.TimeDuration;
import java.util.concurrent.TimeUnit;
@@ -57,6 +58,10 @@ public final class ScmConfigKeys {
= "dfs.container.ratis.num.write.chunk.threads";
public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT
= 60;
+ public static final String DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY
+ = "dfs.container.ratis.replication.level";
+ public static final ReplicationLevel
+ DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT = ReplicationLevel.MAJORITY;
public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
"dfs.container.ratis.segment.size";
public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =
@@ -76,6 +81,12 @@ public final class ScmConfigKeys {
DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT =
TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
+ public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
+ "dfs.ratis.server.failure.duration";
+ public static final TimeDuration
+ DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT =
+ TimeDuration.valueOf(120, TimeUnit.SECONDS);
+
// TODO : this is copied from OzoneConsts, may need to move to a better place
public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size";
// 16 MB by default
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index b3b0da2..e8ef5c5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -111,10 +111,14 @@ public abstract class XceiverClientSpi implements Closeable {
/**
* Create a pipeline.
- *
- * @param pipeline - pipeline to be created.
*/
- public abstract void createPipeline(Pipeline pipeline) throws IOException;
+ public abstract void createPipeline() throws IOException;
+
+ /**
+ * Destroy a pipeline.
+ * @throws IOException
+ */
+ public abstract void destroyPipeline() throws IOException;
/**
* Returns pipeline Type.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 0f2b108..f07d599 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.util.TimeDuration;
/**
@@ -214,6 +215,11 @@ public final class OzoneConfigKeys {
= ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY;
public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT
= ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT;
+ public static final String DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY
+ = ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY;
+ public static final ReplicationLevel
+ DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT
+ = ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT;
public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY
= ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY;
public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT
@@ -237,6 +243,12 @@ public final class OzoneConfigKeys {
DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT =
ScmConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT;
+ public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
+ ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY;
+ public static final TimeDuration
+ DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT =
+ ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT;
+
public static final String OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL =
"ozone.web.authentication.kerberos.principal";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
index 9c25e20..48fdd64 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
@@ -30,6 +30,7 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.RaftProtos;
import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import java.util.stream.Collectors;
/**
@@ -48,8 +50,19 @@ public interface RatisHelper {
Logger LOG = LoggerFactory.getLogger(RatisHelper.class);
static String toRaftPeerIdString(DatanodeDetails id) {
- return id.getUuidString() + "_" +
- id.getPort(DatanodeDetails.Port.Name.RATIS).getValue();
+ return id.getUuidString();
+ }
+
+ static UUID toDatanodeId(String peerIdString) {
+ return UUID.fromString(peerIdString);
+ }
+
+ static UUID toDatanodeId(RaftPeerId peerId) {
+ return toDatanodeId(peerId.toString());
+ }
+
+ static UUID toDatanodeId(RaftProtos.RaftPeerProto peerId) {
+ return toDatanodeId(RaftPeerId.valueOf(peerId.getId()));
}
static String toRaftPeerAddressString(DatanodeDetails id) {
@@ -117,6 +130,11 @@ public interface RatisHelper {
newRaftGroup(new ArrayList<>(Arrays.asList(leader))));
}
+ static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
+ RaftGroup group) {
+ return newRaftClient(rpcType, leader.getId(), group);
+ }
+
static RaftClient newRaftClient(
RpcType rpcType, RaftPeerId leader, RaftGroup group) {
LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
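Since the Raft peer id now encodes only the datanode UUID, the mapping is invertible; a small sketch (assuming a DatanodeDetails instance dd):

    // The peer id is just the datanode UUID string, so it round-trips.
    RaftPeerId peerId = RatisHelper.toRaftPeerId(dd);
    UUID datanodeId = RatisHelper.toDatanodeId(peerId);
    assert datanodeId.equals(dd.getUuid());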
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2112ae3..778d641 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -127,6 +127,15 @@
</description>
</property>
<property>
+ <name>dfs.container.ratis.replication.level</name>
+ <value>MAJORITY</value>
+ <tag>OZONE, RATIS</tag>
+ <description>Replication level to be used by datanode for submitting a
+ container command to ratis. Available replication levels are ALL and
+ MAJORITY; MAJORITY is the default replication level.
+ </description>
+ </property>
+ <property>
<name>dfs.container.ratis.segment.size</name>
<value>1073741824</value>
<tag>OZONE, RATIS, PERFORMANCE</tag>
@@ -155,6 +164,15 @@
<description>The timeout duration for ratis server request.</description>
</property>
<property>
+ <name>dfs.ratis.server.failure.duration</name>
+ <value>120s</value>
+ <tag>OZONE, RATIS, MANAGEMENT</tag>
+ <description>The timeout duration for ratis server failure detection.
+ Once the threshold is reached, the ratis state machine will be informed
+ about the failure in the ratis ring.
+ </description>
+ </property>
+ <property>
<name>hdds.node.report.interval</name>
<value>60000ms</value>
<tag>OZONE, CONTAINER, MANAGEMENT</tag>
@@ -1105,6 +1123,15 @@
</property>
<property>
+ <name>hdds.pipeline.action.max.limit</name>
+ <value>20</value>
+ <tag>DATANODE</tag>
+ <description>
+ Maximum number of Pipeline Actions sent by the datanode to SCM in a
+ single heartbeat.
+ </description>
+ </property>
+ <property>
<name>hdds.scm.watcher.timeout</name>
<value>10m</value>
<tag>OZONE, SCM, MANAGEMENT</tag>
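To illustrate, an ozone-site.xml override shortening the failure window (as TestNodeFailure does programmatically above) could look like the following; the 10s value is only an example:

    <property>
      <name>dfs.ratis.server.failure.duration</name>
      <value>10s</value>
    </property>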
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index a342294..c2d5421 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -21,6 +21,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
@@ -66,6 +68,7 @@ public class StateContext {
private final Configuration conf;
private final Queue<GeneratedMessage> reports;
private final Queue<ContainerAction> containerActions;
+ private final Queue<PipelineAction> pipelineActions;
private DatanodeStateMachine.DatanodeStates state;
/**
@@ -91,6 +94,7 @@ public class StateContext {
cmdStatusMap = new ConcurrentHashMap<>();
reports = new LinkedList<>();
containerActions = new LinkedList<>();
+ pipelineActions = new LinkedList<>();
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
}
@@ -257,6 +261,47 @@ public class StateContext {
}
/**
+ * Add PipelineAction to PipelineAction queue if it's not present.
+ *
+ * @param pipelineAction PipelineAction to be added
+ */
+ public void addPipelineActionIfAbsent(PipelineAction pipelineAction) {
+ synchronized (pipelineActions) {
+ /**
+ * If the pipelineAction queue already contains an entry for the same
+ * pipeline id with the same action, just return.
+ * Note: pipelineActions.contains(pipelineAction) cannot be used here
+ * because pipelineAction carries a message string; two actions on the
+ * same pipeline with different messages would otherwise both be added.
+ */
+ for (PipelineAction pipelineActionIter : pipelineActions) {
+ if (pipelineActionIter.getAction() == pipelineAction.getAction()
+ && pipelineActionIter.hasClosePipeline() && pipelineAction
+ .hasClosePipeline()
+ && pipelineActionIter.getClosePipeline().getPipelineID()
+ == pipelineAction.getClosePipeline().getPipelineID()) {
+ return;
+ }
+ }
+ pipelineActions.add(pipelineAction);
+ }
+ }
+
+ /**
+ * Returns pending PipelineActions from the PipelineAction queue, up to
+ * maxLimit entries, or an empty list if the queue is empty.
+ *
+ * @return List<PipelineAction>
+ */
+ public List<PipelineAction> getPendingPipelineAction(int maxLimit) {
+ synchronized (pipelineActions) {
+ return pipelineActions.parallelStream().limit(maxLimit)
+ .collect(Collectors.toList());
+ }
+ }
+
+ /**
* Returns the next task to get executed by the datanode state machine.
* @return A callable that will be executed by the
* {@link DatanodeStateMachine}
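For reference, a producer on the datanode side would enqueue a close action roughly as follows (a sketch mirroring the proto usage in XceiverServerRatis below; pipelineID is assumed to be in scope):

    // Sketch: queue a CLOSE action; addPipelineActionIfAbsent dedupes on
    // (action, pipeline id) and ignores the free-form detailedReason text.
    ClosePipelineInfo.Builder info = ClosePipelineInfo.newBuilder()
        .setPipelineID(pipelineID.getProtobuf())
        .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
        .setDetailedReason("follower unreachable for 120000ms");
    context.addPipelineActionIfAbsent(PipelineAction.newBuilder()
        .setClosePipeline(info)
        .setAction(PipelineAction.Action.CLOSE)
        .build());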
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 020fb71..5769e6d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -25,6 +25,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -57,6 +61,10 @@ import static org.apache.hadoop.hdds.HddsConfigKeys
.HDDS_CONTAINER_ACTION_MAX_LIMIT;
import static org.apache.hadoop.hdds.HddsConfigKeys
.HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_PIPELINE_ACTION_MAX_LIMIT;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT;
/**
* Heartbeat class for SCMs.
@@ -70,6 +78,7 @@ public class HeartbeatEndpointTask
private DatanodeDetailsProto datanodeDetailsProto;
private StateContext context;
private int maxContainerActionsPerHB;
+ private int maxPipelineActionsPerHB;
/**
* Constructs a SCM heart beat.
@@ -83,6 +92,8 @@ public class HeartbeatEndpointTask
this.context = context;
this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT,
HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT);
+ this.maxPipelineActionsPerHB = conf.getInt(HDDS_PIPELINE_ACTION_MAX_LIMIT,
+ HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT);
}
/**
@@ -121,6 +132,7 @@ public class HeartbeatEndpointTask
.setDatanodeDetails(datanodeDetailsProto);
addReports(requestBuilder);
addContainerActions(requestBuilder);
+ addPipelineActions(requestBuilder);
SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
.sendHeartbeat(requestBuilder.build());
processResponse(reponse, datanodeDetailsProto);
@@ -169,6 +181,22 @@ public class HeartbeatEndpointTask
}
}
+ /**
+ * Adds all the pending PipelineActions to the heartbeat.
+ *
+ * @param requestBuilder builder to which the actions have to be added.
+ */
+ private void addPipelineActions(
+ SCMHeartbeatRequestProto.Builder requestBuilder) {
+ List<PipelineAction> actions = context.getPendingPipelineAction(
+ maxPipelineActionsPerHB);
+ if (!actions.isEmpty()) {
+ PipelineActionsProto pap = PipelineActionsProto.newBuilder()
+ .addAllPipelineActions(actions)
+ .build();
+ requestBuilder.setPipelineActions(pap);
+ }
+ }
/**
* Returns a builder class for HeartbeatEndpointTask task.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 68d6d5b..1636f24 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.shaded.com.google.protobuf
@@ -42,6 +43,7 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
import org.apache.ratis.statemachine.StateMachineStorage;
@@ -115,6 +117,7 @@ public class ContainerStateMachine extends BaseStateMachine {
= new SimpleStateMachineStorage();
private final ContainerDispatcher dispatcher;
private ThreadPoolExecutor chunkExecutor;
+ private final XceiverServerRatis ratisServer;
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
writeChunkFutureMap;
private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap;
@@ -124,9 +127,10 @@ public class ContainerStateMachine extends BaseStateMachine {
private final CSMMetrics metrics;
public ContainerStateMachine(ContainerDispatcher dispatcher,
- ThreadPoolExecutor chunkExecutor) {
+ ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer) {
this.dispatcher = dispatcher;
this.chunkExecutor = chunkExecutor;
+ this.ratisServer = ratisServer;
this.writeChunkFutureMap = new ConcurrentHashMap<>();
this.stateMachineMap = new ConcurrentHashMap<>();
metrics = CSMMetrics.create();
@@ -401,6 +405,17 @@ public class ContainerStateMachine extends BaseStateMachine {
}
@Override
+ public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
+ ratisServer.handleNodeSlowness(group, roleInfoProto);
+ }
+
+ @Override
+ public void notifyExtendedNoLeader(RaftGroup group,
+ RoleInfoProto roleInfoProto) {
+ ratisServer.handleNoLeader(group, roleInfoProto);
+ }
+
+ @Override
public void close() throws IOException {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 8256722..f775396 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -26,9 +26,14 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server
.XceiverServerSpi;
import org.apache.ratis.RaftConfigKeys;
@@ -43,10 +48,15 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
@@ -59,6 +69,7 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
@@ -81,24 +92,72 @@ public final class XceiverServerRatis implements XceiverServerSpi {
private final RaftServer server;
private ThreadPoolExecutor chunkExecutor;
private ClientId clientId = ClientId.randomId();
+ private final StateContext context;
+ private final ReplicationLevel replicationLevel;
+ private long nodeFailureTimeoutMs;
private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
- ContainerDispatcher dispatcher, Configuration conf) throws IOException {
+ ContainerDispatcher dispatcher, Configuration conf, StateContext context)
+ throws IOException {
+ Objects.requireNonNull(dd, "id == null");
+ this.port = port;
+ RaftProperties serverProperties = newRaftProperties(conf, storageDir);
+ final int numWriteChunkThreads = conf.getInt(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
+ chunkExecutor =
+ new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
+ 100, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(1024),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ this.context = context;
+ this.replicationLevel =
+ conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);
+ ContainerStateMachine stateMachine =
+ new ContainerStateMachine(dispatcher, chunkExecutor, this);
+ this.server = RaftServer.newBuilder()
+ .setServerId(RatisHelper.toRaftPeerId(dd))
+ .setGroup(RatisHelper.emptyRaftGroup())
+ .setProperties(serverProperties)
+ .setStateMachine(stateMachine)
+ .build();
+ }
+
+ private RaftProperties newRaftProperties(Configuration conf,
+ String storageDir) {
+ final RaftProperties properties = new RaftProperties();
+
+ // Set rpc type
final String rpcType = conf.get(
OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
+ RaftConfigKeys.Rpc.setType(properties, rpc);
+
+ // set raft segment size
final int raftSegmentSize = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT);
+ RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
+ SizeInBytes.valueOf(raftSegmentSize));
+
+ // set raft segment pre-allocated size
final int raftSegmentPreallocatedSize = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT);
+ RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
+ SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+ RaftServerConfigKeys.Log.setPreallocatedSize(properties,
+ SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+
+ // Set max write buffer size, which is the scm chunk size
final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE;
- final int numWriteChunkThreads = conf.getInt(
- OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
- OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
+ RaftServerConfigKeys.Log.setWriteBufferSize(properties,
+ SizeInBytes.valueOf(maxChunkSize));
+
+ // Set the client requestTimeout
TimeUnit timeUnit =
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
.getUnit();
@@ -108,6 +167,10 @@ public final class XceiverServerRatis implements XceiverServerSpi {
.getDuration(), timeUnit);
final TimeDuration clientRequestTimeout =
TimeDuration.valueOf(duration, timeUnit);
+ RaftClientConfigKeys.Rpc
+ .setRequestTimeout(properties, clientRequestTimeout);
+
+ // Set the server Request timeout
timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT
.getUnit();
duration = conf.getTimeDuration(
@@ -116,61 +179,44 @@ public final class XceiverServerRatis implements XceiverServerSpi {
.getDuration(), timeUnit);
final TimeDuration serverRequestTimeout =
TimeDuration.valueOf(duration, timeUnit);
-
- Objects.requireNonNull(dd, "id == null");
- this.port = port;
- RaftProperties serverProperties =
- newRaftProperties(rpc, port, storageDir, maxChunkSize, raftSegmentSize,
- raftSegmentPreallocatedSize);
- setRequestTimeout(serverProperties, clientRequestTimeout,
- serverRequestTimeout);
-
- chunkExecutor =
- new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
- 100, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(1024),
- new ThreadPoolExecutor.CallerRunsPolicy());
- ContainerStateMachine stateMachine =
- new ContainerStateMachine(dispatcher, chunkExecutor);
- this.server = RaftServer.newBuilder()
- .setServerId(RatisHelper.toRaftPeerId(dd))
- .setGroup(RatisHelper.emptyRaftGroup())
- .setProperties(serverProperties)
- .setStateMachine(stateMachine)
- .build();
- }
-
- private static void setRequestTimeout(RaftProperties serverProperties,
- TimeDuration clientRequestTimeout, TimeDuration serverRequestTimeout) {
- RaftClientConfigKeys.Rpc
- .setRequestTimeout(serverProperties, clientRequestTimeout);
RaftServerConfigKeys.Rpc
- .setRequestTimeout(serverProperties, serverRequestTimeout);
- }
+ .setRequestTimeout(properties, serverRequestTimeout);
- private static RaftProperties newRaftProperties(
- RpcType rpc, int port, String storageDir, int scmChunkSize,
- int raftSegmentSize, int raftSegmentPreallocatedSize) {
- final RaftProperties properties = new RaftProperties();
+ // Enable batch append on raft server
RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
- RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
- SizeInBytes.valueOf(raftSegmentPreallocatedSize));
- RaftServerConfigKeys.Log.setWriteBufferSize(properties,
- SizeInBytes.valueOf(scmChunkSize));
- RaftServerConfigKeys.Log.setPreallocatedSize(properties,
- SizeInBytes.valueOf(raftSegmentPreallocatedSize));
- RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
- SizeInBytes.valueOf(raftSegmentSize));
- RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
- RaftConfigKeys.Rpc.setType(properties, rpc);
+ // Set the maximum cache segments
RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
- GrpcConfigKeys.setMessageSizeMax(properties,
- SizeInBytes.valueOf(scmChunkSize + raftSegmentPreallocatedSize));
+
+ // Set the ratis leader election timeout
RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
TimeDuration.valueOf(800, TimeUnit.MILLISECONDS));
RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS));
+
+ // set the node failure timeout
+ timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
+ .getUnit();
+ duration = conf.getTimeDuration(
+ OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
+ OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
+ .getDuration(), timeUnit);
+ final TimeDuration nodeFailureTimeout =
+ TimeDuration.valueOf(duration, timeUnit);
+ RaftServerConfigKeys.setLeaderElectionTimeout(properties,
+ nodeFailureTimeout);
+ RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
+ nodeFailureTimeout);
+ nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS);
+
+ // Set the ratis storage directory
+ RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
+
+ // For grpc set the maximum message size
+ GrpcConfigKeys.setMessageSizeMax(properties,
+ SizeInBytes.valueOf(maxChunkSize + raftSegmentPreallocatedSize));
+
+ // Set the ratis port number
if (rpc == SupportedRpcType.GRPC) {
GrpcConfigKeys.Server.setPort(properties, port);
} else if (rpc == SupportedRpcType.NETTY) {
@@ -181,7 +227,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
public static XceiverServerRatis newXceiverServerRatis(
DatanodeDetails datanodeDetails, Configuration ozoneConf,
- ContainerDispatcher dispatcher) throws IOException {
+ ContainerDispatcher dispatcher, StateContext context) throws IOException {
final String ratisDir = File.separator + "ratis";
int localPort = ozoneConf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
@@ -226,7 +272,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
datanodeDetails.setPort(
DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
return new XceiverServerRatis(datanodeDetails, localPort, storageDir,
- dispatcher, ozoneConf);
+ dispatcher, ozoneConf, context);
}
@Override
@@ -296,7 +342,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
// the request here are applied on all the raft servers.
RaftClientRequest raftClientRequest =
createRaftClientRequest(request, pipelineID,
- RaftClientRequest.writeRequestType(ReplicationLevel.ALL));
+ RaftClientRequest.writeRequestType(replicationLevel));
CompletableFuture<RaftClientReply> reply =
server.submitClientRequestAsync(raftClientRequest);
reply.thenAccept(this::processReply);
@@ -309,4 +355,57 @@ public final class XceiverServerRatis implements XceiverServerSpi {
PipelineID.getFromProtobuf(pipelineID).getRaftGroupID(),
nextCallId(), 0, Message.valueOf(request.toByteString()), type);
}
+
+ private void handlePipelineFailure(RaftGroupId groupId,
+ RoleInfoProto roleInfoProto) {
+ String msg;
+ UUID datanode = RatisHelper.toDatanodeId(roleInfoProto.getSelf());
+ RaftPeerId id = RaftPeerId.valueOf(roleInfoProto.getSelf().getId());
+ switch (roleInfoProto.getRole()) {
+ case CANDIDATE:
+ msg = datanode + " is in candidate state for " +
+ roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() + "ms";
+ break;
+ case LEADER:
+ StringBuilder sb = new StringBuilder();
+ sb.append(datanode).append(" has not seen follower/s");
+ for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo()
+ .getFollowerInfoList()) {
+ if (follower.getLastRpcElapsedTimeMs() > nodeFailureTimeoutMs) {
+ sb.append(" ").append(RatisHelper.toDatanodeId(follower.getId()))
+ .append(" for ").append(follower.getLastRpcElapsedTimeMs())
+ .append("ms");
+ }
+ }
+ msg = sb.toString();
+ break;
+ default:
+ LOG.error("unknown state:" + roleInfoProto.getRole());
+ throw new IllegalStateException("node" + id + " is in illegal role "
+ + roleInfoProto.getRole());
+ }
+
+ PipelineID pipelineID = PipelineID.valueOf(groupId);
+ ClosePipelineInfo.Builder closePipelineInfo =
+ ClosePipelineInfo.newBuilder()
+ .setPipelineID(pipelineID.getProtobuf())
+ .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
+ .setDetailedReason(msg);
+
+ PipelineAction action = PipelineAction.newBuilder()
+ .setClosePipeline(closePipelineInfo)
+ .setAction(PipelineAction.Action.CLOSE)
+ .build();
+ context.addPipelineActionIfAbsent(action);
+ }
+
+ void handleNodeSlowness(
+ RaftGroup group, RoleInfoProto roleInfoProto) {
+ handlePipelineFailure(group.getGroupId(), roleInfoProto);
+ }
+
+ void handleNoLeader(
+ RaftGroup group, RoleInfoProto roleInfoProto) {
+ handlePipelineFailure(group.getGroupId(), roleInfoProto);
+ }
}
\ No newline at end of file
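Taken together, the datanode side of this patch works as follows: Ratis reports a slow or leaderless group through RoleInfoProto, handlePipelineFailure renders that into a detailed reason, and the resulting CLOSE action is queued on the StateContext until the next heartbeat picks it up. The StateContext changes themselves are not part of this excerpt, so the sketch below is a hypothetical reading of what addPipelineActionIfAbsent could look like; the deduplication rule (same pipeline, same action type) is an assumption, not the committed implementation.

import java.util.LinkedList;
import java.util.Queue;

import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.PipelineAction;

// Hypothetical sketch of the action queue behind addPipelineActionIfAbsent.
class PipelineActionQueueSketch {
  private final Queue<PipelineAction> actions = new LinkedList<>();

  synchronized void addPipelineActionIfAbsent(PipelineAction action) {
    for (PipelineAction queued : actions) {
      // Assumption: duplicates target the same pipeline with the same type.
      if (queued.getAction() == action.getAction()
          && queued.getClosePipeline().getPipelineID().equals(
              action.getClosePipeline().getPipelineID())) {
        return; // an identical action is already pending
      }
    }
    actions.add(action);
  }

  // Drained by the heartbeat thread when building the next report.
  synchronized Queue<PipelineAction> drain() {
    Queue<PipelineAction> copy = new LinkedList<>(actions);
    actions.clear();
    return copy;
  }
}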
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index b1bf381..72a5804 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -84,7 +84,7 @@ public class OzoneContainer {
new XceiverServerGrpc(datanodeDetails, this.config, this
.hddsDispatcher, createReplicationService()),
XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this
- .config, hddsDispatcher)
+ .config, hddsDispatcher, context)
};
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 1a3496d..0a69343 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -81,6 +81,7 @@ message SCMHeartbeatRequestProto {
optional ContainerReportsProto containerReport = 3;
optional CommandStatusReportsProto commandStatusReport = 4;
optional ContainerActionsProto containerActions = 5;
+ optional PipelineActionsProto pipelineActions = 6;
}
/*
@@ -162,6 +163,31 @@ message ContainerAction {
optional Reason reason = 3;
}
+message PipelineActionsProto {
+ repeated PipelineAction pipelineActions = 1;
+}
+
+message ClosePipelineInfo {
+ enum Reason {
+ PIPELINE_FAILED = 1;
+ }
+ required PipelineID pipelineID = 1;
+ optional Reason reason = 3;
+ optional string detailedReason = 4;
+}
+
+message PipelineAction {
+ enum Action {
+ CLOSE = 1;
+ }
+
+ /**
+ * Action will be used to identify the correct pipeline action.
+ */
+ required Action action = 1;
+ optional ClosePipelineInfo closePipeline = 2;
+}
+
/**
A container report contains the following information.
*/
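For illustration, here is a hedged Java sketch of how the new messages compose: a datanode wraps one or more PipelineAction entries in a PipelineActionsProto and attaches the result as field 6 of the heartbeat via SCMHeartbeatRequestProto.Builder#setPipelineActions. The builder calls follow directly from the proto definitions above; HddsProtos.PipelineID is assumed to be the imported PipelineID message, and the detailed-reason text is illustrative only.

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;

final class PipelineActionBuilderSketch {
  // Build the pipeline-actions payload for one failed pipeline.
  static PipelineActionsProto buildCloseAction(
      HddsProtos.PipelineID pipelineIdProto) {
    PipelineAction action = PipelineAction.newBuilder()
        .setAction(PipelineAction.Action.CLOSE)
        .setClosePipeline(ClosePipelineInfo.newBuilder()
            .setPipelineID(pipelineIdProto)
            .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
            .setDetailedReason("follower unresponsive past failure timeout"))
        .build();
    return PipelineActionsProto.newBuilder()
        .addPipelineActions(action)
        .build();
  }
}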
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 8f5d8d6..3554339 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -200,8 +200,7 @@ public class ContainerMapping implements Mapping {
Pipeline pipeline;
if (contInfo.isContainerOpen()) {
// If pipeline with given pipeline Id already exist return it
- pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID(),
- contInfo.getReplicationType());
+ pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
if (pipeline == null) {
pipeline = pipelineSelector
.getReplicationPipeline(contInfo.getReplicationType(),
@@ -389,8 +388,7 @@ public class ContainerMapping implements Mapping {
.updateContainerState(containerInfo, event);
if (!updatedContainer.isContainerOpen()) {
Pipeline pipeline = pipelineSelector
- .getPipeline(containerInfo.getPipelineID(),
- containerInfo.getReplicationType());
+ .getPipeline(containerInfo.getPipelineID());
pipelineSelector.closePipelineIfNoOpenContainers(pipeline);
}
containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
@@ -470,8 +468,7 @@ public class ContainerMapping implements Mapping {
return null;
}
Pipeline pipeline = pipelineSelector
- .getPipeline(containerInfo.getPipelineID(),
- containerInfo.getReplicationType());
+ .getPipeline(containerInfo.getPipelineID());
if (pipeline == null) {
pipeline = pipelineSelector
.getReplicationPipeline(containerInfo.getReplicationType(),
@@ -480,6 +477,24 @@ public class ContainerMapping implements Mapping {
return new ContainerWithPipeline(containerInfo, pipeline);
}
+ public void handlePipelineClose(PipelineID pipelineID) {
+ try {
+ Pipeline pipeline = pipelineSelector.getPipeline(pipelineID);
+ if (pipeline != null) {
+ pipelineSelector.finalizePipeline(pipeline);
+ } else {
+ LOG.debug("pipeline:{} not found", pipelineID);
+ }
+ } catch (Exception e) {
+ LOG.info("failed to close pipeline:{}", pipelineID, e);
+ }
+ }
+
+ public Set<PipelineID> getPipelineOnDatanode(
+ DatanodeDetails datanodeDetails) {
+ return pipelineSelector.getPipelineId(datanodeDetails.getUuid());
+ }
+
/**
* Process container report from Datanode.
* <p>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 7afed42..421d34e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -486,10 +486,9 @@ public class ContainerStateManager implements Closeable {
* @throws IOException
*/
public ContainerWithPipeline getContainer(PipelineSelector selector,
- ContainerID containerID) throws IOException {
+ ContainerID containerID) {
ContainerInfo info = containers.getContainerInfo(containerID.getId());
- Pipeline pipeline = selector.getPipeline(info.getPipelineID(),
- info.getReplicationType());
+ Pipeline pipeline = selector.getPipeline(info.getPipelineID());
return new ContainerWithPipeline(info, pipeline);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
index f4b5bb2..1b0c57c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
@@ -25,11 +25,13 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Mapping class contains the mapping from a name to a pipeline mapping. This is
@@ -135,4 +137,16 @@ public interface Mapping extends Closeable {
ContainerWithPipeline getMatchingContainerWithPipeline(long size,
String owner, ReplicationType type, ReplicationFactor factor,
LifeCycleState state) throws IOException;
+
+ /**
+ * Handle a pipeline close event.
+ * @param pipelineID pipeline id
+ */
+ void handlePipelineClose(PipelineID pipelineID);
+
+ /**
+ * Get the set of pipelines for a specific datanode.
+ * @param datanodeDetails datanode for which pipelines need to be fetched.
+ * @return set of PipelineIDs the datanode is part of.
+ */
+ Set<PipelineID> getPipelineOnDatanode(DatanodeDetails datanodeDetails);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 9a4f887..03df8eb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.ReplicationStatus;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler.CloseContainerRetryableReq;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+ .PipelineActionsFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerActionsFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -72,6 +75,23 @@ public final class SCMEvents {
public static final TypedEvent<ContainerActionsFromDatanode>
CONTAINER_ACTIONS = new TypedEvent<>(ContainerActionsFromDatanode.class,
"Container_Actions");
+
+ /**
+ * PipelineActions are sent by datanodes. The report is received by
+ * SCMDatanodeHeartbeatDispatcher, which generates a PIPELINE_ACTIONS event.
+ */
+ public static final TypedEvent<PipelineActionsFromDatanode>
+ PIPELINE_ACTIONS = new TypedEvent<>(PipelineActionsFromDatanode.class,
+ "Pipeline_Actions");
+
+ /**
+ * Pipeline close events are triggered to close a pipeline because of
+ * failure, a stale node, decommissioning, etc.
+ */
+ public static final TypedEvent<PipelineID>
+ PIPELINE_CLOSE = new TypedEvent<>(PipelineID.class,
+ "Pipeline_Close");
+
/**
* A Command status report will be sent by datanodes. This report is received
* by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated.
@@ -155,7 +175,7 @@ public final class SCMEvents {
*/
public static final Event<DeleteBlockCommandStatus>
DELETE_BLOCK_STATUS =
- new TypedEvent(DeleteBlockCommandStatus.class,
+ new TypedEvent<>(DeleteBlockCommandStatus.class,
"DeleteBlockCommandStatus");
/**
@@ -164,7 +184,7 @@ public final class SCMEvents {
* deleteTransactionID on SCM.
*/
public static final Event<PendingDeleteStatusList> PENDING_DELETE_STATUS =
- new TypedEvent(PendingDeleteStatusList.class, "PendingDeleteStatus");
+ new TypedEvent<>(PendingDeleteStatusList.class, "PendingDeleteStatus");
/**
* This is the command for ReplicationManager to handle under/over
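The two new events ride on the same EventQueue machinery as the existing ones; the actual wiring appears in the StorageContainerManager hunk further below. A condensed sketch of that flow, with containerMapping standing in for the SCM's Mapping instance:

import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler;
import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler;
import org.apache.hadoop.hdds.server.events.EventQueue;

final class PipelineEventWiringSketch {
  static void wire(EventQueue eventQueue, Mapping containerMapping) {
    // Datanode heartbeats surface as PIPELINE_ACTIONS; the action handler
    // translates each CLOSE action into a PIPELINE_CLOSE event.
    eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
        new PipelineActionEventHandler());
    eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE,
        new PipelineCloseHandler(containerMapping));
    // Other SCM components can also request a close directly, e.g.:
    //   eventQueue.fireEvent(SCMEvents.PIPELINE_CLOSE, pipelineID);
  }
}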
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
index b37dd93..0bd9339 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
@@ -19,24 +19,36 @@
package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import java.util.Set;
+
/**
* Handles Stale node event.
*/
public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
private final Node2ContainerMap node2ContainerMap;
+ private final Mapping containerManager;
- public StaleNodeHandler(Node2ContainerMap node2ContainerMap) {
+ public StaleNodeHandler(Node2ContainerMap node2ContainerMap,
+ Mapping containerManager) {
this.node2ContainerMap = node2ContainerMap;
+ this.containerManager = containerManager;
}
@Override
public void onMessage(DatanodeDetails datanodeDetails,
EventPublisher publisher) {
- //TODO: logic to handle stale node.
+ Set<PipelineID> pipelineIDs =
+ containerManager.getPipelineOnDatanode(datanodeDetails);
+ for (PipelineID id : pipelineIDs) {
+ publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
+ }
}
}
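With this change the stale-node path becomes: the node manager flags a stale datanode, the handler above looks up every pipeline that datanode participates in, and one PIPELINE_CLOSE event is fired per pipeline. A registration sketch follows; SCMEvents.STALE_NODE is assumed to be the pre-existing stale-node event this handler is subscribed to.

import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.server.events.EventQueue;

final class StaleNodeWiringSketch {
  static void wire(EventQueue eventQueue,
      Node2ContainerMap node2ContainerMap, Mapping containerManager) {
    // One PIPELINE_CLOSE will be fired per pipeline hosted on a stale node.
    eventQueue.addHandler(SCMEvents.STALE_NODE,
        new StaleNodeHandler(node2ContainerMap, containerManager));
  }
}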
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
index 4a7fa81..363ce71 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hdds.scm.pipelines;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import java.util.Collections;
import java.util.HashSet;
@@ -30,8 +30,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUPLICATE_DATANODE;
-
/**
* This data structure maintains the list of pipelines which the given datanode is a part of. This
* information will be added whenever a new pipeline allocation happens.
@@ -39,7 +37,7 @@ import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUP
* <p>TODO: this information needs to be regenerated from pipeline reports on SCM restart
*/
public class Node2PipelineMap {
- private final Map<UUID, Set<Pipeline>> dn2PipelineMap;
+ private final Map<UUID, Set<PipelineID>> dn2PipelineMap;
/** Constructs a Node2PipelineMap Object. */
public Node2PipelineMap() {
@@ -58,20 +56,6 @@ public class Node2PipelineMap {
}
/**
- * Insert a new datanode into Node2Pipeline Map.
- *
- * @param datanodeID -- Datanode UUID
- * @param pipelines - set of pipelines.
- */
- private void insertNewDatanode(UUID datanodeID, Set<Pipeline> pipelines) throws SCMException {
- Preconditions.checkNotNull(pipelines);
- Preconditions.checkNotNull(datanodeID);
- if (dn2PipelineMap.putIfAbsent(datanodeID, pipelines) != null) {
- throw new SCMException("Node already exists in the map", DUPLICATE_DATANODE);
- }
- }
-
- /**
* Removes datanode Entry from the map.
*
* @param datanodeID - Datanode ID.
@@ -87,9 +71,10 @@ public class Node2PipelineMap {
* @param datanode - UUID
* @return Set of pipelines or Null.
*/
- public Set<Pipeline> getPipelines(UUID datanode) {
+ public Set<PipelineID> getPipelines(UUID datanode) {
Preconditions.checkNotNull(datanode);
- return dn2PipelineMap.computeIfPresent(datanode, (k, v) -> Collections.unmodifiableSet(v));
+ final Set<PipelineID> s = dn2PipelineMap.get(datanode);
+ return s != null ? Collections.unmodifiableSet(s) : Collections.emptySet();
}
/**
@@ -100,9 +85,8 @@ public class Node2PipelineMap {
public synchronized void addPipeline(Pipeline pipeline) {
for (DatanodeDetails details : pipeline.getDatanodes().values()) {
UUID dnId = details.getUuid();
- dn2PipelineMap
- .computeIfAbsent(dnId, k -> Collections.synchronizedSet(new HashSet<>()))
- .add(pipeline);
+ dn2PipelineMap.computeIfAbsent(dnId, k -> new HashSet<>())
+ .add(pipeline.getId());
}
}
@@ -112,13 +96,13 @@ public class Node2PipelineMap {
dn2PipelineMap.computeIfPresent(
dnId,
(k, v) -> {
- v.remove(pipeline);
+ v.remove(pipeline.getId());
return v;
});
}
}
- public Map<UUID, Set<Pipeline>> getDn2PipelineMap() {
+ public Map<UUID, Set<PipelineID>> getDn2PipelineMap() {
return Collections.unmodifiableMap(dn2PipelineMap);
}
}
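Node2PipelineMap now tracks PipelineID values instead of whole Pipeline objects, leaving PipelineSelector's new pipelineMap as the single owner of pipeline state. A usage sketch consistent with the updated test at the end of this patch; the Pipeline argument is assumed to come from an existing pipeline allocation:

import java.util.Set;
import java.util.UUID;

import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;

final class Node2PipelineMapSketch {
  static Set<PipelineID> pipelinesOfFirstNode(Pipeline pipeline) {
    Node2PipelineMap map = new Node2PipelineMap();
    map.addPipeline(pipeline);
    UUID dnId =
        pipeline.getDatanodes().values().iterator().next().getUuid();
    // Returns an unmodifiable view; an empty set (never null) for
    // datanodes the map has not seen.
    return map.getPipelines(dnId);
  }
}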
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java
new file mode 100644
index 0000000..54c2400
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+ .PipelineActionsFromDatanode;
+
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles pipeline actions from datanode.
+ */
+public class PipelineActionEventHandler implements
+ EventHandler<PipelineActionsFromDatanode> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(
+ PipelineActionEventHandler.class);
+
+ public PipelineActionEventHandler() {
+
+ }
+
+ @Override
+ public void onMessage(PipelineActionsFromDatanode report,
+ EventPublisher publisher) {
+ for (PipelineAction action : report.getReport().getPipelineActionsList()) {
+ switch (action.getAction()) {
+ case CLOSE:
+ PipelineID pipelineID = PipelineID.
+ getFromProtobuf(action.getClosePipeline().getPipelineID());
+ publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, pipelineID);
+ break;
+ default:
+ LOG.error("unknown pipeline action:{}" + action.getAction());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
new file mode 100644
index 0000000..733dec5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+/**
+ * Handles pipeline close event.
+ */
+public class PipelineCloseHandler implements EventHandler<PipelineID> {
+ private final Mapping mapping;
+ public PipelineCloseHandler(Mapping mapping) {
+ this.mapping = mapping;
+ }
+
+ @Override
+ public void onMessage(PipelineID pipelineID, EventPublisher publisher) {
+ mapping.handlePipelineClose(pipelineID);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index 5b1a7f7..102df8a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.pipelines;
import java.util.LinkedList;
import java.util.Map;
-import java.util.WeakHashMap;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@@ -43,11 +42,12 @@ public abstract class PipelineManager {
private final AtomicInteger pipelineIndex;
private final Node2PipelineMap node2PipelineMap;
- public PipelineManager(Node2PipelineMap map) {
+ public PipelineManager(Node2PipelineMap map,
+ Map<PipelineID, Pipeline> pipelineMap) {
activePipelines = new LinkedList<>();
pipelineIndex = new AtomicInteger(0);
- pipelineMap = new WeakHashMap<>();
- node2PipelineMap = map;
+ this.pipelineMap = pipelineMap;
+ this.node2PipelineMap = map;
}
/**
@@ -187,7 +187,7 @@ public abstract class PipelineManager {
*
* @param pipeline
*/
- public void closePipeline(Pipeline pipeline) {
+ public void closePipeline(Pipeline pipeline) throws IOException {
pipelineMap.remove(pipeline.getId());
node2PipelineMap.removePipeline(pipeline);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index b02beb3..63afbaa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -55,6 +55,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -77,6 +79,7 @@ public class PipelineSelector {
private final StandaloneManagerImpl standaloneManager;
private final long containerSize;
private final Node2PipelineMap node2PipelineMap;
+ private final Map<PipelineID, Pipeline> pipelineMap;
private final LeaseManager<Pipeline> pipelineLeaseManager;
private final StateMachine<LifeCycleState,
HddsProtos.LifeCycleEvent> stateMachine;
@@ -99,12 +102,13 @@ public class PipelineSelector {
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
StorageUnit.BYTES);
node2PipelineMap = new Node2PipelineMap();
+ pipelineMap = new ConcurrentHashMap<>();
this.standaloneManager =
new StandaloneManagerImpl(this.nodeManager, placementPolicy,
- containerSize, node2PipelineMap);
+ containerSize, node2PipelineMap, pipelineMap);
this.ratisManager =
new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
- conf, node2PipelineMap);
+ conf, node2PipelineMap, pipelineMap);
// Initialize the container state machine.
Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
long pipelineCreationLeaseTimeout = conf.getTimeDuration(
@@ -303,19 +307,10 @@ public class PipelineSelector {
}
/**
- * This function to return pipeline for given pipeline name and replication
- * type.
+ * Returns the pipeline for the given pipeline id.
*/
- public Pipeline getPipeline(PipelineID pipelineID,
- ReplicationType replicationType) throws IOException {
- if (pipelineID == null) {
- return null;
- }
- PipelineManager manager = getPipelineManager(replicationType);
- Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
- LOG.debug("Getting replication pipeline forReplicationType {} :" +
- " pipelineName:{}", replicationType, pipelineID);
- return manager.getPipeline(pipelineID);
+ public Pipeline getPipeline(PipelineID pipelineID) {
+ return pipelineMap.get(pipelineID);
}
/**
@@ -324,9 +319,18 @@ public class PipelineSelector {
public void finalizePipeline(Pipeline pipeline) throws IOException {
PipelineManager manager = getPipelineManager(pipeline.getType());
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
- LOG.debug("Finalizing pipeline. pipelineID: {}", pipeline.getId());
+ if (pipeline.getLifeCycleState() == LifeCycleState.CLOSING ||
+ pipeline.getLifeCycleState() == LifeCycleState.CLOSED) {
+ LOG.debug("pipeline:{} already in closing state, skipping",
+ pipeline.getId());
+ // already in closing/closed state
+ return;
+ }
+
// Remove the pipeline from active allocation
manager.finalizePipeline(pipeline);
+
+ LOG.info("Finalizing pipeline. pipelineID: {}", pipeline.getId());
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE);
closePipelineIfNoOpenContainers(pipeline);
}
@@ -350,7 +354,7 @@ public class PipelineSelector {
/**
* Close a given pipeline.
*/
- private void closePipeline(Pipeline pipeline) {
+ private void closePipeline(Pipeline pipeline) throws IOException {
PipelineManager manager = getPipelineManager(pipeline.getType());
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId());
@@ -400,14 +404,8 @@ public class PipelineSelector {
return node2PipelineMap;
}
- public void removePipeline(UUID dnId) {
- Set<Pipeline> pipelineSet =
- node2PipelineMap.getPipelines(dnId);
- for (Pipeline pipeline : pipelineSet) {
- getPipelineManager(pipeline.getType())
- .closePipeline(pipeline);
- }
- node2PipelineMap.removeDatanode(dnId);
+ public Set<PipelineID> getPipelineId(UUID dnId) {
+ return node2PipelineMap.getPipelines(dnId);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index 8b14483..150802e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -39,6 +39,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.Map;
/**
* Implementation of {@link PipelineManager}.
@@ -59,8 +60,8 @@ public class RatisManagerImpl extends PipelineManager {
*/
public RatisManagerImpl(NodeManager nodeManager,
ContainerPlacementPolicy placementPolicy, long size, Configuration conf,
- Node2PipelineMap map) {
- super(map);
+ Node2PipelineMap map, Map<PipelineID, Pipeline> pipelineMap) {
+ super(map, pipelineMap);
this.conf = conf;
this.nodeManager = nodeManager;
ratisMembers = new HashSet<>();
@@ -101,20 +102,23 @@ public class RatisManagerImpl extends PipelineManager {
//TODO:move the initialization from SCM to client
try (XceiverClientRatis client =
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
- client.createPipeline(pipeline);
+ client.createPipeline();
}
}
/**
* Close the pipeline.
*/
- public void closePipeline(Pipeline pipeline) {
+ public void closePipeline(Pipeline pipeline) throws IOException {
super.closePipeline(pipeline);
for (DatanodeDetails node : pipeline.getMachines()) {
// A node should always be in the ratis members list.
Preconditions.checkArgument(ratisMembers.remove(node));
}
- //TODO: should the raft ring also be destroyed as well?
+ try (XceiverClientRatis client =
+ XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+ client.destroyPipeline();
+ }
}
/**
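Note the behavioral change here: closing a Ratis pipeline now also destroys the backing Raft ring (resolving the old TODO) instead of leaving it running. A hedged sketch of the SCM-side close sequence after this patch; method names are taken from PipelineSelector and the managers above, with error handling elided:

import java.io.IOException;

import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;

final class PipelineTeardownSketch {
  // Drive the full close path for one pipeline.
  static void close(PipelineSelector selector, Pipeline pipeline)
      throws IOException {
    // FINALIZE the pipeline; a no-op if it is already CLOSING/CLOSED.
    selector.finalizePipeline(pipeline);
    // Once the last open container on the pipeline closes,
    // closePipelineIfNoOpenContainers() -> closePipeline() ->
    // RatisManagerImpl.closePipeline(), which destroys the Raft ring via
    // XceiverClientRatis#destroyPipeline() as shown above.
  }
}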
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index f1b23f5..2573b9c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -37,6 +37,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.Map;
/**
* Standalone Manager Impl to prove that pluggable interface
@@ -58,8 +59,8 @@ public class StandaloneManagerImpl extends PipelineManager {
*/
public StandaloneManagerImpl(NodeManager nodeManager,
ContainerPlacementPolicy placementPolicy, long containerSize,
- Node2PipelineMap map) {
- super(map);
+ Node2PipelineMap map, Map<PipelineID, Pipeline> pipelineMap) {
+ super(map, pipelineMap);
this.nodeManager = nodeManager;
this.placementPolicy = placementPolicy;
this.containerSize = containerSize;
@@ -103,7 +104,7 @@ public class StandaloneManagerImpl extends PipelineManager {
/**
* Close the pipeline.
*/
- public void closePipeline(Pipeline pipeline) {
+ public void closePipeline(Pipeline pipeline) throws IOException {
super.closePipeline(pipeline);
for (DatanodeDetails node : pipeline.getMachines()) {
// A node should always be in the standalone members list.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index c259141..a651f62 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdds.scm.server;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
@@ -43,6 +45,8 @@ import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_ACTIONS;
+
/**
* This class is responsible for dispatching heartbeat from datanode to
* appropriate EventHandler at SCM.
@@ -99,6 +103,13 @@ public final class SCMDatanodeHeartbeatDispatcher {
heartbeat.getContainerActions()));
}
+ if (heartbeat.hasPipelineActions()) {
+ LOG.debug("Dispatching Pipeline Actions.");
+ eventPublisher.fireEvent(PIPELINE_ACTIONS,
+ new PipelineActionsFromDatanode(datanodeDetails,
+ heartbeat.getPipelineActions()));
+ }
+
if (heartbeat.hasCommandStatusReport()) {
eventPublisher.fireEvent(CMD_STATUS_REPORT,
new CommandStatusReportFromDatanode(datanodeDetails,
@@ -168,6 +179,18 @@ public final class SCMDatanodeHeartbeatDispatcher {
}
/**
+ * Pipeline action event payload with origin.
+ */
+ public static class PipelineActionsFromDatanode
+ extends ReportFromDatanode<PipelineActionsProto> {
+
+ public PipelineActionsFromDatanode(DatanodeDetails datanodeDetails,
+ PipelineActionsProto actions) {
+ super(datanodeDetails, actions);
+ }
+ }
+
+ /**
* Container report event payload with origin.
*/
public static class CommandStatusReportFromDatanode
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 061ff78..b84f399 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -62,6 +62,8 @@ import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler;
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdfs.DFSUtil;
@@ -218,7 +220,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
new CommandStatusReportHandler();
NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
- StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap);
+ StaleNodeHandler staleNodeHandler =
+ new StaleNodeHandler(node2ContainerMap, scmContainerManager);
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap,
getScmContainerManager().getStateManager());
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
@@ -229,6 +232,11 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
new ContainerReportHandler(scmContainerManager, node2ContainerMap,
replicationStatus);
+ PipelineActionEventHandler pipelineActionEventHandler =
+ new PipelineActionEventHandler();
+
+ PipelineCloseHandler pipelineCloseHandler =
+ new PipelineCloseHandler(scmContainerManager);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
@@ -242,6 +250,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus);
eventQueue
.addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
+ eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
+ pipelineActionEventHandler);
+ eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler);
long watcherTimeout =
conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index c0cd293..b8cb9970 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.common.helpers
.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
@@ -97,10 +97,10 @@ public class TestNode2PipelineMap {
Assert.assertEquals(3, dns.size());
// get pipeline details by dnid
- Set<Pipeline> pipelines = mapping.getPipelineSelector()
+ Set<PipelineID> pipelines = mapping.getPipelineSelector()
.getNode2PipelineMap().getPipelines(dns.get(0).getUuid());
Assert.assertEquals(1, pipelines.size());
- pipelines.forEach(p -> Assert.assertEquals(p.getId(),
+ pipelines.forEach(p -> Assert.assertEquals(p,
ratisContainer.getPipeline().getId()));