You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/07/03 21:12:07 UTC
[1/3] hadoop git commit: HDDS-175. Refactor ContainerInfo to remove
Pipeline object from it. Contributed by Ajay Kumar.
Repository: hadoop
Updated Branches:
refs/heads/trunk 93ac01cb5 -> 7ca4f0cef
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
index d6f5d32..a9781b1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -79,14 +79,16 @@ public class TestXceiverClientMetrics {
OzoneConfiguration conf = new OzoneConfiguration();
XceiverClientManager clientManager = new XceiverClientManager(conf);
- ContainerInfo container = storageContainerLocationClient
+ ContainerWithPipeline container = storageContainerLocationClient
.allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerOwner);
- XceiverClientSpi client = clientManager.acquireClient(
- container.getPipeline(), container.getContainerID());
+ XceiverClientSpi client = clientManager
+ .acquireClient(container.getPipeline(),
+ container.getContainerInfo().getContainerID());
ContainerCommandRequestProto request = ContainerTestHelper
- .getCreateContainerRequest(container.getContainerID(),
+ .getCreateContainerRequest(
+ container.getContainerInfo().getContainerID(),
container.getPipeline());
client.sendCommand(request);
@@ -112,7 +114,7 @@ public class TestXceiverClientMetrics {
// use async interface for testing pending metrics
for (int i = 0; i < numRequest; i++) {
BlockID blockID = ContainerTestHelper.
- getTestBlockID(container.getContainerID());
+ getTestBlockID(container.getContainerInfo().getContainerID());
ContainerProtos.ContainerCommandRequestProto smallFileRequest;
smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkContainerStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkContainerStateMap.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkContainerStateMap.java
index 375450c..c344bbe 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkContainerStateMap.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkContainerStateMap.java
@@ -60,7 +60,9 @@ public class BenchMarkContainerStateMap {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(CLOSED)
- .setPipeline(pipeline)
+ .setPipelineName(pipeline.getPipelineName())
+ .setReplicationType(pipeline.getType())
+ .setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
@@ -81,7 +83,9 @@ public class BenchMarkContainerStateMap {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(OPEN)
- .setPipeline(pipeline)
+ .setPipelineName(pipeline.getPipelineName())
+ .setReplicationType(pipeline.getType())
+ .setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
@@ -101,7 +105,9 @@ public class BenchMarkContainerStateMap {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(OPEN)
- .setPipeline(pipeline)
+ .setPipelineName(pipeline.getPipelineName())
+ .setReplicationType(pipeline.getType())
+ .setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
@@ -166,7 +172,9 @@ public class BenchMarkContainerStateMap {
int cid = state.containerID.incrementAndGet();
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(CLOSED)
- .setPipeline(pipeline)
+ .setPipelineName(pipeline.getPipelineName())
+ .setReplicationType(pipeline.getType())
+ .setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
index edc0d7b..26776c5 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.scm.cli;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
+import com.google.protobuf.ByteString;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@@ -37,7 +38,6 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyI
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -86,12 +86,12 @@ public class SQLCLI extends Configured implements Tool {
private static final String CREATE_CONTAINER_INFO =
"CREATE TABLE containerInfo (" +
"containerID LONG PRIMARY KEY NOT NULL, " +
- "leaderUUID TEXT NOT NULL)";
- private static final String CREATE_CONTAINER_MEMBERS =
- "CREATE TABLE containerMembers (" +
- "containerName TEXT NOT NULL, " +
- "datanodeUUID TEXT NOT NULL," +
- "PRIMARY KEY(containerName, datanodeUUID));";
+ "replicationType TEXT NOT NULL," +
+ "replicationFactor TEXT NOT NULL," +
+ "usedBytes LONG NOT NULL," +
+ "allocatedBytes LONG NOT NULL," +
+ "owner TEXT," +
+ "numberOfKeys LONG)";
private static final String CREATE_DATANODE_INFO =
"CREATE TABLE datanodeInfo (" +
"hostName TEXT NOT NULL, " +
@@ -99,8 +99,10 @@ public class SQLCLI extends Configured implements Tool {
"ipAddress TEXT, " +
"containerPort INTEGER NOT NULL);";
private static final String INSERT_CONTAINER_INFO =
- "INSERT INTO containerInfo (containerID, leaderUUID) " +
- "VALUES (\"%d\", \"%s\")";
+ "INSERT INTO containerInfo (containerID, replicationType, "
+ + "replicationFactor, usedBytes, allocatedBytes, owner, "
+ + "numberOfKeys) VALUES (\"%d\", \"%s\", \"%s\", \"%d\", \"%d\", "
+ + "\"%s\", \"%d\")";
private static final String INSERT_DATANODE_INFO =
"INSERT INTO datanodeInfo (hostname, datanodeUUid, ipAddress, " +
"containerPort) " +
@@ -469,10 +471,7 @@ public class SQLCLI extends Configured implements Tool {
.setConf(conf).setDbFile(dbFile).build();
Connection conn = connectDB(outPath.toString())) {
executeSQL(conn, CREATE_CONTAINER_INFO);
- executeSQL(conn, CREATE_CONTAINER_MEMBERS);
- executeSQL(conn, CREATE_DATANODE_INFO);
- HashSet<String> uuidChecked = new HashSet<>();
dbStore.iterate(null, (key, value) -> {
long containerID = Longs.fromByteArray(key);
ContainerInfo containerInfo = null;
@@ -481,8 +480,7 @@ public class SQLCLI extends Configured implements Tool {
Preconditions.checkNotNull(containerInfo);
try {
//TODO: include container state to sqllite schema
- insertContainerDB(conn, containerID,
- containerInfo.getPipeline().getProtobufMessage(), uuidChecked);
+ insertContainerDB(conn, containerInfo, containerID);
return true;
} catch (SQLException e) {
throw new IOException(e);
@@ -494,38 +492,23 @@ public class SQLCLI extends Configured implements Tool {
/**
* Insert into the sqlite DB of container.db.
* @param conn the connection to the sqlite DB.
- * @param containerID the id of the container.
- * @param pipeline the actual container pipeline object.
- * @param uuidChecked the uuid that has been already inserted.
+ * @param containerInfo
+ * @param containerID
* @throws SQLException throws exception.
*/
- private void insertContainerDB(Connection conn, long containerID,
- Pipeline pipeline, Set<String> uuidChecked) throws SQLException {
+ private void insertContainerDB(Connection conn, ContainerInfo containerInfo,
+ long containerID) throws SQLException {
LOG.info("Insert to sql container db, for container {}", containerID);
String insertContainerInfo = String.format(
INSERT_CONTAINER_INFO, containerID,
- pipeline.getLeaderID());
- executeSQL(conn, insertContainerInfo);
+ containerInfo.getReplicationType(),
+ containerInfo.getReplicationFactor(),
+ containerInfo.getUsedBytes(),
+ containerInfo.getAllocatedBytes(),
+ containerInfo.getOwner(),
+ containerInfo.getNumberOfKeys());
- for (HddsProtos.DatanodeDetailsProto dd :
- pipeline.getMembersList()) {
- String uuid = dd.getUuid();
- if (!uuidChecked.contains(uuid)) {
- // we may also not use this checked set, but catch exception instead
- // but this seems a bit cleaner.
- String ipAddr = dd.getIpAddress();
- String hostName = dd.getHostName();
- int containerPort = DatanodeDetails.getFromProtoBuf(dd)
- .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
- String insertMachineInfo = String.format(
- INSERT_DATANODE_INFO, hostName, uuid, ipAddr, containerPort);
- executeSQL(conn, insertMachineInfo);
- uuidChecked.add(uuid);
- }
- String insertContainerMembers = String.format(
- INSERT_CONTAINER_MEMBERS, containerID, uuid);
- executeSQL(conn, insertContainerMembers);
- }
+ executeSQL(conn, insertContainerInfo);
LOG.info("Insertion completed.");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/3] hadoop git commit: HDDS-175. Refactor ContainerInfo to remove
Pipeline object from it. Contributed by Ajay Kumar.
Posted by ae...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index d06d568..9255ec7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -362,10 +363,16 @@ public class TestDeletedBlockLog {
pipeline.addMember(dd);
ContainerInfo.Builder builder = new ContainerInfo.Builder();
- builder.setPipeline(pipeline);
-
- ContainerInfo conatinerInfo = builder.build();
- Mockito.doReturn(conatinerInfo).when(mappingService)
+ builder.setPipelineName(pipeline.getPipelineName())
+ .setReplicationType(pipeline.getType())
+ .setReplicationFactor(pipeline.getFactor());
+
+ ContainerInfo containerInfo = builder.build();
+ ContainerWithPipeline containerWithPipeline = new ContainerWithPipeline(
+ containerInfo, pipeline);
+ Mockito.doReturn(containerInfo).when(mappingService)
.getContainer(containerID);
+ Mockito.doReturn(containerWithPipeline).when(mappingService)
+ .getContainerWithPipeline(containerID);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index 09ade3e..721dbf6 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
@@ -97,7 +97,7 @@ public class TestCloseContainerEventHandler {
new ContainerID(id));
eventQueue.processAll(1000);
Assert.assertTrue(logCapturer.getOutput()
- .contains("Container with id : " + id + " does not exist"));
+ .contains("Failed to update the container state"));
}
@Test
@@ -105,11 +105,12 @@ public class TestCloseContainerEventHandler {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(CloseContainerEventHandler.LOG);
- ContainerInfo info = mapping
+ ContainerWithPipeline containerWithPipeline = mapping
.allocateContainer(HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.ONE, "ozone");
- ContainerID id = new ContainerID(info.getContainerID());
- DatanodeDetails datanode = info.getPipeline().getLeader();
+ ContainerID id = new ContainerID(
+ containerWithPipeline.getContainerInfo().getContainerID());
+ DatanodeDetails datanode = containerWithPipeline.getPipeline().getLeader();
int closeCount = nodeManager.getCommandCount(datanode);
eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT, id);
eventQueue.processAll(1000);
@@ -125,7 +126,8 @@ public class TestCloseContainerEventHandler {
mapping.updateContainerState(id.getId(), CREATE);
mapping.updateContainerState(id.getId(), CREATED);
eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT,
- new ContainerID(info.getContainerID()));
+ new ContainerID(
+ containerWithPipeline.getContainerInfo().getContainerID()));
eventQueue.processAll(1000);
Assert.assertEquals(closeCount + 1, nodeManager.getCommandCount(datanode));
Assert.assertEquals(HddsProtos.LifeCycleState.CLOSING,
@@ -137,20 +139,23 @@ public class TestCloseContainerEventHandler {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(CloseContainerEventHandler.LOG);
- ContainerInfo info = mapping
+ ContainerWithPipeline containerWithPipeline = mapping
.allocateContainer(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, "ozone");
- ContainerID id = new ContainerID(info.getContainerID());
+ ContainerID id = new ContainerID(
+ containerWithPipeline.getContainerInfo().getContainerID());
int[] closeCount = new int[3];
eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT, id);
eventQueue.processAll(1000);
int i = 0;
- for (DatanodeDetails details : info.getPipeline().getMachines()) {
+ for (DatanodeDetails details : containerWithPipeline.getPipeline()
+ .getMachines()) {
closeCount[i] = nodeManager.getCommandCount(details);
i++;
}
i = 0;
- for (DatanodeDetails details : info.getPipeline().getMachines()) {
+ for (DatanodeDetails details : containerWithPipeline.getPipeline()
+ .getMachines()) {
Assert.assertEquals(closeCount[i], nodeManager.getCommandCount(details));
i++;
}
@@ -161,12 +166,12 @@ public class TestCloseContainerEventHandler {
//Execute these state transitions so that we can close the container.
mapping.updateContainerState(id.getId(), CREATE);
mapping.updateContainerState(id.getId(), CREATED);
- eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT,
- new ContainerID(info.getContainerID()));
+ eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT, id);
eventQueue.processAll(1000);
i = 0;
// Make sure close is queued for each datanode on the pipeline
- for (DatanodeDetails details : info.getPipeline().getMachines()) {
+ for (DatanodeDetails details : containerWithPipeline.getPipeline()
+ .getMachines()) {
Assert.assertEquals(closeCount[i] + 1,
nodeManager.getCommandCount(details));
Assert.assertEquals(HddsProtos.LifeCycleState.CLOSING,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
index eefb639..42ab126 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -103,7 +104,7 @@ public class TestContainerMapping {
@Test
public void testallocateContainer() throws Exception {
- ContainerInfo containerInfo = mapping.allocateContainer(
+ ContainerWithPipeline containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerOwner);
@@ -120,7 +121,7 @@ public class TestContainerMapping {
*/
Set<UUID> pipelineList = new TreeSet<>();
for (int x = 0; x < 30; x++) {
- ContainerInfo containerInfo = mapping.allocateContainer(
+ ContainerWithPipeline containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerOwner);
@@ -135,14 +136,13 @@ public class TestContainerMapping {
@Test
public void testGetContainer() throws IOException {
- ContainerInfo containerInfo = mapping.allocateContainer(
+ ContainerWithPipeline containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerOwner);
Pipeline pipeline = containerInfo.getPipeline();
Assert.assertNotNull(pipeline);
- Pipeline newPipeline = mapping.getContainer(
- containerInfo.getContainerID()).getPipeline();
+ Pipeline newPipeline = containerInfo.getPipeline();
Assert.assertEquals(pipeline.getLeader().getUuid(),
newPipeline.getLeader().getUuid());
}
@@ -165,12 +165,12 @@ public class TestContainerMapping {
public void testContainerCreationLeaseTimeout() throws IOException,
InterruptedException {
nodeManager.setChillmode(false);
- ContainerInfo containerInfo = mapping.allocateContainer(
+ ContainerWithPipeline containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerOwner);
- mapping.updateContainerState(containerInfo.getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
+ mapping.updateContainerState(containerInfo.getContainerInfo()
+ .getContainerID(), HddsProtos.LifeCycleEvent.CREATE);
Thread.sleep(TIMEOUT + 1000);
NavigableSet<ContainerID> deleteContainers = mapping.getStateManager()
@@ -179,12 +179,14 @@ public class TestContainerMapping {
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.DELETING);
- Assert.assertTrue(deleteContainers.contains(containerInfo.containerID()));
+ Assert.assertTrue(deleteContainers
+ .contains(containerInfo.getContainerInfo().containerID()));
thrown.expect(IOException.class);
thrown.expectMessage("Lease Exception");
- mapping.updateContainerState(containerInfo.getContainerID(),
- HddsProtos.LifeCycleEvent.CREATED);
+ mapping
+ .updateContainerState(containerInfo.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATED);
}
@Test
@@ -294,10 +296,11 @@ public class TestContainerMapping {
private ContainerInfo createContainer()
throws IOException {
nodeManager.setChillmode(false);
- ContainerInfo containerInfo = mapping.allocateContainer(
+ ContainerWithPipeline containerWithPipeline = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerOwner);
+ ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
mapping.updateContainerState(containerInfo.getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
mapping.updateContainerState(containerInfo.getContainerID(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
index 0d7848f..74238a7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.TestContainerMapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -91,9 +92,10 @@ public class TestContainerCloser {
@Test
public void testClose() throws IOException {
- ContainerInfo info = mapping.allocateContainer(
+ ContainerWithPipeline containerWithPipeline = mapping.allocateContainer(
HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.ONE, "ozone");
+ ContainerInfo info = containerWithPipeline.getContainerInfo();
//Execute these state transitions so that we can close the container.
mapping.updateContainerState(info.getContainerID(), CREATE);
@@ -101,7 +103,7 @@ public class TestContainerCloser {
long currentCount = mapping.getCloser().getCloseCount();
long runCount = mapping.getCloser().getThreadRunCount();
- DatanodeDetails datanode = info.getPipeline().getLeader();
+ DatanodeDetails datanode = containerWithPipeline.getPipeline().getLeader();
// Send a container report with used set to 1 GB. This should not close.
sendContainerReport(info, 1 * GIGABYTE);
@@ -138,9 +140,10 @@ public class TestContainerCloser {
configuration.setTimeDuration(OZONE_CONTAINER_REPORT_INTERVAL, 1,
TimeUnit.SECONDS);
- ContainerInfo info = mapping.allocateContainer(
+ ContainerWithPipeline containerWithPipeline = mapping.allocateContainer(
HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.ONE, "ozone");
+ ContainerInfo info = containerWithPipeline.getContainerInfo();
//Execute these state transitions so that we can close the container.
mapping.updateContainerState(info.getContainerID(), CREATE);
@@ -148,10 +151,10 @@ public class TestContainerCloser {
long currentCount = mapping.getCloser().getCloseCount();
long runCount = mapping.getCloser().getThreadRunCount();
+ DatanodeDetails datanodeDetails = containerWithPipeline.getPipeline()
+ .getLeader();
- DatanodeDetails datanodeDetails = info.getPipeline().getLeader();
-
- // Send this command twice and assert we have only one command in the queue.
+ // Send this command twice and assert we have only one command in queue.
sendContainerReport(info, 5 * GIGABYTE);
sendContainerReport(info, 5 * GIGABYTE);
@@ -183,9 +186,10 @@ public class TestContainerCloser {
long runCount = mapping.getCloser().getThreadRunCount();
for (int x = 0; x < ContainerCloser.getCleanupWaterMark() + 10; x++) {
- ContainerInfo info = mapping.allocateContainer(
+ ContainerWithPipeline containerWithPipeline = mapping.allocateContainer(
HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.ONE, "ozone");
+ ContainerInfo info = containerWithPipeline.getContainerInfo();
mapping.updateContainerState(info.getContainerID(), CREATE);
mapping.updateContainerState(info.getContainerID(), CREATED);
sendContainerReport(info, 5 * GIGABYTE);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 5ad28f6..98b0a28 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
@@ -151,11 +151,11 @@ public class TestContainerPlacement {
assertTrue(nodeManager.isOutOfChillMode());
- ContainerInfo containerInfo = containerManager.allocateContainer(
+ ContainerWithPipeline containerWithPipeline = containerManager.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), "OZONE");
assertEquals(xceiverClientManager.getFactor().getNumber(),
- containerInfo.getPipeline().getMachines().size());
+ containerWithPipeline.getPipeline().getMachines().size());
} finally {
IOUtils.closeQuietly(containerManager);
IOUtils.closeQuietly(nodeManager);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/CloseContainerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/CloseContainerHandler.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/CloseContainerHandler.java
index 4f3b143..e2267da 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/CloseContainerHandler.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/CloseContainerHandler.java
@@ -24,9 +24,9 @@ import org.apache.commons.cli.Options;
import org.apache.hadoop.hdds.scm.cli.OzoneCommandHandler;
import org.apache.hadoop.hdds.scm.cli.SCMCLI;
import org.apache.hadoop.hdds.scm.client.ScmClient;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import java.io.IOException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
/**
* The handler of close container command.
@@ -51,15 +51,15 @@ public class CloseContainerHandler extends OzoneCommandHandler {
}
String containerID = cmd.getOptionValue(OPT_CONTAINER_ID);
- ContainerInfo container = getScmClient().
- getContainer(Long.parseLong(containerID));
+ ContainerWithPipeline container = getScmClient().
+ getContainerWithPipeline(Long.parseLong(containerID));
if (container == null) {
throw new IOException("Cannot close an non-exist container "
+ containerID);
}
logOut("Closing container : %s.", containerID);
- getScmClient().closeContainer(container.getContainerID(),
- container.getPipeline());
+ getScmClient()
+ .closeContainer(container.getContainerInfo().getContainerID());
logOut("Container closed.");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/DeleteContainerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/DeleteContainerHandler.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/DeleteContainerHandler.java
index 20a6d9e..1b26665 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/DeleteContainerHandler.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/DeleteContainerHandler.java
@@ -25,9 +25,9 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.hadoop.hdds.scm.cli.OzoneCommandHandler;
import org.apache.hadoop.hdds.scm.client.ScmClient;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import java.io.IOException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import static org.apache.hadoop.hdds.scm.cli.SCMCLI.CMD_WIDTH;
import static org.apache.hadoop.hdds.scm.cli.SCMCLI.HELP_OP;
@@ -60,7 +60,7 @@ public class DeleteContainerHandler extends OzoneCommandHandler {
String containerID = cmd.getOptionValue(OPT_CONTAINER_ID);
- ContainerInfo container = getScmClient().getContainer(
+ ContainerWithPipeline container = getScmClient().getContainerWithPipeline(
Long.parseLong(containerID));
if (container == null) {
throw new IOException("Cannot delete an non-exist container "
@@ -68,8 +68,9 @@ public class DeleteContainerHandler extends OzoneCommandHandler {
}
logOut("Deleting container : %s.", containerID);
- getScmClient().deleteContainer(container.getContainerID(),
- container.getPipeline(), cmd.hasOption(OPT_FORCE));
+ getScmClient()
+ .deleteContainer(container.getContainerInfo().getContainerID(),
+ container.getPipeline(), cmd.hasOption(OPT_FORCE));
logOut("Container %s deleted.", containerID);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoContainerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoContainerHandler.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoContainerHandler.java
index 6027bec..3716ace 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoContainerHandler.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoContainerHandler.java
@@ -24,7 +24,6 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.hadoop.hdds.scm.cli.OzoneCommandHandler;
import org.apache.hadoop.hdds.scm.client.ScmClient;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerData;
@@ -33,6 +32,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
import java.io.IOException;
import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import static org.apache.hadoop.hdds.scm.cli.SCMCLI.CMD_WIDTH;
import static org.apache.hadoop.hdds.scm.cli.SCMCLI.HELP_OP;
@@ -68,13 +68,12 @@ public class InfoContainerHandler extends OzoneCommandHandler {
}
}
String containerID = cmd.getOptionValue(OPT_CONTAINER_ID);
- ContainerInfo container = getScmClient().
- getContainer(Long.parseLong(containerID));
+ ContainerWithPipeline container = getScmClient().
+ getContainerWithPipeline(Long.parseLong(containerID));
Preconditions.checkNotNull(container, "Container cannot be null");
- ContainerData containerData =
- getScmClient().readContainer(container.getContainerID(),
- container.getPipeline());
+ ContainerData containerData = getScmClient().readContainer(container
+ .getContainerInfo().getContainerID(), container.getPipeline());
// Print container report info.
logOut("Container id: %s", containerID);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
index e1a2918..edd85aa 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
@@ -21,8 +21,8 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -271,17 +271,17 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
KsmKeyLocationInfo ksmKeyLocationInfo = keyLocationInfos.get(i);
BlockID blockID = ksmKeyLocationInfo.getBlockID();
long containerID = blockID.getContainerID();
- ContainerInfo container =
- storageContainerLocationClient.getContainer(containerID);
- XceiverClientSpi xceiverClient =
- xceiverClientManager.acquireClient(container.getPipeline(), containerID);
+ ContainerWithPipeline containerWithPipeline =
+ storageContainerLocationClient.getContainerWithPipeline(containerID);
+ XceiverClientSpi xceiverClient = xceiverClientManager
+ .acquireClient(containerWithPipeline.getPipeline(), containerID);
boolean success = false;
containerKey = ksmKeyLocationInfo.getLocalID();
try {
LOG.debug("get key accessing {} {}",
containerID, containerKey);
groupInputStream.streamOffset[i] = length;
- ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation
+ ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation
.containerKeyDataForRead(blockID);
ContainerProtos.GetKeyResponseProto response = ContainerProtocolCalls
.getKey(xceiverClient, containerKeyData, requestId);
@@ -292,7 +292,8 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
}
success = true;
ChunkInputStream inputStream = new ChunkInputStream(
- ksmKeyLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
+ ksmKeyLocationInfo.getBlockID(), xceiverClientManager,
+ xceiverClient,
chunks, requestId);
groupInputStream.addStream(inputStream,
ksmKeyLocationInfo.getLength());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index c6e56b3..d1a3b46 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -163,10 +164,12 @@ public class ChunkGroupOutputStream extends OutputStream {
private void checkKeyLocationInfo(KsmKeyLocationInfo subKeyInfo)
throws IOException {
- ContainerInfo container = scmClient.getContainer(
- subKeyInfo.getContainerID());
+ ContainerWithPipeline containerWithPipeline = scmClient
+ .getContainerWithPipeline(subKeyInfo.getContainerID());
+ ContainerInfo container = containerWithPipeline.getContainerInfo();
+
XceiverClientSpi xceiverClient =
- xceiverClientManager.acquireClient(container.getPipeline(),
+ xceiverClientManager.acquireClient(containerWithPipeline.getPipeline(),
container.getContainerID());
// create container if needed
if (subKeyInfo.getShouldCreateContainer()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OzonePBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OzonePBHelper.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OzonePBHelper.java
new file mode 100644
index 0000000..8361bac
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OzonePBHelper.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+/**
+ * Helper class for converting protobuf objects.
+ */
+public final class OzonePBHelper {
+
+ private OzonePBHelper() {
+ /** Hidden constructor */
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
index bedd5c4..bb85650 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.container;
import com.google.common.primitives.Longs;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -30,7 +31,6 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
@@ -71,31 +71,35 @@ public class TestContainerStateManager {
@Test
public void testAllocateContainer() throws IOException {
// Allocate a container and verify the container info
- ContainerInfo container1 = scm.getClientProtocolServer().allocateContainer(
- xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
+ ContainerWithPipeline container1 = scm.getClientProtocolServer()
+ .allocateContainer(
+ xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
ContainerInfo info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED);
- Assert.assertEquals(container1.getContainerID(), info.getContainerID());
+ Assert.assertEquals(container1.getContainerInfo().getContainerID(),
+ info.getContainerID());
Assert.assertEquals(OzoneConsts.GB * 3, info.getAllocatedBytes());
Assert.assertEquals(containerOwner, info.getOwner());
Assert.assertEquals(xceiverClientManager.getType(),
- info.getPipeline().getType());
+ info.getReplicationType());
Assert.assertEquals(xceiverClientManager.getFactor(),
- info.getPipeline().getFactor());
+ info.getReplicationFactor());
Assert.assertEquals(HddsProtos.LifeCycleState.ALLOCATED, info.getState());
// Check there are two containers in ALLOCATED state after allocation
- ContainerInfo container2 = scm.getClientProtocolServer().allocateContainer(
- xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
+ ContainerWithPipeline container2 = scm.getClientProtocolServer()
+ .allocateContainer(
+ xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
int numContainers = containerStateManager
.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED).size();
- Assert.assertNotEquals(container1.getContainerID(), container2.getContainerID());
+ Assert.assertNotEquals(container1.getContainerInfo().getContainerID(),
+ container2.getContainerInfo().getContainerID());
Assert.assertEquals(2, numContainers);
}
@@ -105,14 +109,15 @@ public class TestContainerStateManager {
List<ContainerInfo> containers = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- ContainerInfo container = scm.getClientProtocolServer().allocateContainer(
- xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
- containers.add(container);
+ ContainerWithPipeline container = scm.getClientProtocolServer()
+ .allocateContainer(
+ xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
+ containers.add(container.getContainerInfo());
if (i >= 5) {
- scm.getScmContainerManager()
- .updateContainerState(container.getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
+ scm.getScmContainerManager().updateContainerState(container
+ .getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
}
}
@@ -134,34 +139,40 @@ public class TestContainerStateManager {
@Test
public void testGetMatchingContainer() throws IOException {
- ContainerInfo container1 = scm.getClientProtocolServer().
+ ContainerWithPipeline container1 = scm.getClientProtocolServer().
allocateContainer(xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
- scmContainerMapping.updateContainerState(container1.getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
- scmContainerMapping.updateContainerState(container1.getContainerID(),
- HddsProtos.LifeCycleEvent.CREATED);
+ xceiverClientManager.getFactor(), containerOwner);
+ scmContainerMapping
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
+ scmContainerMapping
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATED);
- ContainerInfo container2 = scm.getClientProtocolServer().
+ ContainerWithPipeline container2 = scm.getClientProtocolServer().
allocateContainer(xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
+ xceiverClientManager.getFactor(), containerOwner);
ContainerInfo info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.OPEN);
- Assert.assertEquals(container1.getContainerID(), info.getContainerID());
+ Assert.assertEquals(container1.getContainerInfo().getContainerID(),
+ info.getContainerID());
info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED);
- Assert.assertEquals(container2.getContainerID(), info.getContainerID());
+ Assert.assertEquals(container2.getContainerInfo().getContainerID(),
+ info.getContainerID());
- scmContainerMapping.updateContainerState(container2.getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
- scmContainerMapping.updateContainerState(container2.getContainerID(),
- HddsProtos.LifeCycleEvent.CREATED);
+ scmContainerMapping
+ .updateContainerState(container2.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
+ scmContainerMapping
+ .updateContainerState(container2.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATED);
// space has already been allocated in container1, now container 2 should
// be chosen.
@@ -169,7 +180,8 @@ public class TestContainerStateManager {
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.OPEN);
- Assert.assertEquals(container2.getContainerID(), info.getContainerID());
+ Assert.assertEquals(container2.getContainerInfo().getContainerID(),
+ info.getContainerID());
}
@Test
@@ -183,30 +195,33 @@ public class TestContainerStateManager {
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// OPEN -> CLOSING -> CLOSED -> DELETING -> DELETED
- ContainerInfo container1 = scm.getClientProtocolServer().allocateContainer(
- xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
+ ContainerWithPipeline container1 = scm.getClientProtocolServer()
+ .allocateContainer(
+ xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(1, containers);
- scmContainerMapping.updateContainerState(container1.getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
+ scmContainerMapping
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.CREATING).size();
Assert.assertEquals(1, containers);
- scmContainerMapping.updateContainerState(container1.getContainerID(),
- HddsProtos.LifeCycleEvent.CREATED);
+ scmContainerMapping
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATED);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.OPEN).size();
Assert.assertEquals(1, containers);
scmContainerMapping
- .updateContainerState(container1.getContainerID(),
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.FINALIZE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@@ -214,7 +229,7 @@ public class TestContainerStateManager {
Assert.assertEquals(1, containers);
scmContainerMapping
- .updateContainerState(container1.getContainerID(),
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CLOSE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@@ -222,7 +237,7 @@ public class TestContainerStateManager {
Assert.assertEquals(1, containers);
scmContainerMapping
- .updateContainerState(container1.getContainerID(),
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.DELETE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@@ -230,7 +245,7 @@ public class TestContainerStateManager {
Assert.assertEquals(1, containers);
scmContainerMapping
- .updateContainerState(container1.getContainerID(),
+ .updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CLEANUP);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@@ -239,13 +254,15 @@ public class TestContainerStateManager {
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// DELETING
- ContainerInfo container2 = scm.getClientProtocolServer().allocateContainer(
- xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
- scmContainerMapping.updateContainerState(container2.getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
+ ContainerWithPipeline container2 = scm.getClientProtocolServer()
+ .allocateContainer(
+ xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
+ scmContainerMapping
+ .updateContainerState(container2.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
scmContainerMapping
- .updateContainerState(container2.getContainerID(),
+ .updateContainerState(container2.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.TIMEOUT);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@@ -254,17 +271,21 @@ public class TestContainerStateManager {
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// OPEN -> CLOSING -> CLOSED
- ContainerInfo container3 = scm.getClientProtocolServer().allocateContainer(
- xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerOwner);
- scmContainerMapping.updateContainerState(container3.getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
- scmContainerMapping.updateContainerState(container3.getContainerID(),
- HddsProtos.LifeCycleEvent.CREATED);
- scmContainerMapping.updateContainerState(container3.getContainerID(),
- HddsProtos.LifeCycleEvent.FINALIZE);
+ ContainerWithPipeline container3 = scm.getClientProtocolServer()
+ .allocateContainer(
+ xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerOwner);
+ scmContainerMapping
+ .updateContainerState(container3.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
+ scmContainerMapping
+ .updateContainerState(container3.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATED);
+ scmContainerMapping
+ .updateContainerState(container3.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.FINALIZE);
scmContainerMapping
- .updateContainerState(container3.getContainerID(),
+ .updateContainerState(container3.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CLOSE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@@ -274,12 +295,14 @@ public class TestContainerStateManager {
@Test
public void testUpdatingAllocatedBytes() throws Exception {
- ContainerInfo container1 = scm.getClientProtocolServer().allocateContainer(
- xceiverClientManager.getType(),
+ ContainerWithPipeline container1 = scm.getClientProtocolServer()
+ .allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
- scmContainerMapping.updateContainerState(container1.getContainerID(),
+ scmContainerMapping.updateContainerState(container1
+ .getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
- scmContainerMapping.updateContainerState(container1.getContainerID(),
+ scmContainerMapping.updateContainerState(container1
+ .getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
Random ran = new Random();
@@ -292,18 +315,18 @@ public class TestContainerStateManager {
.getMatchingContainer(size, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.OPEN);
- Assert.assertEquals(container1.getContainerID(), info.getContainerID());
+ Assert.assertEquals(container1.getContainerInfo().getContainerID(),
+ info.getContainerID());
ContainerMapping containerMapping =
- (ContainerMapping)scmContainerMapping;
+ (ContainerMapping) scmContainerMapping;
// manually trigger a flush, this will persist the allocated bytes value
// to disk
containerMapping.flushContainerInfo();
// the persisted value should always be equal to allocated size.
- byte[] containerBytes =
- containerMapping.getContainerStore().get(
- Longs.toByteArray(container1.getContainerID()));
+ byte[] containerBytes = containerMapping.getContainerStore().get(
+ Longs.toByteArray(container1.getContainerInfo().getContainerID()));
HddsProtos.SCMContainerInfo infoProto =
HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
ContainerInfo currentInfo = ContainerInfo.fromProtobuf(infoProto);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
index d4c9d4f..129cf04 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.ozone;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -28,7 +28,6 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.client.ScmClient;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.junit.AfterClass;
@@ -78,12 +77,12 @@ public class TestContainerOperations {
*/
@Test
public void testCreate() throws Exception {
- ContainerInfo container = storageClient.createContainer(HddsProtos
+ ContainerWithPipeline container = storageClient.createContainer(HddsProtos
.ReplicationType.STAND_ALONE, HddsProtos.ReplicationFactor
.ONE, "OZONE");
- assertEquals(container.getContainerID(),
- storageClient.getContainer(container.getContainerID()).
- getContainerID());
+ assertEquals(container.getContainerInfo().getContainerID(), storageClient
+ .getContainer(container.getContainerInfo().getContainerID())
+ .getContainerID());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 0c1d8f2..d07097c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -131,7 +131,7 @@ public class TestStorageContainerManager {
}
try {
- ContainerInfo container2 = mockClientServer
+ ContainerWithPipeline container2 = mockClientServer
.allocateContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, "OZONE");
if (expectPermissionDenied) {
@@ -144,7 +144,7 @@ public class TestStorageContainerManager {
}
try {
- ContainerInfo container3 = mockClientServer
+ ContainerWithPipeline container3 = mockClientServer
.allocateContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, "OZONE");
if (expectPermissionDenied) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
index c937980..4c2a904 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
@@ -23,7 +23,7 @@ import com.google.common.primitives.Longs;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -158,9 +158,11 @@ public class TestStorageContainerManagerHelper {
private MetadataStore getContainerMetadata(Long containerID)
throws IOException {
- ContainerInfo container = cluster.getStorageContainerManager()
- .getClientProtocolServer().getContainer(containerID);
- DatanodeDetails leadDN = container.getPipeline().getLeader();
+ ContainerWithPipeline containerWithPipeline = cluster
+ .getStorageContainerManager().getClientProtocolServer()
+ .getContainerWithPipeline(containerID);
+
+ DatanodeDetails leadDN = containerWithPipeline.getPipeline().getLeader();
OzoneContainer containerServer =
getContainerServerByDatanodeUuid(leadDN.getUuidString());
ContainerData containerData = containerServer.getContainerManager()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index cafe5db..214382e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -390,8 +390,8 @@ public class TestOzoneRpcClient {
keyInfo.getLatestVersionLocations().getLocationList()) {
ContainerInfo container =
storageContainerLocationClient.getContainer(info.getContainerID());
- if ((container.getPipeline().getFactor() != replicationFactor) ||
- (container.getPipeline().getType() != replicationType)) {
+ if (!container.getReplicationFactor().equals(replicationFactor) || (
+ container.getReplicationType() != replicationType)) {
return false;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index 265c82b..3e514e7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -23,8 +23,6 @@ import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -35,7 +33,6 @@ import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
@@ -112,9 +109,9 @@ public class TestCloseContainerByPipeline {
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = ksmKeyLocationInfo.getContainerID();
- List<DatanodeDetails> datanodes =
- cluster.getStorageContainerManager().getContainerInfo(containerID)
- .getPipeline().getMachines();
+ List<DatanodeDetails> datanodes = cluster.getStorageContainerManager()
+ .getScmContainerManager().getContainerWithPipeline(containerID)
+ .getPipeline().getMachines();
Assert.assertTrue(datanodes.size() == 1);
DatanodeDetails datanodeDetails = datanodes.get(0);
@@ -167,9 +164,9 @@ public class TestCloseContainerByPipeline {
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = ksmKeyLocationInfo.getContainerID();
- List<DatanodeDetails> datanodes =
- cluster.getStorageContainerManager().getContainerInfo(containerID)
- .getPipeline().getMachines();
+ List<DatanodeDetails> datanodes = cluster.getStorageContainerManager()
+ .getScmContainerManager().getContainerWithPipeline(containerID)
+ .getPipeline().getMachines();
Assert.assertTrue(datanodes.size() == 1);
DatanodeDetails datanodeDetails = datanodes.get(0);
@@ -220,9 +217,9 @@ public class TestCloseContainerByPipeline {
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = ksmKeyLocationInfo.getContainerID();
- List<DatanodeDetails> datanodes =
- cluster.getStorageContainerManager().getContainerInfo(containerID)
- .getPipeline().getMachines();
+ List<DatanodeDetails> datanodes = cluster.getStorageContainerManager()
+ .getScmContainerManager().getContainerWithPipeline(containerID)
+ .getPipeline().getMachines();
Assert.assertTrue(datanodes.size() == 3);
GenericTestUtils.LogCapturer logCapturer =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java
index bafba32..1cc7ff8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -32,7 +33,6 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestAllocateContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestAllocateContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestAllocateContainer.java
index b1e9d26..144c562 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestAllocateContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestAllocateContainer.java
@@ -17,14 +17,12 @@
*/
package org.apache.hadoop.ozone.scm;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -68,7 +66,7 @@ public class TestAllocateContainer {
@Test
public void testAllocate() throws Exception {
- ContainerInfo container = storageContainerLocationClient.allocateContainer(
+ ContainerWithPipeline container = storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerOwner);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
index ce1fe46..42bb936 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.scm;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -81,17 +81,18 @@ public class TestContainerSmallFile {
@Test
public void testAllocateWrite() throws Exception {
String traceID = UUID.randomUUID().toString();
- ContainerInfo container =
+ ContainerWithPipeline container =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
- XceiverClientSpi client = xceiverClientManager.acquireClient(
- container.getPipeline(), container.getContainerID());
+ XceiverClientSpi client = xceiverClientManager
+ .acquireClient(container.getPipeline(),
+ container.getContainerInfo().getContainerID());
ContainerProtocolCalls.createContainer(client,
- container.getContainerID(), traceID);
+ container.getContainerInfo().getContainerID(), traceID);
BlockID blockID = ContainerTestHelper.getTestBlockID(
- container.getContainerID());
+ container.getContainerInfo().getContainerID());
ContainerProtocolCalls.writeSmallFile(client, blockID,
"data123".getBytes(), traceID);
ContainerProtos.GetSmallFileResponseProto response =
@@ -104,20 +105,21 @@ public class TestContainerSmallFile {
@Test
public void testInvalidKeyRead() throws Exception {
String traceID = UUID.randomUUID().toString();
- ContainerInfo container =
+ ContainerWithPipeline container =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
- XceiverClientSpi client = xceiverClientManager.acquireClient(
- container.getPipeline(), container.getContainerID());
+ XceiverClientSpi client = xceiverClientManager
+ .acquireClient(container.getPipeline(),
+ container.getContainerInfo().getContainerID());
ContainerProtocolCalls.createContainer(client,
- container.getContainerID(), traceID);
+ container.getContainerInfo().getContainerID(), traceID);
thrown.expect(StorageContainerException.class);
thrown.expectMessage("Unable to find the key");
BlockID blockID = ContainerTestHelper.getTestBlockID(
- container.getContainerID());
+ container.getContainerInfo().getContainerID());
// Try to read a Key Container Name
ContainerProtos.GetSmallFileResponseProto response =
ContainerProtocolCalls.readSmallFile(client, blockID, traceID);
@@ -128,20 +130,20 @@ public class TestContainerSmallFile {
public void testInvalidContainerRead() throws Exception {
String traceID = UUID.randomUUID().toString();
long nonExistContainerID = 8888L;
- ContainerInfo container =
+ ContainerWithPipeline container =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
- XceiverClientSpi client = xceiverClientManager.
- acquireClient(container.getPipeline(), container.getContainerID());
+ XceiverClientSpi client = xceiverClientManager
+ .acquireClient(container.getPipeline(),
+ container.getContainerInfo().getContainerID());
ContainerProtocolCalls.createContainer(client,
- container.getContainerID(), traceID);
+ container.getContainerInfo().getContainerID(), traceID);
BlockID blockID = ContainerTestHelper.getTestBlockID(
- container.getContainerID());
+ container.getContainerInfo().getContainerID());
ContainerProtocolCalls.writeSmallFile(client, blockID,
"data123".getBytes(), traceID);
-
thrown.expect(StorageContainerException.class);
thrown.expectMessage("Unable to find the container");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java
index 732221a..a6bb586 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.scm;
import com.google.common.primitives.Longs;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -136,7 +137,7 @@ public class TestSCMCli {
private boolean containerExist(long containerID) {
try {
ContainerInfo container = scm.getClientProtocolServer()
- .getContainer(containerID);
+ .getContainerWithPipeline(containerID).getContainerInfo();
return container != null
&& containerID == container.getContainerID();
} catch (IOException e) {
@@ -157,31 +158,34 @@ public class TestSCMCli {
// 1. Test to delete a non-empty container.
// ****************************************
// Create an non-empty container
- ContainerInfo container = containerOperationClient
+ ContainerWithPipeline container = containerOperationClient
.createContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
ContainerData cdata = ContainerData
.getFromProtBuf(containerOperationClient.readContainer(
- container.getContainerID(), container.getPipeline()), conf);
- KeyUtils.getDB(cdata, conf).put(Longs.toByteArray(container.getContainerID()),
- "someKey".getBytes());
- Assert.assertTrue(containerExist(container.getContainerID()));
+ container.getContainerInfo().getContainerID()), conf);
+ KeyUtils.getDB(cdata, conf)
+ .put(Longs.toByteArray(container.getContainerInfo().getContainerID()),
+ "someKey".getBytes());
+ Assert.assertTrue(
+ containerExist(container.getContainerInfo().getContainerID()));
// Gracefully delete a container should fail because it is open.
- delCmd = new String[] {"-container", "-delete", "-c",
- Long.toString(container.getContainerID())};
+ delCmd = new String[]{"-container", "-delete", "-c",
+ Long.toString(container.getContainerInfo().getContainerID())};
testErr = new ByteArrayOutputStream();
ByteArrayOutputStream out = new ByteArrayOutputStream();
exitCode = runCommandAndGetOutput(delCmd, out, testErr);
assertEquals(EXECUTION_ERROR, exitCode);
assertTrue(testErr.toString()
.contains("Deleting an open container is not allowed."));
- Assert.assertTrue(containerExist(container.getContainerID()));
+ Assert.assertTrue(
+ containerExist(container.getContainerInfo().getContainerID()));
// Close the container
containerOperationClient.closeContainer(
- container.getContainerID(), container.getPipeline());
+ container.getContainerInfo().getContainerID());
// Gracefully delete a container should fail because it is not empty.
testErr = new ByteArrayOutputStream();
@@ -189,45 +193,49 @@ public class TestSCMCli {
assertEquals(EXECUTION_ERROR, exitCode2);
assertTrue(testErr.toString()
.contains("Container cannot be deleted because it is not empty."));
- Assert.assertTrue(containerExist(container.getContainerID()));
+ Assert.assertTrue(
+ containerExist(container.getContainerInfo().getContainerID()));
// Try force delete again.
- delCmd = new String[] {"-container", "-delete", "-c",
- Long.toString(container.getContainerID()), "-f"};
+ delCmd = new String[]{"-container", "-delete", "-c",
+ Long.toString(container.getContainerInfo().getContainerID()), "-f"};
exitCode = runCommandAndGetOutput(delCmd, out, null);
assertEquals("Expected success, found:", ResultCode.SUCCESS, exitCode);
- assertFalse(containerExist(container.getContainerID()));
+ assertFalse(containerExist(container.getContainerInfo().getContainerID()));
// ****************************************
// 2. Test to delete an empty container.
// ****************************************
// Create an empty container
- ContainerInfo emptyContainer = containerOperationClient
+ ContainerWithPipeline emptyContainer = containerOperationClient
.createContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
- containerOperationClient.closeContainer(emptyContainer.getContainerID(),
- container.getPipeline());
- Assert.assertTrue(containerExist(emptyContainer.getContainerID()));
+ containerOperationClient
+ .closeContainer(emptyContainer.getContainerInfo().getContainerID());
+ Assert.assertTrue(
+ containerExist(emptyContainer.getContainerInfo().getContainerID()));
// Successfully delete an empty container.
- delCmd = new String[] {"-container", "-delete", "-c",
- Long.toString(emptyContainer.getContainerID())};
+ delCmd = new String[]{"-container", "-delete", "-c",
+ Long.toString(emptyContainer.getContainerInfo().getContainerID())};
exitCode = runCommandAndGetOutput(delCmd, out, null);
assertEquals(ResultCode.SUCCESS, exitCode);
- assertFalse(containerExist(emptyContainer.getContainerID()));
+ assertFalse(
+ containerExist(emptyContainer.getContainerInfo().getContainerID()));
// After the container is deleted,
// another container can now be recreated.
- ContainerInfo newContainer = containerOperationClient.
+ ContainerWithPipeline newContainer = containerOperationClient.
createContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
- Assert.assertTrue(containerExist(newContainer.getContainerID()));
+ Assert.assertTrue(
+ containerExist(newContainer.getContainerInfo().getContainerID()));
// ****************************************
// 3. Test to delete a non-exist container.
// ****************************************
- long nonExistContainerID = ContainerTestHelper.getTestContainerID();
- delCmd = new String[] {"-container", "-delete", "-c",
+ long nonExistContainerID = ContainerTestHelper.getTestContainerID();
+ delCmd = new String[]{"-container", "-delete", "-c",
Long.toString(nonExistContainerID)};
testErr = new ByteArrayOutputStream();
exitCode = runCommandAndGetOutput(delCmd, out, testErr);
@@ -250,45 +258,33 @@ public class TestSCMCli {
"LeaderID: %s\n" +
"Datanodes: [%s]\n";
- String formatStrWithHash =
- "Container id: %s\n" +
- "Container State: %s\n" +
- "Container Hash: %s\n" +
- "Container DB Path: %s\n" +
- "Container Path: %s\n" +
- "Container Metadata: {%s}\n" +
- "LeaderID: %s\n" +
- "Datanodes: [%s]\n";
-
// Test a non-exist container
String containerID =
Long.toString(ContainerTestHelper.getTestContainerID());
- String[] info = { "-container", "-info", containerID };
+ String[] info = {"-container", "-info", containerID};
int exitCode = runCommandAndGetOutput(info, null, null);
assertEquals("Expected Execution Error, Did not find that.",
EXECUTION_ERROR, exitCode);
// Create an empty container.
- ContainerInfo container = containerOperationClient
+ ContainerWithPipeline container = containerOperationClient
.createContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
- ContainerData data = ContainerData
- .getFromProtBuf(containerOperationClient.
- readContainer(container.getContainerID(),
- container.getPipeline()), conf);
+ ContainerData data = ContainerData.getFromProtBuf(containerOperationClient
+ .readContainer(container.getContainerInfo().getContainerID()), conf);
- info = new String[] { "-container", "-info", "-c",
- Long.toString(container.getContainerID()) };
+ info = new String[]{"-container", "-info", "-c",
+ Long.toString(container.getContainerInfo().getContainerID())};
ByteArrayOutputStream out = new ByteArrayOutputStream();
exitCode = runCommandAndGetOutput(info, out, null);
assertEquals("Expected Success, did not find it.", ResultCode.SUCCESS,
- exitCode);
+ exitCode);
String openStatus = data.isOpen() ? "OPEN" : "CLOSED";
- String expected =
- String.format(formatStr, container.getContainerID(), openStatus,
- data.getDBPath(), data.getContainerPath(), "",
- datanodeDetails.getHostName(), datanodeDetails.getHostName());
+ String expected = String.format(formatStr, container.getContainerInfo()
+ .getContainerID(), openStatus, data.getDBPath(),
+ data.getContainerPath(), "", datanodeDetails.getHostName(),
+ datanodeDetails.getHostName());
assertEquals(expected, out.toString());
out.reset();
@@ -299,40 +295,39 @@ public class TestSCMCli {
HddsProtos.ReplicationFactor.ONE, containerOwner);
data = ContainerData
.getFromProtBuf(containerOperationClient.readContainer(
- container.getContainerID(), container.getPipeline()), conf);
+ container.getContainerInfo().getContainerID()), conf);
KeyUtils.getDB(data, conf)
.put(containerID.getBytes(), "someKey".getBytes());
- info = new String[] { "-container", "-info", "-c",
- Long.toString(container.getContainerID()) };
+ info = new String[]{"-container", "-info", "-c",
+ Long.toString(container.getContainerInfo().getContainerID())};
exitCode = runCommandAndGetOutput(info, out, null);
assertEquals(ResultCode.SUCCESS, exitCode);
openStatus = data.isOpen() ? "OPEN" : "CLOSED";
- expected = String.format(formatStr, container.getContainerID(), openStatus,
- data.getDBPath(), data.getContainerPath(), "",
- datanodeDetails.getHostName(), datanodeDetails.getHostName());
+ expected = String.format(formatStr, container.getContainerInfo().
+ getContainerID(), openStatus, data.getDBPath(),
+ data.getContainerPath(), "", datanodeDetails.getHostName(),
+ datanodeDetails.getHostName());
assertEquals(expected, out.toString());
out.reset();
-
// Close last container and test info again.
- containerOperationClient.closeContainer(
- container.getContainerID(), container.getPipeline());
+ containerOperationClient
+ .closeContainer(container.getContainerInfo().getContainerID());
- info = new String[] { "-container", "-info", "-c",
- Long.toString(container.getContainerID()) };
+ info = new String[]{"-container", "-info", "-c",
+ Long.toString(container.getContainerInfo().getContainerID())};
exitCode = runCommandAndGetOutput(info, out, null);
assertEquals(ResultCode.SUCCESS, exitCode);
- data = ContainerData
- .getFromProtBuf(containerOperationClient.readContainer(
- container.getContainerID(), container.getPipeline()), conf);
+ data = ContainerData.getFromProtBuf(containerOperationClient
+ .readContainer(container.getContainerInfo().getContainerID()), conf);
openStatus = data.isOpen() ? "OPEN" : "CLOSED";
expected = String
- .format(formatStr, container.getContainerID(), openStatus,
- data.getDBPath(), data.getContainerPath(), "",
+ .format(formatStr, container.getContainerInfo().getContainerID(),
+ openStatus, data.getDBPath(), data.getContainerPath(), "",
datanodeDetails.getHostName(), datanodeDetails.getHostName());
assertEquals(expected, out.toString());
}
@@ -360,10 +355,10 @@ public class TestSCMCli {
// Create 20 containers for testing.
List<ContainerInfo> containers = new ArrayList<>();
for (int index = 0; index < 20; index++) {
- ContainerInfo container = containerOperationClient.createContainer(
+ ContainerWithPipeline container = containerOperationClient.createContainer(
xceiverClientManager.getType(), HddsProtos.ReplicationFactor.ONE,
containerOwner);
- containers.add(container);
+ containers.add(container.getContainerInfo());
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -417,11 +412,11 @@ public class TestSCMCli {
@Test
public void testCloseContainer() throws Exception {
- long containerID = containerOperationClient
- .createContainer(xceiverClientManager.getType(),
- HddsProtos.ReplicationFactor.ONE, containerOwner).getContainerID();
+ long containerID = containerOperationClient.createContainer(
+ xceiverClientManager.getType(), HddsProtos.ReplicationFactor.ONE,
+ containerOwner).getContainerInfo().getContainerID();
ContainerInfo container = scm.getClientProtocolServer()
- .getContainer(containerID);
+ .getContainerWithPipeline(containerID).getContainerInfo();
assertNotNull(container);
assertEquals(containerID, container.getContainerID());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
index 56f3c7a..a75264e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.scm;
import com.google.common.cache.Cache;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -98,22 +98,25 @@ public class TestXceiverClientManager {
shouldUseGrpc);
XceiverClientManager clientManager = new XceiverClientManager(conf);
- ContainerInfo container1 = storageContainerLocationClient
+ ContainerWithPipeline container1 = storageContainerLocationClient
.allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerOwner);
- XceiverClientSpi client1 = clientManager.acquireClient(container1.getPipeline(),
- container1.getContainerID());
+ XceiverClientSpi client1 = clientManager
+ .acquireClient(container1.getPipeline(),
+ container1.getContainerInfo().getContainerID());
Assert.assertEquals(1, client1.getRefcount());
- ContainerInfo container2 = storageContainerLocationClient
+ ContainerWithPipeline container2 = storageContainerLocationClient
.allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerOwner);
- XceiverClientSpi client2 = clientManager.acquireClient(container2.getPipeline(),
- container2.getContainerID());
+ XceiverClientSpi client2 = clientManager
+ .acquireClient(container2.getPipeline(),
+ container2.getContainerInfo().getContainerID());
Assert.assertEquals(1, client2.getRefcount());
- XceiverClientSpi client3 = clientManager.acquireClient(container1.getPipeline(),
- container1.getContainerID());
+ XceiverClientSpi client3 = clientManager
+ .acquireClient(container1.getPipeline(),
+ container1.getContainerInfo().getContainerID());
Assert.assertEquals(2, client3.getRefcount());
Assert.assertEquals(2, client1.getRefcount());
Assert.assertEquals(client1, client3);
@@ -132,32 +135,35 @@ public class TestXceiverClientManager {
Cache<Long, XceiverClientSpi> cache =
clientManager.getClientCache();
- ContainerInfo container1 =
+ ContainerWithPipeline container1 =
storageContainerLocationClient.allocateContainer(
clientManager.getType(), HddsProtos.ReplicationFactor.ONE,
containerOwner);
- XceiverClientSpi client1 = clientManager.acquireClient(container1.getPipeline(),
- container1.getContainerID());
+ XceiverClientSpi client1 = clientManager
+ .acquireClient(container1.getPipeline(),
+ container1.getContainerInfo().getContainerID());
Assert.assertEquals(1, client1.getRefcount());
Assert.assertEquals(container1.getPipeline(),
client1.getPipeline());
- ContainerInfo container2 =
+ ContainerWithPipeline container2 =
storageContainerLocationClient.allocateContainer(
clientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
- XceiverClientSpi client2 = clientManager.acquireClient(container2.getPipeline(),
- container2.getContainerID());
+ XceiverClientSpi client2 = clientManager
+ .acquireClient(container2.getPipeline(),
+ container2.getContainerInfo().getContainerID());
Assert.assertEquals(1, client2.getRefcount());
Assert.assertNotEquals(client1, client2);
// least recent container (i.e containerName1) is evicted
- XceiverClientSpi nonExistent1 = cache.getIfPresent(container1.getContainerID());
+ XceiverClientSpi nonExistent1 = cache
+ .getIfPresent(container1.getContainerInfo().getContainerID());
Assert.assertEquals(null, nonExistent1);
// However container call should succeed because of refcount on the client.
String traceID1 = "trace" + RandomStringUtils.randomNumeric(4);
ContainerProtocolCalls.createContainer(client1,
- container1.getContainerID(), traceID1);
+ container1.getContainerInfo().getContainerID(), traceID1);
// After releasing the client, this connection should be closed
// and any container operations should fail
@@ -166,7 +172,7 @@ public class TestXceiverClientManager {
String expectedMessage = "This channel is not connected.";
try {
ContainerProtocolCalls.createContainer(client1,
- container1.getContainerID(), traceID1);
+ container1.getContainerInfo().getContainerID(), traceID1);
Assert.fail("Create container should throw exception on closed"
+ "client");
} catch (Exception e) {
@@ -186,28 +192,30 @@ public class TestXceiverClientManager {
Cache<Long, XceiverClientSpi> cache =
clientManager.getClientCache();
- ContainerInfo container1 =
+ ContainerWithPipeline container1 =
storageContainerLocationClient.allocateContainer(
clientManager.getType(),
clientManager.getFactor(), containerOwner);
- XceiverClientSpi client1 = clientManager.acquireClient(container1.getPipeline(),
- container1.getContainerID());
+ XceiverClientSpi client1 = clientManager
+ .acquireClient(container1.getPipeline(),
+ container1.getContainerInfo().getContainerID());
Assert.assertEquals(1, client1.getRefcount());
clientManager.releaseClient(client1);
Assert.assertEquals(0, client1.getRefcount());
- ContainerInfo container2 = storageContainerLocationClient
+ ContainerWithPipeline container2 = storageContainerLocationClient
.allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerOwner);
- XceiverClientSpi client2 = clientManager.acquireClient(container2.getPipeline(),
- container2.getContainerID());
+ XceiverClientSpi client2 = clientManager
+ .acquireClient(container2.getPipeline(),
+ container2.getContainerInfo().getContainerID());
Assert.assertEquals(1, client2.getRefcount());
Assert.assertNotEquals(client1, client2);
-
// now client 1 should be evicted
- XceiverClientSpi nonExistent = cache.getIfPresent(container1.getContainerID());
+ XceiverClientSpi nonExistent = cache
+ .getIfPresent(container1.getContainerInfo().getContainerID());
Assert.assertEquals(null, nonExistent);
// Any container operation should now fail
@@ -215,7 +223,7 @@ public class TestXceiverClientManager {
String expectedMessage = "This channel is not connected.";
try {
ContainerProtocolCalls.createContainer(client1,
- container1.getContainerID(), traceID2);
+ container1.getContainerInfo().getContainerID(), traceID2);
Assert.fail("Create container should throw exception on closed"
+ "client");
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[3/3] hadoop git commit: HDDS-175. Refactor ContainerInfo to remove
Pipeline object from it. Contributed by Ajay Kumar.
Posted by ae...@apache.org.
HDDS-175. Refactor ContainerInfo to remove Pipeline object from it.
Contributed by Ajay Kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7ca4f0ce
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7ca4f0ce
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7ca4f0ce
Branch: refs/heads/trunk
Commit: 7ca4f0cefa220c752920822c8d16469ab3b09b37
Parents: 93ac01c
Author: Anu Engineer <ae...@apache.org>
Authored: Tue Jul 3 13:30:19 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue Jul 3 14:11:52 2018 -0700
----------------------------------------------------------------------
.../scm/client/ContainerOperationClient.java | 109 +++++++++---
.../hadoop/hdds/scm/client/ScmClient.java | 38 ++++-
.../container/common/helpers/ContainerInfo.java | 167 +++++++++++++------
.../common/helpers/ContainerWithPipeline.java | 131 +++++++++++++++
.../StorageContainerLocationProtocol.java | 13 +-
...rLocationProtocolClientSideTranslatorPB.java | 26 ++-
...rLocationProtocolServerSideTranslatorPB.java | 25 ++-
.../StorageContainerLocationProtocol.proto | 15 +-
hadoop-hdds/common/src/main/proto/hdds.proto | 9 +-
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 80 +++++----
.../block/DatanodeDeletedBlockTransactions.java | 11 +-
.../container/CloseContainerEventHandler.java | 26 ++-
.../hdds/scm/container/ContainerMapping.java | 128 +++++++++++---
.../scm/container/ContainerStateManager.java | 30 +++-
.../hadoop/hdds/scm/container/Mapping.java | 26 ++-
.../scm/container/closer/ContainerCloser.java | 15 +-
.../scm/container/states/ContainerStateMap.java | 7 +-
.../hdds/scm/pipelines/PipelineManager.java | 27 ++-
.../hdds/scm/pipelines/PipelineSelector.java | 16 ++
.../scm/pipelines/ratis/RatisManagerImpl.java | 1 +
.../standalone/StandaloneManagerImpl.java | 1 +
.../scm/server/SCMClientProtocolServer.java | 14 +-
.../hdds/scm/block/TestDeletedBlockLog.java | 15 +-
.../TestCloseContainerEventHandler.java | 31 ++--
.../scm/container/TestContainerMapping.java | 27 +--
.../container/closer/TestContainerCloser.java | 18 +-
.../hdds/scm/node/TestContainerPlacement.java | 6 +-
.../cli/container/CloseContainerHandler.java | 10 +-
.../cli/container/DeleteContainerHandler.java | 9 +-
.../scm/cli/container/InfoContainerHandler.java | 11 +-
.../ozone/client/io/ChunkGroupInputStream.java | 15 +-
.../ozone/client/io/ChunkGroupOutputStream.java | 9 +-
.../hadoop/ozone/protocolPB/OzonePBHelper.java | 30 ++++
.../container/TestContainerStateManager.java | 161 ++++++++++--------
.../hadoop/ozone/TestContainerOperations.java | 11 +-
.../ozone/TestStorageContainerManager.java | 6 +-
.../TestStorageContainerManagerHelper.java | 10 +-
.../ozone/client/rpc/TestOzoneRpcClient.java | 4 +-
.../TestCloseContainerByPipeline.java | 21 +--
.../ozone/ksm/TestContainerReportWithKeys.java | 2 +-
.../hadoop/ozone/scm/TestAllocateContainer.java | 6 +-
.../ozone/scm/TestContainerSmallFile.java | 36 ++--
.../org/apache/hadoop/ozone/scm/TestSCMCli.java | 135 ++++++++-------
.../ozone/scm/TestXceiverClientManager.java | 62 ++++---
.../ozone/scm/TestXceiverClientMetrics.java | 14 +-
.../genesis/BenchMarkContainerStateMap.java | 16 +-
.../org/apache/hadoop/ozone/scm/cli/SQLCLI.java | 63 +++----
47 files changed, 1139 insertions(+), 504 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index 07f6cec..b04f8c4 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.client;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB
@@ -87,16 +88,17 @@ public class ContainerOperationClient implements ScmClient {
* @inheritDoc
*/
@Override
- public ContainerInfo createContainer(String owner)
+ public ContainerWithPipeline createContainer(String owner)
throws IOException {
XceiverClientSpi client = null;
try {
- ContainerInfo container =
+ ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), owner);
- Pipeline pipeline = container.getPipeline();
- client = xceiverClientManager.acquireClient(pipeline, container.getContainerID());
+ Pipeline pipeline = containerWithPipeline.getPipeline();
+ client = xceiverClientManager.acquireClient(pipeline,
+ containerWithPipeline.getContainerInfo().getContainerID());
// Allocated State means that SCM has allocated this pipeline in its
// namespace. The client needs to create the pipeline on the machines
@@ -106,8 +108,9 @@ public class ContainerOperationClient implements ScmClient {
if (pipeline.getLifeCycleState() == ALLOCATED) {
createPipeline(client, pipeline);
}
- createContainer(client, container.getContainerID());
- return container;
+ createContainer(client,
+ containerWithPipeline.getContainerInfo().getContainerID());
+ return containerWithPipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
@@ -197,17 +200,17 @@ public class ContainerOperationClient implements ScmClient {
* @inheritDoc
*/
@Override
- public ContainerInfo createContainer(HddsProtos.ReplicationType type,
+ public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, String owner) throws IOException {
XceiverClientSpi client = null;
try {
// allocate container on SCM.
- ContainerInfo container =
+ ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.allocateContainer(type, factor,
owner);
- Pipeline pipeline = container.getPipeline();
+ Pipeline pipeline = containerWithPipeline.getPipeline();
client = xceiverClientManager.acquireClient(pipeline,
- container.getContainerID());
+ containerWithPipeline.getContainerInfo().getContainerID());
// Allocated State means that SCM has allocated this pipeline in its
// namespace. The client needs to create the pipeline on the machines
@@ -217,9 +220,10 @@ public class ContainerOperationClient implements ScmClient {
}
// connect to pipeline leader and allocate container on leader datanode.
client = xceiverClientManager.acquireClient(pipeline,
- container.getContainerID());
- createContainer(client, container.getContainerID());
- return container;
+ containerWithPipeline.getContainerInfo().getContainerID());
+ createContainer(client,
+ containerWithPipeline.getContainerInfo().getContainerID());
+ return containerWithPipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
@@ -256,24 +260,27 @@ public class ContainerOperationClient implements ScmClient {
}
/**
- * Delete the container, this will release any resource it uses.
- * @param pipeline - Pipeline that represents the container.
- * @param force - True to forcibly delete the container.
+ * Deletes an existing container.
+ *
+ * @param containerId - ID of the container.
+ * @param pipeline - Pipeline that represents the container.
+ * @param force - true to forcibly delete the container.
* @throws IOException
*/
@Override
- public void deleteContainer(long containerID, Pipeline pipeline, boolean force)
- throws IOException {
+ public void deleteContainer(long containerId, Pipeline pipeline,
+ boolean force) throws IOException {
XceiverClientSpi client = null;
try {
- client = xceiverClientManager.acquireClient(pipeline, containerID);
+ client = xceiverClientManager.acquireClient(pipeline, containerId);
String traceID = UUID.randomUUID().toString();
- ContainerProtocolCalls.deleteContainer(client, containerID, force, traceID);
+ ContainerProtocolCalls
+ .deleteContainer(client, containerId, force, traceID);
storageContainerLocationClient
- .deleteContainer(containerID);
+ .deleteContainer(containerId);
if (LOG.isDebugEnabled()) {
LOG.debug("Deleted container {}, leader: {}, machines: {} ",
- containerID,
+ containerId,
pipeline.getLeader(),
pipeline.getMachines());
}
@@ -285,6 +292,19 @@ public class ContainerOperationClient implements ScmClient {
}
/**
+ * Delete the container, this will release any resource it uses.
+ * @param containerID - containerID.
+ * @param force - True to forcibly delete the container.
+ * @throws IOException
+ */
+ @Override
+ public void deleteContainer(long containerID, boolean force)
+ throws IOException {
+ ContainerWithPipeline info = getContainerWithPipeline(containerID);
+ deleteContainer(containerID, info.getPipeline(), force);
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
@@ -297,9 +317,9 @@ public class ContainerOperationClient implements ScmClient {
/**
* Get meta data from an existing container.
*
- * @param pipeline - pipeline that represents the container.
- * @return ContainerInfo - a message of protobuf which has basic info
- * of a container.
+ * @param containerID - ID of the container.
+ * @param pipeline - Pipeline where the container is located.
+ * @return ContainerInfo
* @throws IOException
*/
@Override
@@ -326,6 +346,19 @@ public class ContainerOperationClient implements ScmClient {
}
/**
+ * Get meta data from an existing container.
+ * @param containerID - ID of the container.
+ * @return ContainerInfo - a message of protobuf which has basic info
+ * of a container.
+ * @throws IOException
+ */
+ @Override
+ public ContainerData readContainer(long containerID) throws IOException {
+ ContainerWithPipeline info = getContainerWithPipeline(containerID);
+ return readContainer(containerID, info.getPipeline());
+ }
+
+ /**
* Given an id, return the pipeline associated with the container.
* @param containerId - String Container ID
* @return Pipeline of the existing container, corresponding to the given id.
@@ -338,6 +371,19 @@ public class ContainerOperationClient implements ScmClient {
}
/**
+ * Gets a container by Name -- Throws if the container does not exist.
+ *
+ * @param containerId - Container ID
+ * @return ContainerWithPipeline
+ * @throws IOException
+ */
+ @Override
+ public ContainerWithPipeline getContainerWithPipeline(long containerId)
+ throws IOException {
+ return storageContainerLocationClient.getContainerWithPipeline(containerId);
+ }
+
+ /**
* Close a container.
*
* @param pipeline the container to be closed.
@@ -392,6 +438,19 @@ public class ContainerOperationClient implements ScmClient {
}
/**
+ * Close a container.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void closeContainer(long containerId)
+ throws IOException {
+ ContainerWithPipeline info = getContainerWithPipeline(containerId);
+ Pipeline pipeline = info.getPipeline();
+ closeContainer(containerId, pipeline);
+ }
+
+ /**
* Get the the current usage information.
* @param containerID - ID of the container.
* @return the size of the given container.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index b52819a..ecb2173 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.client;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -45,7 +46,7 @@ public interface ScmClient {
* @return ContainerInfo
* @throws IOException
*/
- ContainerInfo createContainer(String owner) throws IOException;
+ ContainerWithPipeline createContainer(String owner) throws IOException;
/**
* Gets a container by Name -- Throws if the container does not exist.
@@ -56,6 +57,14 @@ public interface ScmClient {
ContainerInfo getContainer(long containerId) throws IOException;
/**
+ * Gets a container by Name -- Throws if the container does not exist.
+ * @param containerId - Container ID
+ * @return ContainerWithPipeline
+ * @throws IOException
+ */
+ ContainerWithPipeline getContainerWithPipeline(long containerId) throws IOException;
+
+ /**
* Close a container.
*
* @param containerId - ID of the container.
@@ -65,6 +74,14 @@ public interface ScmClient {
void closeContainer(long containerId, Pipeline pipeline) throws IOException;
/**
+ * Close a container.
+ *
+ * @param containerId - ID of the container.
+ * @throws IOException
+ */
+ void closeContainer(long containerId) throws IOException;
+
+ /**
* Deletes an existing container.
* @param containerId - ID of the container.
* @param pipeline - Pipeline that represents the container.
@@ -74,6 +91,14 @@ public interface ScmClient {
void deleteContainer(long containerId, Pipeline pipeline, boolean force) throws IOException;
/**
+ * Deletes an existing container.
+ * @param containerId - ID of the container.
+ * @param force - true to forcibly delete the container.
+ * @throws IOException
+ */
+ void deleteContainer(long containerId, boolean force) throws IOException;
+
+ /**
* Lists a range of containers and get their info.
*
* @param startContainerID start containerID.
@@ -96,6 +121,15 @@ public interface ScmClient {
throws IOException;
/**
+ * Read meta data from an existing container.
+ * @param containerID - ID of the container.
+ * @return ContainerInfo
+ * @throws IOException
+ */
+ ContainerData readContainer(long containerID)
+ throws IOException;
+
+ /**
* Gets the container size -- Computed by SCM from Container Reports.
* @param containerID - ID of the container.
* @return number of bytes used by this container.
@@ -110,7 +144,7 @@ public interface ScmClient {
* @return ContainerInfo
* @throws IOException - in case of error.
*/
- ContainerInfo createContainer(HddsProtos.ReplicationType type,
+ ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor replicationFactor,
String owner) throws IOException;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
index ee05c87..9593717 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
@@ -15,34 +15,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hdds.scm.container.common.helpers;
+import static java.lang.Math.max;
+
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.Comparator;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.util.Time;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
-
-import static java.lang.Math.max;
-
/**
* Class wraps ozone container info.
*/
-public class ContainerInfo
- implements Comparator<ContainerInfo>, Comparable<ContainerInfo> {
+public class ContainerInfo implements Comparator<ContainerInfo>,
+ Comparable<ContainerInfo>, Externalizable {
private static final ObjectWriter WRITER;
+ private static final String SERIALIZATION_ERROR_MSG = "Java serialization not"
+ + " supported. Use protobuf instead.";
static {
ObjectMapper mapper = new ObjectMapper();
@@ -53,7 +58,9 @@ public class ContainerInfo
}
private HddsProtos.LifeCycleState state;
- private Pipeline pipeline;
+ private String pipelineName;
+ private ReplicationFactor replicationFactor;
+ private ReplicationType replicationType;
// Bytes allocated by SCM for clients.
private long allocatedBytes;
// Actual container usage, updated through heartbeat.
@@ -75,15 +82,17 @@ public class ContainerInfo
ContainerInfo(
long containerID,
HddsProtos.LifeCycleState state,
- Pipeline pipeline,
+ String pipelineName,
long allocatedBytes,
long usedBytes,
long numberOfKeys,
long stateEnterTime,
String owner,
- long deleteTransactionId) {
+ long deleteTransactionId,
+ ReplicationFactor replicationFactor,
+ ReplicationType repType) {
this.containerID = containerID;
- this.pipeline = pipeline;
+ this.pipelineName = pipelineName;
this.allocatedBytes = allocatedBytes;
this.usedBytes = usedBytes;
this.numberOfKeys = numberOfKeys;
@@ -92,6 +101,8 @@ public class ContainerInfo
this.stateEnterTime = stateEnterTime;
this.owner = owner;
this.deleteTransactionId = deleteTransactionId;
+ this.replicationFactor = replicationFactor;
+ this.replicationType = repType;
}
/**
@@ -102,16 +113,18 @@ public class ContainerInfo
public static ContainerInfo fromProtobuf(HddsProtos.SCMContainerInfo info) {
ContainerInfo.Builder builder = new ContainerInfo.Builder();
- builder.setPipeline(Pipeline.getFromProtoBuf(info.getPipeline()));
- builder.setAllocatedBytes(info.getAllocatedBytes());
- builder.setUsedBytes(info.getUsedBytes());
- builder.setNumberOfKeys(info.getNumberOfKeys());
- builder.setState(info.getState());
- builder.setStateEnterTime(info.getStateEnterTime());
- builder.setOwner(info.getOwner());
- builder.setContainerID(info.getContainerID());
- builder.setDeleteTransactionId(info.getDeleteTransactionId());
- return builder.build();
+ return builder.setPipelineName(info.getPipelineName())
+ .setAllocatedBytes(info.getAllocatedBytes())
+ .setUsedBytes(info.getUsedBytes())
+ .setNumberOfKeys(info.getNumberOfKeys())
+ .setState(info.getState())
+ .setStateEnterTime(info.getStateEnterTime())
+ .setOwner(info.getOwner())
+ .setContainerID(info.getContainerID())
+ .setDeleteTransactionId(info.getDeleteTransactionId())
+ .setReplicationFactor(info.getReplicationFactor())
+ .setReplicationType(info.getReplicationType())
+ .build();
}
public long getContainerID() {
@@ -130,8 +143,12 @@ public class ContainerInfo
return stateEnterTime;
}
- public Pipeline getPipeline() {
- return pipeline;
+ public ReplicationFactor getReplicationFactor() {
+ return replicationFactor;
+ }
+
+ public String getPipelineName() {
+ return pipelineName;
}
public long getAllocatedBytes() {
@@ -177,6 +194,10 @@ public class ContainerInfo
return lastUsed;
}
+ public ReplicationType getReplicationType() {
+ return replicationType;
+ }
+
public void updateLastUsedTime() {
lastUsed = Time.monotonicNow();
}
@@ -190,19 +211,17 @@ public class ContainerInfo
public HddsProtos.SCMContainerInfo getProtobuf() {
HddsProtos.SCMContainerInfo.Builder builder =
HddsProtos.SCMContainerInfo.newBuilder();
- builder.setPipeline(getPipeline().getProtobufMessage());
- builder.setAllocatedBytes(getAllocatedBytes());
- builder.setUsedBytes(getUsedBytes());
- builder.setNumberOfKeys(getNumberOfKeys());
- builder.setState(state);
- builder.setStateEnterTime(stateEnterTime);
- builder.setContainerID(getContainerID());
- builder.setDeleteTransactionId(deleteTransactionId);
-
- if (getOwner() != null) {
- builder.setOwner(getOwner());
- }
- return builder.build();
+ return builder.setAllocatedBytes(getAllocatedBytes())
+ .setContainerID(getContainerID())
+ .setUsedBytes(getUsedBytes())
+ .setNumberOfKeys(getNumberOfKeys()).setState(getState())
+ .setStateEnterTime(getStateEnterTime()).setContainerID(getContainerID())
+ .setDeleteTransactionId(getDeleteTransactionId())
+ .setPipelineName(getPipelineName())
+ .setReplicationFactor(getReplicationFactor())
+ .setReplicationType(getReplicationType())
+ .setOwner(getOwner())
+ .build();
}
public String getOwner() {
@@ -217,7 +236,7 @@ public class ContainerInfo
public String toString() {
return "ContainerInfo{"
+ "state=" + state
- + ", pipeline=" + pipeline
+ + ", pipelineName=" + pipelineName
+ ", stateEnterTime=" + stateEnterTime
+ ", owner=" + owner
+ '}';
@@ -252,9 +271,7 @@ public class ContainerInfo
public int hashCode() {
return new HashCodeBuilder(11, 811)
.append(getContainerID())
- .append(pipeline.getFactor())
- .append(pipeline.getType())
- .append(owner)
+ .append(getOwner())
.toHashCode();
}
@@ -327,12 +344,44 @@ public class ContainerInfo
this.data = Arrays.copyOf(data, data.length);
}
}
+
+ /**
+ * Throws IOException as default java serialization is not supported. Use
+ * serialization via protobuf instead.
+ *
+ * @param out the stream to write the object to
+ * @throws IOException Includes any I/O exceptions that may occur
+ * @serialData Overriding methods should use this tag to describe
+ * the data layout of this Externalizable object.
+ * List the sequence of element types and, if possible,
+ * relate the element to a public/protected field and/or
+ * method of this Externalizable class.
+ */
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ throw new IOException(SERIALIZATION_ERROR_MSG);
+ }
+
+ /**
+ * Throws IOException as default java serialization is not supported. Use
+ * serialization via protobuf instead.
+ *
+ * @param in the stream to read data from in order to restore the object
+ * @throws IOException if I/O errors occur
+ * @throws ClassNotFoundException If the class for an object being
+ * restored cannot be found.
+ */
+ @Override
+ public void readExternal(ObjectInput in)
+ throws IOException, ClassNotFoundException {
+ throw new IOException(SERIALIZATION_ERROR_MSG);
+ }
+
/**
* Builder class for ContainerInfo.
*/
public static class Builder {
private HddsProtos.LifeCycleState state;
- private Pipeline pipeline;
private long allocated;
private long used;
private long keys;
@@ -340,6 +389,25 @@ public class ContainerInfo
private String owner;
private long containerID;
private long deleteTransactionId;
+ private String pipelineName;
+ private ReplicationFactor replicationFactor;
+ private ReplicationType replicationType;
+
+ public Builder setReplicationType(
+ ReplicationType replicationType) {
+ this.replicationType = replicationType;
+ return this;
+ }
+
+ public Builder setPipelineName(String pipelineName) {
+ this.pipelineName = pipelineName;
+ return this;
+ }
+
+ public Builder setReplicationFactor(ReplicationFactor repFactor) {
+ this.replicationFactor = repFactor;
+ return this;
+ }
public Builder setContainerID(long id) {
Preconditions.checkState(id >= 0);
@@ -352,11 +420,6 @@ public class ContainerInfo
return this;
}
- public Builder setPipeline(Pipeline containerPipeline) {
- this.pipeline = containerPipeline;
- return this;
- }
-
public Builder setAllocatedBytes(long bytesAllocated) {
this.allocated = bytesAllocated;
return this;
@@ -388,9 +451,9 @@ public class ContainerInfo
}
public ContainerInfo build() {
- return new
- ContainerInfo(containerID, state, pipeline, allocated,
- used, keys, stateEnterTime, owner, deleteTransactionId);
+ return new ContainerInfo(containerID, state, pipelineName, allocated,
+ used, keys, stateEnterTime, owner, deleteTransactionId,
+ replicationFactor, replicationType);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
new file mode 100644
index 0000000..e71d429
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.common.helpers;
+
+import java.util.Comparator;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+/**
+ * Class wraps ozone container info.
+ */
+public class ContainerWithPipeline
+ implements Comparator<ContainerWithPipeline>, Comparable<ContainerWithPipeline> {
+
+ private final ContainerInfo containerInfo;
+ private final Pipeline pipeline;
+
+ public ContainerWithPipeline(ContainerInfo containerInfo, Pipeline pipeline) {
+ this.containerInfo = containerInfo;
+ this.pipeline = pipeline;
+ }
+
+ public ContainerInfo getContainerInfo() {
+ return containerInfo;
+ }
+
+ public Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ public static ContainerWithPipeline fromProtobuf(HddsProtos.ContainerWithPipeline allocatedContainer) {
+ return new ContainerWithPipeline(
+ ContainerInfo.fromProtobuf(allocatedContainer.getContainerInfo()),
+ Pipeline.getFromProtoBuf(allocatedContainer.getPipeline()));
+ }
+
+ public HddsProtos.ContainerWithPipeline getProtobuf() {
+ HddsProtos.ContainerWithPipeline.Builder builder =
+ HddsProtos.ContainerWithPipeline.newBuilder();
+ builder.setContainerInfo(getContainerInfo().getProtobuf())
+ .setPipeline(getPipeline().getProtobufMessage());
+
+ return builder.build();
+ }
+
+
+ @Override
+ public String toString() {
+ return containerInfo.toString() + " | " + pipeline.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ContainerWithPipeline that = (ContainerWithPipeline) o;
+
+ return new EqualsBuilder()
+ .append(getContainerInfo(), that.getContainerInfo())
+ .append(getPipeline(), that.getPipeline())
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(11, 811)
+ .append(getContainerInfo())
+ .append(getPipeline())
+ .toHashCode();
+ }
+
+ /**
+ * Compares its two arguments for order. Returns a negative integer, zero, or
+ * a positive integer as the first argument is less than, equal to, or greater
+ * than the second.<p>
+ *
+ * @param o1 the first object to be compared.
+ * @param o2 the second object to be compared.
+ * @return a negative integer, zero, or a positive integer as the first
+ * argument is less than, equal to, or greater than the second.
+ * @throws NullPointerException if an argument is null and this comparator
+ * does not permit null arguments
+ * @throws ClassCastException if the arguments' types prevent them from
+ * being compared by this comparator.
+ */
+ @Override
+ public int compare(ContainerWithPipeline o1, ContainerWithPipeline o2) {
+ return o1.getContainerInfo().compareTo(o2.getContainerInfo());
+ }
+
+ /**
+ * Compares this object with the specified object for order. Returns a
+ * negative integer, zero, or a positive integer as this object is less than,
+ * equal to, or greater than the specified object.
+ *
+ * @param o the object to be compared.
+ * @return a negative integer, zero, or a positive integer as this object is
+ * less than, equal to, or greater than the specified object.
+ * @throws NullPointerException if the specified object is null
+ * @throws ClassCastException if the specified object's type prevents it
+ * from being compared to this object.
+ */
+ @Override
+ public int compareTo(ContainerWithPipeline o) {
+ return this.compare(this, o);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index e8d85e0..b787409 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.protocol;
import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -38,7 +39,7 @@ public interface StorageContainerLocationProtocol {
* set of datanodes that should be used creating this container.
*
*/
- ContainerInfo allocateContainer(HddsProtos.ReplicationType replicationType,
+ ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType replicationType,
HddsProtos.ReplicationFactor factor, String owner)
throws IOException;
@@ -54,6 +55,16 @@ public interface StorageContainerLocationProtocol {
ContainerInfo getContainer(long containerID) throws IOException;
/**
+ * Ask SCM the location of the container. SCM responds with a group of
+ * nodes where this container and its replicas are located.
+ *
+ * @param containerID - ID of the container.
+ * @return ContainerWithPipeline - the container info with the pipeline.
+ * @throws IOException
+ */
+ ContainerWithPipeline getContainerWithPipeline(long containerID) throws IOException;
+
+ /**
* Ask SCM a list of containers with a range of container names
* and the limit of count.
* Search container names between start name(exclusive), and
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index bba4e17..4b03d12 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -20,7 +20,10 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@@ -95,7 +98,7 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
* @throws IOException
*/
@Override
- public ContainerInfo allocateContainer(HddsProtos.ReplicationType type,
+ public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, String owner) throws IOException {
ContainerRequestProto request = ContainerRequestProto.newBuilder()
@@ -114,7 +117,7 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
throw new IOException(response.hasErrorMessage() ?
response.getErrorMessage() : "Allocate container failed.");
}
- return ContainerInfo.fromProtobuf(response.getContainerInfo());
+ return ContainerWithPipeline.fromProtobuf(response.getContainerWithPipeline());
}
public ContainerInfo getContainer(long containerID) throws IOException {
@@ -136,6 +139,25 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
/**
* {@inheritDoc}
*/
+ public ContainerWithPipeline getContainerWithPipeline(long containerID) throws IOException {
+ Preconditions.checkState(containerID >= 0,
+ "Container ID cannot be negative");
+ GetContainerWithPipelineRequestProto request = GetContainerWithPipelineRequestProto
+ .newBuilder()
+ .setContainerID(containerID)
+ .build();
+ try {
+ GetContainerWithPipelineResponseProto response =
+ rpcProxy.getContainerWithPipeline(NULL_RPC_CONTROLLER, request);
+ return ContainerWithPipeline.fromProtobuf(response.getContainerWithPipeline());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
@Override
public List<ContainerInfo> listContainer(long startContainerID, int count)
throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 70a0e8a..d66919f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -21,7 +21,10 @@ package org.apache.hadoop.ozone.protocolPB;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
@@ -82,10 +85,11 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
public ContainerResponseProto allocateContainer(RpcController unused,
ContainerRequestProto request) throws ServiceException {
try {
- ContainerInfo container = impl.allocateContainer(request.getReplicationType(),
- request.getReplicationFactor(), request.getOwner());
+ ContainerWithPipeline containerWithPipeline = impl
+ .allocateContainer(request.getReplicationType(),
+ request.getReplicationFactor(), request.getOwner());
return ContainerResponseProto.newBuilder()
- .setContainerInfo(container.getProtobuf())
+ .setContainerWithPipeline(containerWithPipeline.getProtobuf())
.setErrorCode(ContainerResponseProto.Error.success)
.build();
@@ -109,6 +113,21 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
}
@Override
+ public GetContainerWithPipelineResponseProto getContainerWithPipeline(
+ RpcController controller, GetContainerWithPipelineRequestProto request)
+ throws ServiceException {
+ try {
+ ContainerWithPipeline container = impl
+ .getContainerWithPipeline(request.getContainerID());
+ return GetContainerWithPipelineResponseProto.newBuilder()
+ .setContainerWithPipeline(container.getProtobuf())
+ .build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
public SCMListContainerResponseProto listContainer(RpcController controller,
SCMListContainerRequestProto request) throws ServiceException {
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
index 090e6eb..143c2ae 100644
--- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -52,7 +52,7 @@ message ContainerResponseProto {
errorContainerMissing = 3;
}
required Error errorCode = 1;
- required SCMContainerInfo containerInfo = 2;
+ required ContainerWithPipeline containerWithPipeline = 2;
optional string errorMessage = 3;
}
@@ -64,6 +64,14 @@ message GetContainerResponseProto {
required SCMContainerInfo containerInfo = 1;
}
+message GetContainerWithPipelineRequestProto {
+ required int64 containerID = 1;
+}
+
+message GetContainerWithPipelineResponseProto {
+ required ContainerWithPipeline containerWithPipeline = 1;
+}
+
message SCMListContainerRequestProto {
required uint32 count = 1;
optional uint64 startContainerID = 2;
@@ -171,6 +179,11 @@ service StorageContainerLocationProtocolService {
*/
rpc getContainer(GetContainerRequestProto) returns (GetContainerResponseProto);
+ /**
+ * Returns the pipeline for a given container.
+ */
+ rpc getContainerWithPipeline(GetContainerWithPipelineRequestProto) returns (GetContainerWithPipelineResponseProto);
+
rpc listContainer(SCMListContainerRequestProto) returns (SCMListContainerResponseProto);
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/proto/hdds.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index 816efa7..1c9ee19 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -132,7 +132,7 @@ enum LifeCycleEvent {
message SCMContainerInfo {
required int64 containerID = 1;
required LifeCycleState state = 2;
- required Pipeline pipeline = 3;
+ optional string pipelineName = 3;
// This is not total size of container, but space allocated by SCM for
// clients to write blocks
required uint64 allocatedBytes = 4;
@@ -141,6 +141,13 @@ message SCMContainerInfo {
optional int64 stateEnterTime = 7;
required string owner = 8;
optional int64 deleteTransactionId = 9;
+ required ReplicationFactor replicationFactor = 10;
+ required ReplicationType replicationType = 11;
+}
+
+message ContainerWithPipeline {
+ required SCMContainerInfo containerInfo = 1;
+ required Pipeline pipeline = 2;
}
message GetScmInfoRequestProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 7cfbdab..953f71e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -16,10 +16,12 @@
*/
package org.apache.hadoop.hdds.scm.block;
+import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -156,13 +158,13 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
lock.lock();
try {
for (int i = 0; i < count; i++) {
- ContainerInfo containerInfo = null;
+ ContainerWithPipeline containerWithPipeline = null;
try {
// TODO: Fix this later when Ratis is made the Default.
- containerInfo = containerManager.allocateContainer(type, factor,
+ containerWithPipeline = containerManager.allocateContainer(type, factor,
owner);
- if (containerInfo == null) {
+ if (containerWithPipeline == null) {
LOG.warn("Unable to allocate container.");
continue;
}
@@ -231,30 +233,27 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
can use different kind of policies.
*/
- ContainerInfo containerInfo;
+ ContainerWithPipeline containerWithPipeline;
// Look for ALLOCATED container that matches all other parameters.
- containerInfo =
- containerManager
- .getStateManager()
- .getMatchingContainer(
- size, owner, type, factor, HddsProtos.LifeCycleState
- .ALLOCATED);
- if (containerInfo != null) {
- containerManager.updateContainerState(containerInfo.getContainerID(),
+ containerWithPipeline = containerManager
+ .getMatchingContainerWithPipeline(size, owner, type, factor,
+ HddsProtos.LifeCycleState.ALLOCATED);
+ if (containerWithPipeline != null) {
+ containerManager.updateContainerState(
+ containerWithPipeline.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
- return newBlock(containerInfo, HddsProtos.LifeCycleState.ALLOCATED);
+ return newBlock(containerWithPipeline,
+ HddsProtos.LifeCycleState.ALLOCATED);
}
// Since we found no allocated containers that match our criteria, let us
// look for OPEN containers that match the criteria.
- containerInfo =
- containerManager
- .getStateManager()
- .getMatchingContainer(size, owner, type, factor, HddsProtos
- .LifeCycleState.OPEN);
- if (containerInfo != null) {
- return newBlock(containerInfo, HddsProtos.LifeCycleState.OPEN);
+ containerWithPipeline = containerManager
+ .getMatchingContainerWithPipeline(size, owner, type, factor,
+ HddsProtos.LifeCycleState.OPEN);
+ if (containerWithPipeline != null) {
+ return newBlock(containerWithPipeline, HddsProtos.LifeCycleState.OPEN);
}
// We found neither ALLOCATED or OPEN Containers. This generally means
@@ -264,16 +263,15 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
preAllocateContainers(containerProvisionBatchSize, type, factor, owner);
// Since we just allocated a set of containers this should work
- containerInfo =
- containerManager
- .getStateManager()
- .getMatchingContainer(
- size, owner, type, factor, HddsProtos.LifeCycleState
- .ALLOCATED);
- if (containerInfo != null) {
- containerManager.updateContainerState(containerInfo.getContainerID(),
+ containerWithPipeline = containerManager
+ .getMatchingContainerWithPipeline(size, owner, type, factor,
+ HddsProtos.LifeCycleState.ALLOCATED);
+ if (containerWithPipeline != null) {
+ containerManager.updateContainerState(
+ containerWithPipeline.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
- return newBlock(containerInfo, HddsProtos.LifeCycleState.ALLOCATED);
+ return newBlock(containerWithPipeline,
+ HddsProtos.LifeCycleState.ALLOCATED);
}
// we have tried all strategies we know and but somehow we are not able
@@ -290,18 +288,28 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
}
}
+ private String getChannelName(ReplicationType type) {
+ switch (type) {
+ case RATIS:
+ return "RA" + UUID.randomUUID().toString().substring(3);
+ case STAND_ALONE:
+ return "SA" + UUID.randomUUID().toString().substring(3);
+ default:
+ return "RA" + UUID.randomUUID().toString().substring(3);
+ }
+ }
+
/**
* newBlock - returns a new block assigned to a container.
*
- * @param containerInfo - Container Info.
+ * @param containerWithPipeline - Container Info.
* @param state - Current state of the container.
* @return AllocatedBlock
*/
- private AllocatedBlock newBlock(
- ContainerInfo containerInfo, HddsProtos.LifeCycleState state)
- throws IOException {
-
- if (containerInfo.getPipeline().getMachines().size() == 0) {
+ private AllocatedBlock newBlock(ContainerWithPipeline containerWithPipeline,
+ HddsProtos.LifeCycleState state) throws IOException {
+ ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
+ if (containerWithPipeline.getPipeline().getDatanodes().size() == 0) {
LOG.error("Pipeline Machine count is zero.");
return null;
}
@@ -317,7 +325,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
AllocatedBlock.Builder abb =
new AllocatedBlock.Builder()
.setBlockID(new BlockID(containerID, localID))
- .setPipeline(containerInfo.getPipeline())
+ .setPipeline(containerWithPipeline.getPipeline())
.setShouldCreateContainer(createContainer);
LOG.trace("New block allocated : {} Container ID: {}", localID,
containerID);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index 32290cc..d71e7b0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.block;
import com.google.common.collect.ArrayListMultimap;
import org.apache.hadoop.hdds.scm.container.Mapping;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -29,6 +28,7 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
/**
* A wrapper class to hold info about datanode and all deleted block
@@ -54,21 +54,22 @@ public class DatanodeDeletedBlockTransactions {
}
public void addTransaction(DeletedBlocksTransaction tx) throws IOException {
- ContainerInfo info = null;
+ Pipeline pipeline = null;
try {
- info = mappingService.getContainer(tx.getContainerID());
+ pipeline = mappingService.getContainerWithPipeline(tx.getContainerID())
+ .getPipeline();
} catch (IOException e) {
SCMBlockDeletingService.LOG.warn("Got container info error.", e);
}
- if (info == null) {
+ if (pipeline == null) {
SCMBlockDeletingService.LOG.warn(
"Container {} not found, continue to process next",
tx.getContainerID());
return;
}
- for (DatanodeDetails dd : info.getPipeline().getMachines()) {
+ for (DatanodeDetails dd : pipeline.getMachines()) {
UUID dnID = dd.getUuid();
if (transactions.containsKey(dnID)) {
List<DeletedBlocksTransaction> txs = transactions.get(dnID);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 16e84a3..7b24538 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -16,9 +16,11 @@
*/
package org.apache.hadoop.hdds.scm.container;
+import java.io.IOException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -54,22 +56,32 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
LOG.info("Close container Event triggered for container : {}",
containerID.getId());
- ContainerStateManager stateManager = containerManager.getStateManager();
- ContainerInfo info = stateManager.getContainer(containerID);
- if (info == null) {
- LOG.info("Container with id : {} does not exist", containerID.getId());
+ ContainerWithPipeline containerWithPipeline = null;
+ ContainerInfo info;
+ try {
+ containerWithPipeline = containerManager.getContainerWithPipeline(containerID.getId());
+ info = containerWithPipeline.getContainerInfo();
+ if (info == null) {
+ LOG.info("Failed to update the container state. Container with id : {} "
+ + "does not exist", containerID.getId());
+ return;
+ }
+ } catch (IOException e) {
+ LOG.info("Failed to update the container state. Container with id : {} "
+ + "does not exist", containerID.getId());
return;
}
+
if (info.getState() == HddsProtos.LifeCycleState.OPEN) {
- for (DatanodeDetails datanode : info.getPipeline().getMachines()) {
+ for (DatanodeDetails datanode : containerWithPipeline.getPipeline().getMachines()) {
containerManager.getNodeManager().addDatanodeCommand(datanode.getUuid(),
new CloseContainerCommand(containerID.getId(),
- info.getPipeline().getType()));
+ info.getReplicationType()));
}
try {
// Finalize event will make sure the state of the container transitions
// from OPEN to CLOSING in containerStateManager.
- stateManager
+ containerManager.getStateManager()
.updateContainerState(info, HddsProtos.LifeCycleEvent.FINALIZE);
} catch (SCMException ex) {
LOG.error("Failed to update the container state for container : {}"
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 9fd30f2..e25c5b4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -21,6 +21,10 @@ import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
@@ -167,6 +171,44 @@ public class ContainerMapping implements Mapping {
}
/**
+ * Returns the ContainerInfo from the container ID.
+ *
+ * @param containerID - ID of container.
+ * @return - ContainerWithPipeline such as creation state and the pipeline.
+ * @throws IOException
+ */
+ @Override
+ public ContainerWithPipeline getContainerWithPipeline(long containerID)
+ throws IOException {
+ ContainerInfo contInfo;
+ lock.lock();
+ try {
+ byte[] containerBytes = containerStore.get(
+ Longs.toByteArray(containerID));
+ if (containerBytes == null) {
+ throw new SCMException(
+ "Specified key does not exist. key : " + containerID,
+ SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+ }
+ HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
+ .parseFrom(containerBytes);
+ contInfo = ContainerInfo.fromProtobuf(temp);
+ Pipeline pipeline = pipelineSelector
+ .getPipeline(contInfo.getPipelineName(),
+ contInfo.getReplicationType());
+
+ if(pipeline == null) {
+ pipeline = pipelineSelector
+ .getReplicationPipeline(contInfo.getReplicationType(),
+ contInfo.getReplicationFactor());
+ }
+ return new ContainerWithPipeline(contInfo, pipeline);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
@@ -208,13 +250,15 @@ public class ContainerMapping implements Mapping {
* @throws IOException - Exception
*/
@Override
- public ContainerInfo allocateContainer(
+ public ContainerWithPipeline allocateContainer(
ReplicationType type,
ReplicationFactor replicationFactor,
String owner)
throws IOException {
ContainerInfo containerInfo;
+ ContainerWithPipeline containerWithPipeline;
+
if (!nodeManager.isOutOfChillMode()) {
throw new SCMException(
"Unable to create container while in chill mode",
@@ -223,9 +267,9 @@ public class ContainerMapping implements Mapping {
lock.lock();
try {
- containerInfo =
- containerStateManager.allocateContainer(
+ containerWithPipeline = containerStateManager.allocateContainer(
pipelineSelector, type, replicationFactor, owner);
+ containerInfo = containerWithPipeline.getContainerInfo();
byte[] containerIDBytes = Longs.toByteArray(
containerInfo.getContainerID());
@@ -234,7 +278,7 @@ public class ContainerMapping implements Mapping {
} finally {
lock.unlock();
}
- return containerInfo;
+ return containerWithPipeline;
}
/**
@@ -381,6 +425,35 @@ public class ContainerMapping implements Mapping {
}
/**
+ * Return a container matching the attributes specified.
+ *
+ * @param size - Space needed in the Container.
+ * @param owner - Owner of the container - A specific nameservice.
+ * @param type - Replication Type {StandAlone, Ratis}
+ * @param factor - Replication Factor {ONE, THREE}
+ * @param state - State of the Container-- {Open, Allocated etc.}
+ * @return ContainerInfo, null if there is no match found.
+ */
+ public ContainerWithPipeline getMatchingContainerWithPipeline(final long size,
+ String owner, ReplicationType type, ReplicationFactor factor,
+ LifeCycleState state) throws IOException {
+ ContainerInfo containerInfo = getStateManager()
+ .getMatchingContainer(size, owner, type, factor, state);
+ if (containerInfo == null) {
+ return null;
+ }
+ Pipeline pipeline = pipelineSelector
+ .getPipeline(containerInfo.getPipelineName(),
+ containerInfo.getReplicationType());
+ if (pipeline == null) {
+ pipelineSelector
+ .getReplicationPipeline(containerInfo.getReplicationType(),
+ containerInfo.getReplicationFactor());
+ }
+ return new ContainerWithPipeline(containerInfo, pipeline);
+ }
+
+ /**
* Process container report from Datanode.
* <p>
* Processing follows a very simple logic for time being.
@@ -415,7 +488,7 @@ public class ContainerMapping implements Mapping {
HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
HddsProtos.SCMContainerInfo newState =
- reconcileState(datanodeState, knownState);
+ reconcileState(datanodeState, knownState, datanodeDetails);
// FIX ME: This can be optimized, we write twice to memory, where a
// single write would work well.
@@ -425,8 +498,14 @@ public class ContainerMapping implements Mapping {
containerStore.put(dbKey, newState.toByteArray());
// If the container is closed, then state is already written to SCM
+ Pipeline pipeline = pipelineSelector.getPipeline(newState.getPipelineName(), newState.getReplicationType());
+ if(pipeline == null) {
+ pipeline = pipelineSelector
+ .getReplicationPipeline(newState.getReplicationType(),
+ newState.getReplicationFactor());
+ }
// DB.TODO: So can we can write only once to DB.
- if (closeContainerIfNeeded(newState)) {
+ if (closeContainerIfNeeded(newState, pipeline)) {
LOG.info("Closing the Container: {}", newState.getContainerID());
}
} else {
@@ -447,15 +526,22 @@ public class ContainerMapping implements Mapping {
*
* @param datanodeState - State from the Datanode.
* @param knownState - State inside SCM.
+ * @param dnDetails
* @return new SCM State for this container.
*/
private HddsProtos.SCMContainerInfo reconcileState(
StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
- HddsProtos.SCMContainerInfo knownState) {
+ SCMContainerInfo knownState, DatanodeDetails dnDetails) {
HddsProtos.SCMContainerInfo.Builder builder =
HddsProtos.SCMContainerInfo.newBuilder();
- builder.setContainerID(knownState.getContainerID());
- builder.setPipeline(knownState.getPipeline());
+ builder.setContainerID(knownState.getContainerID())
+ .setPipelineName(knownState.getPipelineName())
+ .setReplicationType(knownState.getReplicationType())
+ .setReplicationFactor(knownState.getReplicationFactor());
+
+ // TODO: If current state doesn't have this DN in list of DataNodes with replica
+ // then add it in list of replicas.
+
// If used size is greater than allocated size, we will be updating
// allocated size with used size. This update is done as a fallback
// mechanism in case SCM crashes without properly updating allocated
@@ -464,13 +550,13 @@ public class ContainerMapping implements Mapping {
long usedSize = datanodeState.getUsed();
long allocated = knownState.getAllocatedBytes() > usedSize ?
knownState.getAllocatedBytes() : usedSize;
- builder.setAllocatedBytes(allocated);
- builder.setUsedBytes(usedSize);
- builder.setNumberOfKeys(datanodeState.getKeyCount());
- builder.setState(knownState.getState());
- builder.setStateEnterTime(knownState.getStateEnterTime());
- builder.setContainerID(knownState.getContainerID());
- builder.setDeleteTransactionId(knownState.getDeleteTransactionId());
+ builder.setAllocatedBytes(allocated)
+ .setUsedBytes(usedSize)
+ .setNumberOfKeys(datanodeState.getKeyCount())
+ .setState(knownState.getState())
+ .setStateEnterTime(knownState.getStateEnterTime())
+ .setContainerID(knownState.getContainerID())
+ .setDeleteTransactionId(knownState.getDeleteTransactionId());
if (knownState.getOwner() != null) {
builder.setOwner(knownState.getOwner());
}
@@ -485,9 +571,11 @@ public class ContainerMapping implements Mapping {
* one protobuf in one file and another definition in another file.
*
* @param newState - This is the state we maintain in SCM.
+ * @param pipeline
* @throws IOException
*/
- private boolean closeContainerIfNeeded(HddsProtos.SCMContainerInfo newState)
+ private boolean closeContainerIfNeeded(SCMContainerInfo newState,
+ Pipeline pipeline)
throws IOException {
float containerUsedPercentage = 1.0f *
newState.getUsedBytes() / this.size;
@@ -498,7 +586,7 @@ public class ContainerMapping implements Mapping {
// We will call closer till get to the closed state.
// That is SCM will make this call repeatedly until we reach the closed
// state.
- closer.close(newState);
+ closer.close(newState, pipeline);
if (shouldClose(scmInfo)) {
// This event moves the Container from Open to Closing State, this is
@@ -598,10 +686,12 @@ public class ContainerMapping implements Mapping {
.setAllocatedBytes(info.getAllocatedBytes())
.setNumberOfKeys(oldInfo.getNumberOfKeys())
.setOwner(oldInfo.getOwner())
- .setPipeline(oldInfo.getPipeline())
+ .setPipelineName(oldInfo.getPipelineName())
.setState(oldInfo.getState())
.setUsedBytes(oldInfo.getUsedBytes())
.setDeleteTransactionId(oldInfo.getDeleteTransactionId())
+ .setReplicationFactor(oldInfo.getReplicationFactor())
+ .setReplicationType(oldInfo.getReplicationType())
.build();
containerStore.put(dbKey, newInfo.getProtobuf().toByteArray());
} else {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 08733bd..870ab1d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.container;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
@@ -279,10 +280,10 @@ public class ContainerStateManager implements Closeable {
* @param selector -- Pipeline selector class.
* @param type -- Replication type.
* @param replicationFactor - Replication replicationFactor.
- * @return Container Info.
+ * @return ContainerWithPipeline
* @throws IOException on Failure.
*/
- public ContainerInfo allocateContainer(PipelineSelector selector, HddsProtos
+ public ContainerWithPipeline allocateContainer(PipelineSelector selector, HddsProtos
.ReplicationType type, HddsProtos.ReplicationFactor replicationFactor,
String owner) throws IOException {
@@ -295,7 +296,7 @@ public class ContainerStateManager implements Closeable {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(HddsProtos.LifeCycleState.ALLOCATED)
- .setPipeline(pipeline)
+ .setPipelineName(pipeline.getPipelineName())
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
@@ -305,11 +306,13 @@ public class ContainerStateManager implements Closeable {
.setOwner(owner)
.setContainerID(containerCount.incrementAndGet())
.setDeleteTransactionId(0)
+ .setReplicationFactor(replicationFactor)
+ .setReplicationType(pipeline.getType())
.build();
Preconditions.checkNotNull(containerInfo);
containers.addContainer(containerInfo);
LOG.trace("New container allocated: {}", containerInfo);
- return containerInfo;
+ return new ContainerWithPipeline(containerInfo, pipeline);
}
/**
@@ -432,8 +435,8 @@ public class ContainerStateManager implements Closeable {
containerInfo.updateLastUsedTime();
ContainerState key = new ContainerState(owner,
- containerInfo.getPipeline().getType(),
- containerInfo.getPipeline().getFactor());
+ containerInfo.getReplicationType(),
+ containerInfo.getReplicationFactor());
lastUsedMap.put(key, containerInfo.containerID());
return containerInfo;
}
@@ -458,6 +461,20 @@ public class ContainerStateManager implements Closeable {
}
/**
+ * Returns the containerInfo with pipeline for the given container id.
+ * @param selector -- Pipeline selector class.
+ * @param containerID id of the container
+ * @return ContainerInfo containerInfo
+ * @throws IOException
+ */
+ public ContainerWithPipeline getContainer(PipelineSelector selector,
+ ContainerID containerID) throws IOException {
+ ContainerInfo info = containers.getContainerInfo(containerID.getId());
+ Pipeline pipeline = selector.getPipeline(info.getPipelineName(), info.getReplicationType());
+ return new ContainerWithPipeline(info, pipeline);
+ }
+
+ /**
* Returns the containerInfo for the given container id.
* @param containerID id of the container
* @return ContainerInfo containerInfo
@@ -466,6 +483,7 @@ public class ContainerStateManager implements Closeable {
public ContainerInfo getContainer(ContainerID containerID) {
return containers.getContainerInfo(containerID.getId());
}
+
@Override
public void close() throws IOException {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
index e77a4b6..f52eb05 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
@@ -17,6 +17,10 @@
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
@@ -43,6 +47,16 @@ public interface Mapping extends Closeable {
ContainerInfo getContainer(long containerID) throws IOException;
/**
+ * Returns the ContainerInfo from the container ID.
+ *
+ * @param containerID - ID of container.
+ * @return - ContainerWithPipeline such as creation state and the pipeline.
+ * @throws IOException
+ */
+ ContainerWithPipeline getContainerWithPipeline(long containerID)
+ throws IOException;
+
+ /**
* Returns containers under certain conditions.
* Search container IDs from start ID(exclusive),
* The max size of the searching range cannot exceed the
@@ -65,10 +79,10 @@ public interface Mapping extends Closeable {
*
* @param replicationFactor - replication factor of the container.
* @param owner
- * @return - Container Info.
+ * @return - ContainerWithPipeline.
* @throws IOException
*/
- ContainerInfo allocateContainer(HddsProtos.ReplicationType type,
+ ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor replicationFactor, String owner)
throws IOException;
@@ -120,4 +134,12 @@ public interface Mapping extends Closeable {
* @return NodeManager
*/
NodeManager getNodeManager();
+
+ /**
+ * Returns the ContainerWithPipeline.
+ * @return NodeManager
+ */
+ public ContainerWithPipeline getMatchingContainerWithPipeline(final long size,
+ String owner, ReplicationType type, ReplicationFactor factor,
+ LifeCycleState state) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
index cbb2ba7..3ca8ba9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
@@ -22,6 +22,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -90,8 +92,10 @@ public class ContainerCloser {
* lives.
*
* @param info - ContainerInfo.
+ * @param pipeline
*/
- public void close(HddsProtos.SCMContainerInfo info) {
+ public void close(SCMContainerInfo info,
+ Pipeline pipeline) {
if (commandIssued.containsKey(info.getContainerID())) {
// We check if we issued a close command in last 3 * reportInterval secs.
@@ -126,13 +130,10 @@ public class ContainerCloser {
// this queue can be emptied by a datanode after a close report is send
// to SCM. In that case also, data node will ignore this command.
- HddsProtos.Pipeline pipeline = info.getPipeline();
- for (HddsProtos.DatanodeDetailsProto datanodeDetails :
- pipeline.getMembersList()) {
- nodeManager.addDatanodeCommand(
- DatanodeDetails.getFromProtoBuf(datanodeDetails).getUuid(),
+ for (DatanodeDetails datanodeDetails : pipeline.getMachines()) {
+ nodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(info.getContainerID(),
- pipeline.getType()));
+ info.getReplicationType()));
}
if (!commandIssued.containsKey(info.getContainerID())) {
commandIssued.put(info.getContainerID(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 48c6423..3ada8fe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -116,7 +116,8 @@ public class ContainerStateMap {
public void addContainer(ContainerInfo info)
throws SCMException {
Preconditions.checkNotNull(info, "Container Info cannot be null");
- Preconditions.checkNotNull(info.getPipeline(), "Pipeline cannot be null");
+ Preconditions.checkArgument(info.getReplicationFactor().getNumber() > 0,
+ "ExpectedReplicaCount should be greater than 0");
try (AutoCloseableLock lock = autoLock.acquire()) {
ContainerID id = ContainerID.valueof(info.getContainerID());
@@ -129,8 +130,8 @@ public class ContainerStateMap {
lifeCycleStateMap.insert(info.getState(), id);
ownerMap.insert(info.getOwner(), id);
- factorMap.insert(info.getPipeline().getFactor(), id);
- typeMap.insert(info.getPipeline().getType(), id);
+ factorMap.insert(info.getReplicationFactor(), id);
+ typeMap.insert(info.getReplicationType(), id);
LOG.trace("Created container with {} successfully.", id);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index 48affa4..a1fbce6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -16,6 +16,9 @@
*/
package org.apache.hadoop.hdds.scm.pipelines;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.WeakHashMap;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@@ -25,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -36,11 +38,13 @@ public abstract class PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineManager.class);
private final List<Pipeline> activePipelines;
+ private final Map<String, Pipeline> activePipelineMap;
private final AtomicInteger pipelineIndex;
public PipelineManager() {
activePipelines = new LinkedList<>();
pipelineIndex = new AtomicInteger(0);
+ activePipelineMap = new WeakHashMap<>();
}
/**
@@ -76,6 +80,7 @@ public abstract class PipelineManager {
"replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
activePipelines.add(pipeline);
+ activePipelineMap.put(pipeline.getPipelineName(), pipeline);
} else {
pipeline =
findOpenPipeline(replicationType, replicationFactor);
@@ -94,6 +99,26 @@ public abstract class PipelineManager {
}
}
+ /**
+ * This function to get pipeline with given pipeline name.
+ *
+ * @param pipelineName
+ * @return a Pipeline.
+ */
+ public synchronized final Pipeline getPipeline(String pipelineName) {
+ Pipeline pipeline = null;
+
+ // 1. Check if pipeline channel already exists
+ if (activePipelineMap.containsKey(pipelineName)) {
+ pipeline = activePipelineMap.get(pipelineName);
+ LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
+ return pipeline;
+ } else {
+ LOG.debug("Unable to find pipeline for pipelineName:{}", pipelineName);
+ }
+ return pipeline;
+ }
+
protected int getReplicationCount(ReplicationFactor factor) {
switch (factor) {
case ONE:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 508ca9b..3846a84 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.pipelines;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
@@ -177,6 +178,21 @@ public class PipelineSelector {
}
/**
+ * This function to return pipeline for given pipeline name and replication
+ * type.
+ */
+ public Pipeline getPipeline(String pipelineName,
+ ReplicationType replicationType) throws IOException {
+ if (pipelineName == null) {
+ return null;
+ }
+ PipelineManager manager = getPipelineManager(replicationType);
+ Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
+ LOG.debug("Getting replication pipeline forReplicationType {} :" +
+ " pipelineName:{}", replicationType, pipelineName);
+ return manager.getPipeline(pipelineName);
+ }
+ /**
* Creates a pipeline from a specified set of Nodes.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index ace8758..189060e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.pipelines.ratis;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index e76027f..579a3a2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm.pipelines.standalone;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index d73cccd..e1d478f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@@ -145,11 +146,12 @@ public class SCMClientProtocolServer implements
}
@Override
- public ContainerInfo allocateContainer(HddsProtos.ReplicationType
+ public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType
replicationType, HddsProtos.ReplicationFactor factor,
String owner) throws IOException {
String remoteUser = getRpcRemoteUsername();
getScm().checkAdminAccess(remoteUser);
+
return scm.getScmContainerManager()
.allocateContainer(replicationType, factor, owner);
}
@@ -163,6 +165,14 @@ public class SCMClientProtocolServer implements
}
@Override
+ public ContainerWithPipeline getContainerWithPipeline(long containerID) throws IOException {
+ String remoteUser = getRpcRemoteUsername();
+ getScm().checkAdminAccess(remoteUser);
+ return scm.getScmContainerManager()
+ .getContainerWithPipeline(containerID);
+ }
+
+ @Override
public List<ContainerInfo> listContainer(long startContainerID,
int count) throws IOException {
return scm.getScmContainerManager().
@@ -248,7 +258,7 @@ public class SCMClientProtocolServer implements
throws IOException {
// TODO: will be addressed in future patch.
// This is needed only for debugging purposes to make sure cluster is
- // working correctly.
+ // working correctly.
return null;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org