You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/08/13 19:48:07 UTC
[1/2] hadoop git commit: HDDS-324. Use pipeline name as Ratis groupID
to allow datanode to report pipeline info. Contributed by Mukul Kumar Singh.
Repository: hadoop
Updated Branches:
refs/heads/trunk f760a544a -> b4031a8f1
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index 3c9e0c3..e5bb373 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
@@ -95,7 +96,7 @@ public class TestMiniOzoneCluster {
new Pipeline(datanodeDetails.getUuidString(),
HddsProtos.LifeCycleState.OPEN,
HddsProtos.ReplicationType.STAND_ALONE,
- HddsProtos.ReplicationFactor.ONE, "test");
+ HddsProtos.ReplicationFactor.ONE, PipelineID.randomId());
pipeline.addMember(datanodeDetails);
// Verify client is able to connect to the container
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index f3980a5..ff68863 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.container;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hdds.client.BlockID;
@@ -136,10 +137,10 @@ public final class ContainerTestHelper {
final Iterator<DatanodeDetails> i = ids.iterator();
Preconditions.checkArgument(i.hasNext());
final DatanodeDetails leader = i.next();
- String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(3);
final Pipeline pipeline =
new Pipeline(leader.getUuidString(), LifeCycleState.OPEN,
- ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
+ ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+ PipelineID.randomId());
pipeline.addMember(leader);
for(; i.hasNext();) {
pipeline.addMember(i.next());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index 61bd935..866bc32 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -108,10 +108,11 @@ public class TestCloseContainerByPipeline {
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = omKeyLocationInfo.getContainerID();
- List<DatanodeDetails> datanodes = cluster.getStorageContainerManager()
+ Pipeline pipeline = cluster.getStorageContainerManager()
.getScmContainerManager().getContainerWithPipeline(containerID)
- .getPipeline().getMachines();
- Assert.assertTrue(datanodes.size() == 1);
+ .getPipeline();
+ List<DatanodeDetails> datanodes = pipeline.getMachines();
+ Assert.assertEquals(1, datanodes.size());
DatanodeDetails datanodeDetails = datanodes.get(0);
HddsDatanodeService datanodeService = null;
@@ -131,7 +132,7 @@ public class TestCloseContainerByPipeline {
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(containerID,
- HddsProtos.ReplicationType.STAND_ALONE));
+ HddsProtos.ReplicationType.STAND_ALONE, pipeline.getId()));
GenericTestUtils
.waitFor(() -> isContainerClosed(cluster, containerID, datanodeDetails),
500, 5 * 1000);
@@ -142,7 +143,7 @@ public class TestCloseContainerByPipeline {
}
@Test
- public void testCloseContainerViaStandaAlone()
+ public void testCloseContainerViaStandAlone()
throws IOException, TimeoutException, InterruptedException {
OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
@@ -163,10 +164,11 @@ public class TestCloseContainerByPipeline {
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = omKeyLocationInfo.getContainerID();
- List<DatanodeDetails> datanodes = cluster.getStorageContainerManager()
+ Pipeline pipeline = cluster.getStorageContainerManager()
.getScmContainerManager().getContainerWithPipeline(containerID)
- .getPipeline().getMachines();
- Assert.assertTrue(datanodes.size() == 1);
+ .getPipeline();
+ List<DatanodeDetails> datanodes = pipeline.getMachines();
+ Assert.assertEquals(1, datanodes.size());
DatanodeDetails datanodeDetails = datanodes.get(0);
Assert
@@ -178,7 +180,7 @@ public class TestCloseContainerByPipeline {
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(containerID,
- HddsProtos.ReplicationType.STAND_ALONE));
+ HddsProtos.ReplicationType.STAND_ALONE, pipeline.getId()));
GenericTestUtils
.waitFor(() -> isContainerClosed(cluster, containerID, datanodeDetails),
@@ -216,10 +218,11 @@ public class TestCloseContainerByPipeline {
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = omKeyLocationInfo.getContainerID();
- List<DatanodeDetails> datanodes = cluster.getStorageContainerManager()
+ Pipeline pipeline = cluster.getStorageContainerManager()
.getScmContainerManager().getContainerWithPipeline(containerID)
- .getPipeline().getMachines();
- Assert.assertTrue(datanodes.size() == 3);
+ .getPipeline();
+ List<DatanodeDetails> datanodes = pipeline.getMachines();
+ Assert.assertEquals(3, datanodes.size());
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(OzoneContainer.LOG);
@@ -230,7 +233,7 @@ public class TestCloseContainerByPipeline {
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(details.getUuid(),
new CloseContainerCommand(containerID,
- HddsProtos.ReplicationType.RATIS));
+ HddsProtos.ReplicationType.RATIS, pipeline.getId()));
}
for (DatanodeDetails datanodeDetails : datanodes) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
index c0c9bc4..3d39dbb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
@@ -33,7 +34,6 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -80,6 +80,9 @@ public class TestCloseContainerHandler {
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = omKeyLocationInfo.getContainerID();
+ Pipeline pipeline = cluster.getStorageContainerManager()
+ .getScmContainerManager().getContainerWithPipeline(containerID)
+ .getPipeline();
Assert.assertFalse(isContainerClosed(cluster, containerID));
@@ -89,7 +92,7 @@ public class TestCloseContainerHandler {
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(containerID,
- HddsProtos.ReplicationType.STAND_ALONE));
+ HddsProtos.ReplicationType.STAND_ALONE, pipeline.getId()));
GenericTestUtils.waitFor(() -> isContainerClosed(cluster, containerID),
500,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkContainerStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkContainerStateMap.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkContainerStateMap.java
index c344bbe..878ab36 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkContainerStateMap.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkContainerStateMap.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.util.Time;
@@ -60,7 +61,7 @@ public class BenchMarkContainerStateMap {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(CLOSED)
- .setPipelineName(pipeline.getPipelineName())
+ .setPipelineID(pipeline.getId())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
@@ -83,7 +84,7 @@ public class BenchMarkContainerStateMap {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(OPEN)
- .setPipelineName(pipeline.getPipelineName())
+ .setPipelineID(pipeline.getId())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
@@ -105,7 +106,7 @@ public class BenchMarkContainerStateMap {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(OPEN)
- .setPipelineName(pipeline.getPipelineName())
+ .setPipelineID(pipeline.getId())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
@@ -154,10 +155,10 @@ public class BenchMarkContainerStateMap {
final Iterator<DatanodeDetails> i = ids.iterator();
Preconditions.checkArgument(i.hasNext());
final DatanodeDetails leader = i.next();
- String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(5);
final Pipeline pipeline =
new Pipeline(leader.getUuidString(), OPEN,
- ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
+ ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+ PipelineID.randomId());
pipeline.addMember(leader);
for (; i.hasNext();) {
pipeline.addMember(i.next());
@@ -172,7 +173,7 @@ public class BenchMarkContainerStateMap {
int cid = state.containerID.incrementAndGet();
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(CLOSED)
- .setPipelineName(pipeline.getPipelineName())
+ .setPipelineID(pipeline.getId())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
index 3c49fb6..77be57d 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.genesis;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine
@@ -92,7 +93,7 @@ public class BenchMarkDatanodeDispatcher {
datanodeUuid = UUID.randomUUID().toString();
pipeline = new Pipeline("127.0.0.1",
LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
- ReplicationFactor.ONE, "SA-" + UUID.randomUUID());
+ ReplicationFactor.ONE, PipelineID.randomId());
// 1 MB of data
data = ByteString.copyFromUtf8(RandomStringUtils.randomAscii(1048576));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 7603842..bb0e48b 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -97,7 +97,7 @@
<ldap-api.version>1.0.0-M33</ldap-api.version>
<!-- Apache Ratis version -->
- <ratis.version>0.3.0-c242317-SNAPSHOT</ratis.version>
+ <ratis.version>0.3.0-e4a016f-SNAPSHOT</ratis.version>
<jcache.version>1.0-alpha-1</jcache.version>
<ehcache.version>3.3.1</ehcache.version>
<hikari.version>2.4.12</hikari.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: HDDS-324. Use pipeline name as Ratis groupID
to allow datanode to report pipeline info. Contributed by Mukul Kumar Singh.
Posted by xy...@apache.org.
HDDS-324. Use pipeline name as Ratis groupID to allow datanode to report pipeline info. Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b4031a8f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b4031a8f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b4031a8f
Branch: refs/heads/trunk
Commit: b4031a8f1b2c81249ec24167e38679a775c09214
Parents: f760a54
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Mon Aug 13 12:39:05 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Mon Aug 13 12:47:49 2018 -0700
----------------------------------------------------------------------
.../src/main/bin/hadoop-functions.sh | 2 +-
.../apache/hadoop/hdds/scm/XceiverClient.java | 5 +-
.../hadoop/hdds/scm/XceiverClientGrpc.java | 5 +-
.../hadoop/hdds/scm/XceiverClientRatis.java | 14 ++--
.../scm/client/ContainerOperationClient.java | 5 +-
.../hadoop/hdds/scm/XceiverClientSpi.java | 6 +-
.../container/common/helpers/ContainerInfo.java | 25 +++---
.../scm/container/common/helpers/Pipeline.java | 38 +++++----
.../container/common/helpers/PipelineID.java | 88 ++++++++++++++++++++
.../main/java/org/apache/ratis/RatisHelper.java | 19 +++--
hadoop-hdds/common/src/main/proto/hdds.proto | 8 +-
.../CloseContainerCommandHandler.java | 3 +-
.../common/transport/server/XceiverServer.java | 7 +-
.../transport/server/XceiverServerGrpc.java | 7 +-
.../transport/server/XceiverServerSpi.java | 6 +-
.../server/ratis/XceiverServerRatis.java | 36 +++++---
.../container/ozoneimpl/OzoneContainer.java | 8 +-
.../commands/CloseContainerCommand.java | 19 +++--
.../StorageContainerDatanodeProtocol.proto | 1 +
.../container/CloseContainerEventHandler.java | 2 +-
.../hdds/scm/container/ContainerMapping.java | 14 ++--
.../scm/container/ContainerStateManager.java | 14 ++--
.../scm/container/closer/ContainerCloser.java | 4 +-
.../scm/container/states/ContainerStateMap.java | 16 ++--
.../hadoop/hdds/scm/node/DatanodeInfo.java | 8 ++
.../hdds/scm/pipelines/PipelineManager.java | 36 ++++----
.../hdds/scm/pipelines/PipelineSelector.java | 31 +++----
.../scm/pipelines/ratis/RatisManagerImpl.java | 18 ++--
.../standalone/StandaloneManagerImpl.java | 14 ++--
.../hdds/scm/block/TestDeletedBlockLog.java | 6 +-
.../container/TestContainerReportHandler.java | 4 +-
.../replication/TestReplicationManager.java | 5 +-
.../hadoop/hdds/scm/node/TestNodeManager.java | 4 +-
.../ozone/container/common/TestEndPoint.java | 2 +
.../hdds/scm/pipeline/TestNode2PipelineMap.java | 8 +-
.../hdds/scm/pipeline/TestPipelineClose.java | 10 +--
.../hadoop/ozone/TestMiniOzoneCluster.java | 3 +-
.../ozone/container/ContainerTestHelper.java | 5 +-
.../TestCloseContainerByPipeline.java | 31 +++----
.../TestCloseContainerHandler.java | 7 +-
.../genesis/BenchMarkContainerStateMap.java | 13 +--
.../genesis/BenchMarkDatanodeDispatcher.java | 3 +-
hadoop-project/pom.xml | 2 +-
43 files changed, 362 insertions(+), 200 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
index 71ba7ff..64fd86e 100755
--- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
+++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
@@ -598,7 +598,7 @@ function hadoop_bootstrap
MAPRED_LIB_JARS_DIR=${MAPRED_LIB_JARS_DIR:-"share/hadoop/mapreduce/lib"}
HDDS_DIR=${HDDS_DIR:-"share/hadoop/hdds"}
HDDS_LIB_JARS_DIR=${HDDS_LIB_JARS_DIR:-"share/hadoop/hdds/lib"}
- OZONE_DIR=${OZONE_DIR:-"share/hadoop/ozone"}
+ OZONE_DIR=${OZONE_DIR:-"share/hadoop/ozone"}
OZONE_LIB_JARS_DIR=${OZONE_LIB_JARS_DIR:-"share/hadoop/ozone/lib"}
OZONEFS_DIR=${OZONEFS_DIR:-"share/hadoop/ozonefs"}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
index 709f0dc..097af17 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
@@ -188,11 +188,10 @@ public class XceiverClient extends XceiverClientSpi {
/**
* Create a pipeline.
*
- * @param pipelineID - Name of the pipeline.
- * @param datanodes - Datanodes
+ * @param pipeline - pipeline to be created.
*/
@Override
- public void createPipeline(String pipelineID, List<DatanodeDetails> datanodes)
+ public void createPipeline(Pipeline pipeline)
throws IOException {
// For stand alone pipeline, there is no notion called setup pipeline.
return;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 92df46e..35bc932 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -218,11 +218,10 @@ public class XceiverClientGrpc extends XceiverClientSpi {
/**
* Create a pipeline.
*
- * @param pipelineID - Name of the pipeline.
- * @param datanodes - Datanodes
+ * @param pipeline - pipeline to be created.
*/
@Override
- public void createPipeline(String pipelineID, List<DatanodeDetails> datanodes)
+ public void createPipeline(Pipeline pipeline)
throws IOException {
// For stand alone pipeline, there is no notion called setup pipeline.
return;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 2541415..2cb319f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -35,6 +35,7 @@ import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
@@ -87,12 +88,13 @@ public final class XceiverClientRatis extends XceiverClientSpi {
/**
* {@inheritDoc}
*/
- public void createPipeline(String clusterId, List<DatanodeDetails> datanodes)
+ public void createPipeline(Pipeline pipeline)
throws IOException {
- RaftGroup group = RatisHelper.newRaftGroup(datanodes);
- LOG.debug("initializing pipeline:{} with nodes:{}", clusterId,
- group.getPeers());
- reinitialize(datanodes, group);
+ RaftGroupId groupId = pipeline.getId().getRaftGroupID();
+ RaftGroup group = RatisHelper.newRaftGroup(groupId, pipeline.getMachines());
+ LOG.debug("initializing pipeline:{} with nodes:{}",
+ pipeline.getId(), group.getPeers());
+ reinitialize(pipeline.getMachines(), group);
}
/**
@@ -157,7 +159,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
@Override
public void connect() throws Exception {
LOG.debug("Connecting to pipeline:{} leader:{}",
- getPipeline().getPipelineName(),
+ getPipeline().getId(),
RatisHelper.toRaftPeerId(pipeline.getLeader()));
// TODO : XceiverClient ratis should pass the config value of
// maxOutstandingRequests so as to set the upper bound on max no of async
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index e7bdaf0..faa1ec6 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -158,7 +158,7 @@ public class ContainerOperationClient implements ScmClient {
private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
throws IOException {
- Preconditions.checkNotNull(pipeline.getPipelineName(), "Pipeline " +
+ Preconditions.checkNotNull(pipeline.getId(), "Pipeline " +
"name cannot be null when client create flag is set.");
// Pipeline creation is a three step process.
@@ -180,8 +180,7 @@ public class ContainerOperationClient implements ScmClient {
// ObjectStageChangeRequestProto.Op.create,
// ObjectStageChangeRequestProto.Stage.begin);
- client.createPipeline(pipeline.getPipelineName(),
- pipeline.getMachines());
+ client.createPipeline(pipeline);
//storageContainerLocationClient.notifyObjectStageChange(
// ObjectStageChangeRequestProto.Type.pipeline,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index 56cc741..b29e73d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -114,11 +114,9 @@ public abstract class XceiverClientSpi implements Closeable {
/**
* Create a pipeline.
*
- * @param pipelineID - Name of the pipeline.
- * @param datanodes - Datanodes
+ * @param pipeline - pipeline to be created.
*/
- public abstract void createPipeline(String pipelineID,
- List<DatanodeDetails> datanodes) throws IOException;
+ public abstract void createPipeline(Pipeline pipeline) throws IOException;
/**
* Returns pipeline Type.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
index b194c14..427c08b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
@@ -58,7 +58,7 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
}
private HddsProtos.LifeCycleState state;
- private String pipelineName;
+ private PipelineID pipelineID;
private ReplicationFactor replicationFactor;
private ReplicationType replicationType;
// Bytes allocated by SCM for clients.
@@ -82,7 +82,7 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
ContainerInfo(
long containerID,
HddsProtos.LifeCycleState state,
- String pipelineName,
+ PipelineID pipelineID,
long allocatedBytes,
long usedBytes,
long numberOfKeys,
@@ -92,7 +92,7 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
ReplicationFactor replicationFactor,
ReplicationType repType) {
this.containerID = containerID;
- this.pipelineName = pipelineName;
+ this.pipelineID = pipelineID;
this.allocatedBytes = allocatedBytes;
this.usedBytes = usedBytes;
this.numberOfKeys = numberOfKeys;
@@ -113,7 +113,8 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
public static ContainerInfo fromProtobuf(HddsProtos.SCMContainerInfo info) {
ContainerInfo.Builder builder = new ContainerInfo.Builder();
- return builder.setPipelineName(info.getPipelineName())
+ return builder.setPipelineID(
+ PipelineID.getFromProtobuf(info.getPipelineID()))
.setAllocatedBytes(info.getAllocatedBytes())
.setUsedBytes(info.getUsedBytes())
.setNumberOfKeys(info.getNumberOfKeys())
@@ -147,8 +148,8 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
return replicationFactor;
}
- public String getPipelineName() {
- return pipelineName;
+ public PipelineID getPipelineID() {
+ return pipelineID;
}
public long getAllocatedBytes() {
@@ -217,7 +218,7 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
.setNumberOfKeys(getNumberOfKeys()).setState(getState())
.setStateEnterTime(getStateEnterTime()).setContainerID(getContainerID())
.setDeleteTransactionId(getDeleteTransactionId())
- .setPipelineName(getPipelineName())
+ .setPipelineID(getPipelineID().getProtobuf())
.setReplicationFactor(getReplicationFactor())
.setReplicationType(getReplicationType())
.setOwner(getOwner())
@@ -236,7 +237,7 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
public String toString() {
return "ContainerInfo{"
+ "state=" + state
- + ", pipelineName=" + pipelineName
+ + ", pipelineID=" + pipelineID
+ ", stateEnterTime=" + stateEnterTime
+ ", owner=" + owner
+ '}';
@@ -389,7 +390,7 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
private String owner;
private long containerID;
private long deleteTransactionId;
- private String pipelineName;
+ private PipelineID pipelineID;
private ReplicationFactor replicationFactor;
private ReplicationType replicationType;
@@ -399,8 +400,8 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
return this;
}
- public Builder setPipelineName(String pipelineName) {
- this.pipelineName = pipelineName;
+ public Builder setPipelineID(PipelineID pipelineID) {
+ this.pipelineID = pipelineID;
return this;
}
@@ -451,7 +452,7 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
}
public ContainerInfo build() {
- return new ContainerInfo(containerID, state, pipelineName, allocated,
+ return new ContainerInfo(containerID, state, pipelineID, allocated,
used, keys, stateEnterTime, owner, deleteTransactionId,
replicationFactor, replicationType);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index 534c9fd..9270468 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -64,9 +64,7 @@ public class Pipeline {
private HddsProtos.LifeCycleState lifeCycleState;
private HddsProtos.ReplicationType type;
private HddsProtos.ReplicationFactor factor;
- private String name;
- // TODO: change to long based id
- //private long id;
+ private PipelineID id;
/**
* Constructs a new pipeline data structure.
@@ -75,16 +73,16 @@ public class Pipeline {
* @param lifeCycleState - Pipeline State
* @param replicationType - Replication protocol
* @param replicationFactor - replication count on datanodes
- * @param name - pipelineName
+ * @param id - pipeline ID
*/
public Pipeline(String leaderID, HddsProtos.LifeCycleState lifeCycleState,
HddsProtos.ReplicationType replicationType,
- HddsProtos.ReplicationFactor replicationFactor, String name) {
+ HddsProtos.ReplicationFactor replicationFactor, PipelineID id) {
this.leaderID = leaderID;
this.lifeCycleState = lifeCycleState;
this.type = replicationType;
this.factor = replicationFactor;
- this.name = name;
+ this.id = id;
datanodes = new TreeMap<>();
}
@@ -102,7 +100,7 @@ public class Pipeline {
pipelineProto.getState(),
pipelineProto.getType(),
pipelineProto.getFactor(),
- pipelineProto.getName());
+ PipelineID.getFromProtobuf(pipelineProto.getId()));
for (HddsProtos.DatanodeDetailsProto dataID :
pipelineProto.getMembersList()) {
@@ -191,15 +189,19 @@ public class Pipeline {
}
builder.setLeaderID(leaderID);
- if (this.getLifeCycleState() != null) {
- builder.setState(this.getLifeCycleState());
+ if (lifeCycleState != null) {
+ builder.setState(lifeCycleState);
}
- if (this.getType() != null) {
- builder.setType(this.getType());
+ if (type != null) {
+ builder.setType(type);
}
- if (this.getFactor() != null) {
- builder.setFactor(this.getFactor());
+ if (factor != null) {
+ builder.setFactor(factor);
+ }
+
+ if (id != null) {
+ builder.setId(id.getProtobuf());
}
return builder.build();
}
@@ -221,12 +223,12 @@ public class Pipeline {
}
/**
- * Gets the pipeline Name.
+ * Gets the pipeline id.
*
- * @return - Name of the pipeline
+ * @return - Id of the pipeline
*/
- public String getPipelineName() {
- return name;
+ public PipelineID getId() {
+ return id;
}
/**
@@ -245,7 +247,7 @@ public class Pipeline {
getDatanodes().keySet().stream()
.forEach(id -> b.
append(id.endsWith(getLeaderID()) ? "*" + id : id));
- b.append(" name:").append(getPipelineName());
+ b.append(" id:").append(id);
if (getType() != null) {
b.append(" type:").append(getType().toString());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java
new file mode 100644
index 0000000..473ebc5
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.common.helpers;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.ratis.protocol.RaftGroupId;
+
+import java.util.UUID;
+
+/**
+ * ID for the pipeline. The ID is based on a UUID so that it can be used
+ * in Ratis as the RaftGroupId; the group ID is used by the datanodes to
+ * initialize the Ratis group they are part of.
+ */
+public class PipelineID {
+
+ private UUID id;
+ private RaftGroupId groupId;
+
+ private PipelineID(UUID id) {
+ this.id = id;
+ this.groupId = RaftGroupId.valueOf(id);
+ }
+
+ public static PipelineID randomId() {
+ return new PipelineID(UUID.randomUUID());
+ }
+
+ public static PipelineID valueOf(RaftGroupId groupId) {
+ return new PipelineID(groupId.getUuid());
+ }
+
+ public RaftGroupId getRaftGroupID() {
+ return groupId;
+ }
+
+ public UUID getId() {
+ return id;
+ }
+
+ public HddsProtos.PipelineID getProtobuf() {
+ return HddsProtos.PipelineID.newBuilder().setId(id.toString()).build();
+ }
+
+ public static PipelineID getFromProtobuf(HddsProtos.PipelineID protos) {
+ return new PipelineID(UUID.fromString(protos.getId()));
+ }
+
+ @Override
+ public String toString() {
+ return "pipelineId=" + id;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ PipelineID that = (PipelineID) o;
+
+ return id.equals(that.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
index df83116..9c25e20 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
@@ -88,25 +88,28 @@ public interface RatisHelper {
return EMPTY_GROUP;
}
- static RaftGroup newRaftGroup(List<DatanodeDetails> datanodes) {
- final List<RaftPeer> newPeers = datanodes.stream()
- .map(RatisHelper::toRaftPeer)
- .collect(Collectors.toList());
- return RatisHelper.newRaftGroup(newPeers);
- }
-
static RaftGroup newRaftGroup(Collection<RaftPeer> peers) {
return peers.isEmpty()? emptyRaftGroup()
: new RaftGroup(DUMMY_GROUP_ID, peers);
}
+ static RaftGroup newRaftGroup(RaftGroupId groupId,
+ Collection<DatanodeDetails> peers) {
+ final List<RaftPeer> newPeers = peers.stream()
+ .map(RatisHelper::toRaftPeer)
+ .collect(Collectors.toList());
+ return peers.isEmpty() ? new RaftGroup(groupId, Collections.emptyList())
+ : new RaftGroup(groupId, newPeers);
+ }
+
static RaftGroup newRaftGroup(Pipeline pipeline) {
return newRaftGroup(toRaftPeers(pipeline));
}
static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) {
return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()),
- newRaftGroup(pipeline));
+ newRaftGroup(pipeline.getId().getRaftGroupID(),
+ pipeline.getMachines()));
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/common/src/main/proto/hdds.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index a5ce994..89c928b 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -40,13 +40,17 @@ message Port {
required uint32 value = 2;
}
+message PipelineID {
+ required string id = 1;
+}
+
message Pipeline {
required string leaderID = 1;
repeated DatanodeDetailsProto members = 2;
optional LifeCycleState state = 3 [default = OPEN];
optional ReplicationType type = 4 [default = STAND_ALONE];
optional ReplicationFactor factor = 5 [default = ONE];
- optional string name = 6;
+ required PipelineID id = 6;
}
message KeyValue {
@@ -129,7 +133,7 @@ enum LifeCycleEvent {
message SCMContainerInfo {
required int64 containerID = 1;
required LifeCycleState state = 2;
- optional string pipelineName = 3;
+ optional PipelineID pipelineID = 3;
// This is not total size of container, but space allocated by SCM for
// clients to write blocks
required uint64 allocatedBytes = 4;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index a3bddfc..030a357 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -73,6 +73,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
CloseContainerCommandProto
.parseFrom(command.getProtoBufMessage());
containerID = closeContainerProto.getContainerID();
+ HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID();
HddsProtos.ReplicationType replicationType =
closeContainerProto.getReplicationType();
@@ -87,7 +88,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
context.getParent().getDatanodeDetails().getUuidString());
// submit the close container request for the XceiverServer to handle
container.submitContainerRequest(
- request.build(), replicationType);
+ request.build(), replicationType, pipelineID);
cmdExecuted = true;
} catch (Exception e) {
LOG.error("Can't close container " + containerID, e);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
index 3a469de..f866fcd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
@@ -19,7 +19,8 @@
package org.apache.hadoop.ozone.container.common.transport.server;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
import org.apache.ratis.shaded.io.netty.bootstrap.ServerBootstrap;
import org.apache.ratis.shaded.io.netty.channel.Channel;
import org.apache.ratis.shaded.io.netty.channel.EventLoopGroup;
@@ -132,8 +133,8 @@ public final class XceiverServer implements XceiverServerSpi {
}
@Override
- public void submitRequest(
- ContainerProtos.ContainerCommandRequestProto request) throws IOException {
+ public void submitRequest(ContainerCommandRequestProto request,
+ HddsProtos.PipelineID pipelineID) {
storageContainer.dispatch(request);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 0a9e1db..f4f3f6f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -21,7 +21,8 @@ package org.apache.hadoop.ozone.container.common.transport.server;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -108,8 +109,8 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
}
@Override
- public void submitRequest(
- ContainerProtos.ContainerCommandRequestProto request) throws IOException {
+ public void submitRequest(ContainerCommandRequestProto request,
+ HddsProtos.PipelineID pipelineID) {
storageContainer.dispatch(request);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index 49579f2..1863f6d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -18,7 +18,8 @@
package org.apache.hadoop.ozone.container.common.transport.server;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import java.io.IOException;
@@ -45,6 +46,7 @@ public interface XceiverServerSpi {
* submits a containerRequest to be performed by the replication pipeline.
* @param request ContainerCommandRequest
*/
- void submitRequest(ContainerProtos.ContainerCommandRequestProto request)
+ void submitRequest(ContainerCommandRequestProto request,
+ HddsProtos.PipelineID pipelineID)
throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 723b94ae..f8c7af2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -23,8 +23,10 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server
@@ -35,12 +37,17 @@ import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@@ -73,6 +80,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
private final int port;
private final RaftServer server;
private ThreadPoolExecutor chunkExecutor;
+ private ClientId clientId = ClientId.randomId();
private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
ContainerDispatcher dispatcher, Configuration conf) throws IOException {
@@ -282,17 +290,23 @@ public final class XceiverServerRatis implements XceiverServerSpi {
@Override
public void submitRequest(
- ContainerProtos.ContainerCommandRequestProto request) throws IOException {
- ClientId clientId = ClientId.randomId();
+ ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID)
+ throws IOException {
+ // ReplicationLevel.ALL ensures the transactions corresponding to
+ // the request here are applied on all the raft servers.
RaftClientRequest raftClientRequest =
- new RaftClientRequest(clientId, server.getId(),
- RatisHelper.emptyRaftGroup().getGroupId(), nextCallId(), 0,
- Message.valueOf(request.toByteString()), RaftClientRequest
- // ReplicationLevel.ALL ensures the transactions corresponding to
- // the request here are applied on all the raft servers.
- .writeRequestType(RaftProtos.ReplicationLevel.ALL));
+ createRaftClientRequest(request, pipelineID,
+ RaftClientRequest.writeRequestType(ReplicationLevel.ALL));
CompletableFuture<RaftClientReply> reply =
server.submitClientRequestAsync(raftClientRequest);
reply.thenAccept(this::processReply);
}
+
+ private RaftClientRequest createRaftClientRequest(
+ ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID,
+ RaftClientRequest.Type type) {
+ return new RaftClientRequest(clientId, server.getId(),
+ PipelineID.getFromProtobuf(pipelineID).getRaftGroupID(),
+ nextCallId(),0, Message.valueOf(request.toByteString()), type);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 85c947f..5bff78c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -168,23 +168,25 @@ public class OzoneContainer {
* Submit ContainerRequest.
* @param request
* @param replicationType
+ * @param pipelineID
* @throws IOException
*/
public void submitContainerRequest(
ContainerProtos.ContainerCommandRequestProto request,
- HddsProtos.ReplicationType replicationType) throws IOException {
+ HddsProtos.ReplicationType replicationType,
+ HddsProtos.PipelineID pipelineID) throws IOException {
XceiverServerSpi serverInstance;
long containerId = getContainerIdForCmd(request);
if (replicationType == HddsProtos.ReplicationType.RATIS) {
serverInstance = getRatisSerer();
Preconditions.checkNotNull(serverInstance);
- serverInstance.submitRequest(request);
+ serverInstance.submitRequest(request, pipelineID);
LOG.info("submitting {} request over RATIS server for container {}",
request.getCmdType(), containerId);
} else {
serverInstance = getStandaAloneSerer();
Preconditions.checkNotNull(serverInstance);
- getStandaAloneSerer().submitRequest(request);
+ getStandaAloneSerer().submitRequest(request, pipelineID);
LOG.info(
"submitting {} request over STAND_ALONE server for container {}",
request.getCmdType(), containerId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
index 1829642..aaa5f11 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
/**
* Asks datanode to close a container.
@@ -31,20 +32,25 @@ public class CloseContainerCommand
private long containerID;
private HddsProtos.ReplicationType replicationType;
+ private PipelineID pipelineID;
public CloseContainerCommand(long containerID,
- HddsProtos.ReplicationType replicationType) {
+ HddsProtos.ReplicationType replicationType,
+ PipelineID pipelineID) {
super();
this.containerID = containerID;
this.replicationType = replicationType;
+ this.pipelineID = pipelineID;
}
// Should be called only for protobuf conversion
private CloseContainerCommand(long containerID,
- HddsProtos.ReplicationType replicationType, long id) {
+ HddsProtos.ReplicationType replicationType,
+ PipelineID pipelineID, long id) {
super(id);
this.containerID = containerID;
this.replicationType = replicationType;
+ this.pipelineID = pipelineID;
}
/**
@@ -71,15 +77,18 @@ public class CloseContainerCommand
return CloseContainerCommandProto.newBuilder()
.setContainerID(containerID)
.setCmdId(getId())
- .setReplicationType(replicationType).build();
+ .setReplicationType(replicationType)
+ .setPipelineID(pipelineID.getProtobuf())
+ .build();
}
public static CloseContainerCommand getFromProtobuf(
CloseContainerCommandProto closeContainerProto) {
Preconditions.checkNotNull(closeContainerProto);
return new CloseContainerCommand(closeContainerProto.getContainerID(),
- closeContainerProto.getReplicationType(), closeContainerProto
- .getCmdId());
+ closeContainerProto.getReplicationType(),
+ PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()),
+ closeContainerProto.getCmdId());
}
public long getContainerID() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 71c41e3..1a3496d 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -247,6 +247,7 @@ message CloseContainerCommandProto {
required int64 containerID = 1;
required hadoop.hdds.ReplicationType replicationType = 2;
required int64 cmdId = 3;
+ required PipelineID pipelineID = 4;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 949eb13..c723dfa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -78,7 +78,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
CommandForDatanode closeContainerCommand = new CommandForDatanode<>(
datanode.getUuid(),
new CloseContainerCommand(containerID.getId(),
- info.getReplicationType()));
+ info.getReplicationType(), info.getPipelineID()));
publisher.fireEvent(DATANODE_COMMAND, closeContainerCommand);
}
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 863d6c5..e12fcad 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -201,7 +202,7 @@ public class ContainerMapping implements Mapping {
.parseFrom(containerBytes);
contInfo = ContainerInfo.fromProtobuf(temp);
Pipeline pipeline = pipelineSelector
- .getPipeline(contInfo.getPipelineName(),
+ .getPipeline(contInfo.getPipelineID(),
contInfo.getReplicationType());
if(pipeline == null) {
@@ -381,7 +382,7 @@ public class ContainerMapping implements Mapping {
.updateContainerState(containerInfo, event);
if (!updatedContainer.isContainerOpen()) {
Pipeline pipeline = pipelineSelector
- .getPipeline(containerInfo.getPipelineName(),
+ .getPipeline(containerInfo.getPipelineID(),
containerInfo.getReplicationType());
pipelineSelector.closePipelineIfNoOpenContainers(pipeline);
}
@@ -462,7 +463,7 @@ public class ContainerMapping implements Mapping {
return null;
}
Pipeline pipeline = pipelineSelector
- .getPipeline(containerInfo.getPipelineName(),
+ .getPipeline(containerInfo.getPipelineID(),
containerInfo.getReplicationType());
if (pipeline == null) {
pipeline = pipelineSelector
@@ -527,7 +528,8 @@ public class ContainerMapping implements Mapping {
// If the container is closed, then state is already written to SCM
Pipeline pipeline =
- pipelineSelector.getPipeline(newState.getPipelineName(),
+ pipelineSelector.getPipeline(
+ PipelineID.getFromProtobuf(newState.getPipelineID()),
newState.getReplicationType());
if(pipeline == null) {
pipeline = pipelineSelector
@@ -570,7 +572,7 @@ public class ContainerMapping implements Mapping {
HddsProtos.SCMContainerInfo.Builder builder =
HddsProtos.SCMContainerInfo.newBuilder();
builder.setContainerID(knownState.getContainerID())
- .setPipelineName(knownState.getPipelineName())
+ .setPipelineID(knownState.getPipelineID())
.setReplicationType(knownState.getReplicationType())
.setReplicationFactor(knownState.getReplicationFactor());
@@ -725,7 +727,7 @@ public class ContainerMapping implements Mapping {
.setAllocatedBytes(info.getAllocatedBytes())
.setNumberOfKeys(oldInfo.getNumberOfKeys())
.setOwner(oldInfo.getOwner())
- .setPipelineName(oldInfo.getPipelineName())
+ .setPipelineID(oldInfo.getPipelineID())
.setState(oldInfo.getState())
.setUsedBytes(oldInfo.getUsedBytes())
.setDeleteTransactionId(oldInfo.getDeleteTransactionId())
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 6b983a6..5df7dc7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -299,7 +300,7 @@ public class ContainerStateManager implements Closeable {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(HddsProtos.LifeCycleState.ALLOCATED)
- .setPipelineName(pipeline.getPipelineName())
+ .setPipelineID(pipeline.getId())
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
@@ -467,12 +468,12 @@ public class ContainerStateManager implements Closeable {
/**
* Returns a set of open ContainerIDs that reside on a pipeline.
*
- * @param pipeline Pipeline of the Containers.
+ * @param pipelineID PipelineID of the Containers.
* @return Set of containers that match the specific query parameters.
*/
- public NavigableSet<ContainerID> getMatchingContainerIDsByPipeline(String
- pipeline) {
- return containers.getOpenContainerIDsByPipeline(pipeline);
+ public NavigableSet<ContainerID> getMatchingContainerIDsByPipeline(PipelineID
+ pipelineID) {
+ return containers.getOpenContainerIDsByPipeline(pipelineID);
}
/**
@@ -485,7 +486,8 @@ public class ContainerStateManager implements Closeable {
public ContainerWithPipeline getContainer(PipelineSelector selector,
ContainerID containerID) throws IOException {
ContainerInfo info = containers.getContainerInfo(containerID.getId());
- Pipeline pipeline = selector.getPipeline(info.getPipelineName(), info.getReplicationType());
+ Pipeline pipeline = selector.getPipeline(info.getPipelineID(),
+ info.getReplicationType());
return new ContainerWithPipeline(info, pipeline);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
index ba691ca..26b1548 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
@@ -132,7 +133,8 @@ public class ContainerCloser {
for (DatanodeDetails datanodeDetails : pipeline.getMachines()) {
nodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(info.getContainerID(),
- info.getReplicationType()));
+ info.getReplicationType(),
+ PipelineID.getFromProtobuf(info.getPipelineID())));
}
if (!commandIssued.containsKey(info.getContainerID())) {
commandIssued.put(info.getContainerID(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index b358b7c..f840b27 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -25,6 +25,7 @@ import java.util.Set;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -96,7 +97,7 @@ public class ContainerStateMap {
//1. Dead datanode.
//2. Datanode out of space.
//3. Volume loss or volume out of space.
- private final ContainerAttribute<String> openPipelineMap;
+ private final ContainerAttribute<PipelineID> openPipelineMap;
private final Map<ContainerID, ContainerInfo> containerMap;
// Map to hold replicas of given container.
@@ -153,7 +154,7 @@ public class ContainerStateMap {
factorMap.insert(info.getReplicationFactor(), id);
typeMap.insert(info.getReplicationType(), id);
if (info.isContainerOpen()) {
- openPipelineMap.insert(info.getPipelineName(), id);
+ openPipelineMap.insert(info.getPipelineID(), id);
}
LOG.trace("Created container with {} successfully.", id);
}
@@ -347,7 +348,7 @@ public class ContainerStateMap {
// In case the container is set to closed state, it needs to be removed from
// the pipeline Map.
if (!info.isContainerOpen()) {
- openPipelineMap.remove(info.getPipelineName(), id);
+ openPipelineMap.remove(info.getPipelineID(), id);
}
}
@@ -382,14 +383,15 @@ public class ContainerStateMap {
/**
* Returns Open containers in the SCM by the Pipeline
*
- * @param pipeline - Pipeline name.
+ * @param pipelineID - Pipeline id.
* @return NavigableSet<ContainerID>
*/
- public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(String pipeline) {
- Preconditions.checkNotNull(pipeline);
+ public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(
+ PipelineID pipelineID) {
+ Preconditions.checkNotNull(pipelineID);
try (AutoCloseableLock lock = autoLock.acquire()) {
- return openPipelineMap.getCollection(pipeline);
+ return openPipelineMap.getCollection(pipelineID);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
index 6d5575b..26b8b95 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
@@ -107,6 +107,14 @@ public class DatanodeInfo extends DatanodeDetails {
}
}
+ /**
+ * Returns the last updated time of datanode info.
+ * @return the last updated time of datanode info.
+ */
+ public long getLastStatsUpdatedTime() {
+ return lastStatsUpdatedTime;
+ }
+
@Override
public int hashCode() {
return super.hashCode();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index 266b1f3..7d91ee4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,8 +38,8 @@ import java.util.concurrent.atomic.AtomicInteger;
public abstract class PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineManager.class);
- private final List<Pipeline> activePipelines;
- private final Map<String, Pipeline> pipelineMap;
+ private final List<PipelineID> activePipelines;
+ private final Map<PipelineID, Pipeline> pipelineMap;
private final AtomicInteger pipelineIndex;
private final Node2PipelineMap node2PipelineMap;
@@ -64,7 +65,7 @@ public abstract class PipelineManager {
if (pipeline != null) {
LOG.debug("re-used pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}",
- pipeline.getPipelineName(), replicationType, replicationFactor);
+ pipeline.getId(), replicationType, replicationFactor);
}
if (pipeline == null) {
LOG.error("Get pipeline call failed. We are not able to find" +
@@ -78,19 +79,19 @@ public abstract class PipelineManager {
/**
* This function to get pipeline with given pipeline name.
*
- * @param pipelineName
+ * @param id the PipelineID used as the lookup key for the pipeline.
* @return a Pipeline.
*/
- public synchronized final Pipeline getPipeline(String pipelineName) {
+ public synchronized final Pipeline getPipeline(PipelineID id) {
Pipeline pipeline = null;
// 1. Check if pipeline already exists
- if (pipelineMap.containsKey(pipelineName)) {
- pipeline = pipelineMap.get(pipelineName);
- LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
+ if (pipelineMap.containsKey(id)) {
+ pipeline = pipelineMap.get(id);
+ LOG.debug("Returning pipeline for pipelineName:{}", id);
return pipeline;
} else {
- LOG.debug("Unable to find pipeline for pipelineName:{}", pipelineName);
+ LOG.debug("Unable to find pipeline for pipelineName:{}", id);
}
return pipeline;
}
@@ -132,9 +133,10 @@ public abstract class PipelineManager {
int nextIndex = sentinal;
for (; startIndex != nextIndex; nextIndex = getNextIndex()) {
// Just walk the list in a circular way.
- Pipeline temp =
+ PipelineID id =
activePipelines
.get(nextIndex != sentinal ? nextIndex : startIndex);
+ Pipeline temp = pipelineMap.get(id);
// if we find an operational pipeline just return that.
if ((temp.getLifeCycleState() == LifeCycleState.OPEN) &&
(temp.getFactor() == factor) && (temp.getType() == type)) {
@@ -165,9 +167,9 @@ public abstract class PipelineManager {
if (pipeline != null) {
LOG.debug("created new pipeline:{} for container with "
+ "replicationType:{} replicationFactor:{}",
- pipeline.getPipelineName(), replicationType, replicationFactor);
- activePipelines.add(pipeline);
- pipelineMap.put(pipeline.getPipelineName(), pipeline);
+ pipeline.getId(), replicationType, replicationFactor);
+ activePipelines.add(pipeline.getId());
+ pipelineMap.put(pipeline.getId(), pipeline);
node2PipelineMap.addPipeline(pipeline);
}
return pipeline;
@@ -178,7 +180,7 @@ public abstract class PipelineManager {
* @param pipeline pipeline to be finalized
*/
public synchronized void finalizePipeline(Pipeline pipeline) {
- activePipelines.remove(pipeline);
+ activePipelines.remove(pipeline.getId());
}
/**
@@ -186,7 +188,7 @@ public abstract class PipelineManager {
* @param pipeline
*/
public void closePipeline(Pipeline pipeline) {
- pipelineMap.remove(pipeline.getPipelineName());
+ pipelineMap.remove(pipeline.getId());
node2PipelineMap.removePipeline(pipeline);
}
@@ -194,12 +196,12 @@ public abstract class PipelineManager {
* list members in the pipeline .
* @return the datanode
*/
- public abstract List<DatanodeDetails> getMembers(String pipelineID)
+ public abstract List<DatanodeDetails> getMembers(PipelineID pipelineID)
throws IOException;
/**
* Update the datanode list of the pipeline.
*/
- public abstract void updatePipeline(String pipelineID,
+ public abstract void updatePipeline(PipelineID pipelineID,
List<DatanodeDetails> newDatanodes) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index ebe39d0..028d14b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
@@ -184,13 +185,13 @@ public class PipelineSelector {
*/
public static Pipeline newPipelineFromNodes(
List<DatanodeDetails> nodes, ReplicationType replicationType,
- ReplicationFactor replicationFactor, String name) {
+ ReplicationFactor replicationFactor, PipelineID id) {
Preconditions.checkNotNull(nodes);
Preconditions.checkArgument(nodes.size() > 0);
String leaderId = nodes.get(0).getUuidString();
// A new pipeline always starts in allocated state
Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED,
- replicationType, replicationFactor, name);
+ replicationType, replicationFactor, id);
for (DatanodeDetails node : nodes) {
pipeline.addMember(node);
}
@@ -304,16 +305,16 @@ public class PipelineSelector {
* This function to return pipeline for given pipeline name and replication
* type.
*/
- public Pipeline getPipeline(String pipelineName,
+ public Pipeline getPipeline(PipelineID pipelineID,
ReplicationType replicationType) throws IOException {
- if (pipelineName == null) {
+ if (pipelineID == null) {
return null;
}
PipelineManager manager = getPipelineManager(replicationType);
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Getting replication pipeline forReplicationType {} :" +
- " pipelineName:{}", replicationType, pipelineName);
- return manager.getPipeline(pipelineName);
+ " pipelineName:{}", replicationType, pipelineID);
+ return manager.getPipeline(pipelineID);
}
/**
@@ -322,7 +323,7 @@ public class PipelineSelector {
public void finalizePipeline(Pipeline pipeline) throws IOException {
PipelineManager manager = getPipelineManager(pipeline.getType());
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
- LOG.debug("Finalizing pipeline. pipelineID: {}", pipeline.getPipelineName());
+ LOG.debug("Finalizing pipeline. pipelineID: {}", pipeline.getId());
// Remove the pipeline from active allocation
manager.finalizePipeline(pipeline);
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE);
@@ -337,10 +338,10 @@ public class PipelineSelector {
return;
}
NavigableSet<ContainerID> containerIDS = containerStateManager
- .getMatchingContainerIDsByPipeline(pipeline.getPipelineName());
+ .getMatchingContainerIDsByPipeline(pipeline.getId());
if (containerIDS.size() == 0) {
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CLOSE);
- LOG.info("Closing pipeline. pipelineID: {}", pipeline.getPipelineName());
+ LOG.info("Closing pipeline. pipelineID: {}", pipeline.getId());
}
}
@@ -350,10 +351,10 @@ public class PipelineSelector {
private void closePipeline(Pipeline pipeline) {
PipelineManager manager = getPipelineManager(pipeline.getType());
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
- LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getPipelineName());
+ LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId());
NavigableSet<ContainerID> containers =
containerStateManager
- .getMatchingContainerIDsByPipeline(pipeline.getPipelineName());
+ .getMatchingContainerIDsByPipeline(pipeline.getId());
Preconditions.checkArgument(containers.size() == 0);
manager.closePipeline(pipeline);
}
@@ -361,7 +362,7 @@ public class PipelineSelector {
private void closeContainersByPipeline(Pipeline pipeline) {
NavigableSet<ContainerID> containers =
containerStateManager
- .getMatchingContainerIDsByPipeline(pipeline.getPipelineName());
+ .getMatchingContainerIDsByPipeline(pipeline.getId());
for (ContainerID id : containers) {
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id);
}
@@ -372,7 +373,7 @@ public class PipelineSelector {
*/
public List<DatanodeDetails> getDatanodes(ReplicationType replicationType,
- String pipelineID) throws IOException {
+ PipelineID pipelineID) throws IOException {
PipelineManager manager = getPipelineManager(replicationType);
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Getting data nodes from pipeline : {}", pipelineID);
@@ -383,7 +384,7 @@ public class PipelineSelector {
* Update the datanodes in the list of the pipeline.
*/
- public void updateDatanodes(ReplicationType replicationType, String
+ public void updateDatanodes(ReplicationType replicationType, PipelineID
pipelineID, List<DatanodeDetails> newDatanodes) throws IOException {
PipelineManager manager = getPipelineManager(replicationType);
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
@@ -423,7 +424,7 @@ public class PipelineSelector {
String error = String.format("Failed to update pipeline state %s, " +
"reason: invalid state transition from state: %s upon " +
"event: %s.",
- pipeline.getPipelineName(), pipeline.getLifeCycleState(), event);
+ pipeline.getId(), pipeline.getLifeCycleState(), event);
LOG.error(error);
throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index fdd0605..8b14483 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -20,6 +20,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -38,7 +39,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
-import java.util.UUID;
/**
* Implementation of {@link PipelineManager}.
@@ -48,7 +48,6 @@ import java.util.UUID;
public class RatisManagerImpl extends PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(RatisManagerImpl.class);
- private static final String PREFIX = "Ratis-";
private final Configuration conf;
private final NodeManager nodeManager;
private final Set<DatanodeDetails> ratisMembers;
@@ -87,12 +86,11 @@ public class RatisManagerImpl extends PipelineManager {
// once a datanode has been added to a pipeline, exclude it from
// further allocations
ratisMembers.addAll(newNodesList);
- LOG.info("Allocating a new ratis pipeline of size: {}", count);
- // Start all pipeline names with "Ratis", easy to grep the logs.
- String pipelineName = PREFIX +
- UUID.randomUUID().toString().substring(PREFIX.length());
+ PipelineID pipelineID = PipelineID.randomId();
+ LOG.info("Allocating a new ratis pipeline of size: {} id: {}",
+ count, pipelineID);
return PipelineSelector.newPipelineFromNodes(newNodesList,
- ReplicationType.RATIS, factor, pipelineName);
+ ReplicationType.RATIS, factor, pipelineID);
}
}
}
@@ -103,7 +101,7 @@ public class RatisManagerImpl extends PipelineManager {
//TODO:move the initialization from SCM to client
try (XceiverClientRatis client =
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
- client.createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
+ client.createPipeline(pipeline);
}
}
@@ -126,7 +124,7 @@ public class RatisManagerImpl extends PipelineManager {
* @return the datanode
*/
@Override
- public List<DatanodeDetails> getMembers(String pipelineID)
+ public List<DatanodeDetails> getMembers(PipelineID pipelineID)
throws IOException {
return null;
}
@@ -138,7 +136,7 @@ public class RatisManagerImpl extends PipelineManager {
* @param newDatanodes
*/
@Override
- public void updatePipeline(String pipelineID,
+ public void updatePipeline(PipelineID pipelineID,
List<DatanodeDetails> newDatanodes)
throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index 0506e59..f1b23f5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.pipelines.standalone;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -36,7 +37,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
-import java.util.UUID;
/**
* Standalone Manager Impl to prove that pluggable interface
@@ -85,11 +85,11 @@ public class StandaloneManagerImpl extends PipelineManager {
// once a datanode has been added to a pipeline, exclude it from
// further allocations
standAloneMembers.addAll(newNodesList);
- LOG.info("Allocating a new standalone pipeline of size: {}", count);
- String pipelineName =
- "SA-" + UUID.randomUUID().toString().substring(3);
+ PipelineID pipelineID = PipelineID.randomId();
+ LOG.info("Allocating a new standalone pipeline of size: {} id: {}",
+ count, pipelineID);
return PipelineSelector.newPipelineFromNodes(newNodesList,
- ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
+ ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineID);
}
}
}
@@ -118,7 +118,7 @@ public class StandaloneManagerImpl extends PipelineManager {
* @return the datanode
*/
@Override
- public List<DatanodeDetails> getMembers(String pipelineID)
+ public List<DatanodeDetails> getMembers(PipelineID pipelineID)
throws IOException {
return null;
}
@@ -130,7 +130,7 @@ public class StandaloneManagerImpl extends PipelineManager {
* @param newDatanodes
*/
@Override
- public void updatePipeline(String pipelineID, List<DatanodeDetails>
+ public void updatePipeline(PipelineID pipelineID, List<DatanodeDetails>
newDatanodes) throws IOException {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index e86717b..1d06ea4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -381,11 +382,12 @@ public class TestDeletedBlockLog {
private void mockContainerInfo(long containerID, DatanodeDetails dd) throws IOException {
Pipeline pipeline =
new Pipeline("fake", LifeCycleState.OPEN,
- ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "fake");
+ ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+ PipelineID.randomId());
pipeline.addMember(dd);
ContainerInfo.Builder builder = new ContainerInfo.Builder();
- builder.setPipelineName(pipeline.getPipelineName())
+ builder.setPipelineID(pipeline.getId())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 363db99..e7b6cd9 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo
.Builder;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.replication
.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
@@ -109,7 +110,8 @@ public class TestContainerReportHandler implements EventPublisher {
PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class);
Pipeline pipeline = new Pipeline("leader", LifeCycleState.CLOSED,
- ReplicationType.STAND_ALONE, ReplicationFactor.THREE, "pipeline1");
+ ReplicationType.STAND_ALONE, ReplicationFactor.THREE,
+ PipelineID.randomId());
when(pipelineSelector.getReplicationPipeline(ReplicationType.STAND_ALONE,
ReplicationFactor.THREE)).thenReturn(pipeline);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 9aa4b64..fa87706 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
@@ -206,10 +207,10 @@ public class TestReplicationManager {
final Iterator<DatanodeDetails> i = ids.iterator();
Preconditions.checkArgument(i.hasNext());
final DatanodeDetails leader = i.next();
- String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(3);
final Pipeline pipeline =
new Pipeline(leader.getUuidString(), LifeCycleState.OPEN,
- ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
+ ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+ PipelineID.randomId());
pipeline.addMember(leader);
for (; i.hasNext(); ) {
pipeline.addMember(i.next());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
index cfa20be..9d4346a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -1123,7 +1124,8 @@ public class TestNodeManager {
.register(datanodeDetails, TestUtils.createNodeReport(report));
eq.fireEvent(DATANODE_COMMAND,
new CommandForDatanode<>(datanodeDetails.getUuid(),
- new CloseContainerCommand(1L, ReplicationType.STAND_ALONE)));
+ new CloseContainerCommand(1L, ReplicationType.STAND_ALONE,
+ PipelineID.randomId())));
eq.processAll(1000L);
List<SCMCommand> command =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index e9359b8..5071d8d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.statemachine
@@ -430,6 +431,7 @@ public class TestEndPoint {
CloseContainerCommandProto.newBuilder().setCmdId(1)
.setContainerID(1)
.setReplicationType(ReplicationType.RATIS)
+ .setPipelineID(PipelineID.randomId().getProtobuf())
.build())
.setCommandType(Type.closeContainerCommand)
.build();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index ffac6d5..c0cd293 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -87,7 +87,7 @@ public class TestNode2PipelineMap {
public void testPipelineMap() throws IOException {
NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline(
- ratisContainer.getPipeline().getPipelineName());
+ ratisContainer.getPipeline().getId());
long cId = ratisContainer.getContainerInfo().getContainerID();
Assert.assertEquals(1, set.size());
@@ -100,8 +100,8 @@ public class TestNode2PipelineMap {
Set<Pipeline> pipelines = mapping.getPipelineSelector()
.getNode2PipelineMap().getPipelines(dns.get(0).getUuid());
Assert.assertEquals(1, pipelines.size());
- pipelines.forEach(p -> Assert.assertEquals(p.getPipelineName(),
- ratisContainer.getPipeline().getPipelineName()));
+ pipelines.forEach(p -> Assert.assertEquals(p.getId(),
+ ratisContainer.getPipeline().getId()));
// Now close the container and it should not show up while fetching
@@ -115,7 +115,7 @@ public class TestNode2PipelineMap {
mapping
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
- ratisContainer.getPipeline().getPipelineName());
+ ratisContainer.getPipeline().getId());
Assert.assertEquals(0, set2.size());
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4031a8f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 24e25ab..8915a82 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -93,7 +93,7 @@ public class TestPipelineClose {
@Test
public void testPipelineCloseWithClosedContainer() throws IOException {
NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline(
- ratisContainer1.getPipeline().getPipelineName());
+ ratisContainer1.getPipeline().getId());
long cId = ratisContainer1.getContainerInfo().getContainerID();
Assert.assertEquals(1, set.size());
@@ -111,12 +111,12 @@ public class TestPipelineClose {
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
NavigableSet<ContainerID> setClosed = stateMap.getOpenContainerIDsByPipeline(
- ratisContainer1.getPipeline().getPipelineName());
+ ratisContainer1.getPipeline().getId());
Assert.assertEquals(0, setClosed.size());
pipelineSelector.finalizePipeline(ratisContainer1.getPipeline());
Pipeline pipeline1 = pipelineSelector
- .getPipeline(ratisContainer1.getPipeline().getPipelineName(),
+ .getPipeline(ratisContainer1.getPipeline().getId(),
ratisContainer1.getContainerInfo().getReplicationType());
Assert.assertNull(pipeline1);
Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(),
@@ -132,7 +132,7 @@ public class TestPipelineClose {
public void testPipelineCloseWithOpenContainer() throws IOException,
TimeoutException, InterruptedException {
NavigableSet<ContainerID> setOpen = stateMap.getOpenContainerIDsByPipeline(
- ratisContainer2.getPipeline().getPipelineName());
+ ratisContainer2.getPipeline().getId());
Assert.assertEquals(1, setOpen.size());
long cId2 = ratisContainer2.getContainerInfo().getContainerID();
@@ -144,7 +144,7 @@ public class TestPipelineClose {
Assert.assertEquals(ratisContainer2.getPipeline().getLifeCycleState(),
HddsProtos.LifeCycleState.CLOSING);
Pipeline pipeline2 = pipelineSelector
- .getPipeline(ratisContainer2.getPipeline().getPipelineName(),
+ .getPipeline(ratisContainer2.getPipeline().getId(),
ratisContainer2.getContainerInfo().getReplicationType());
Assert.assertEquals(pipeline2.getLifeCycleState(),
HddsProtos.LifeCycleState.CLOSING);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org