You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/05/09 17:40:45 UTC
[07/26] hadoop git commit: HDDS-1. Remove SCM Block DB. Contributed
by Xiaoyu Yao.
HDDS-1. Remove SCM Block DB. Contributed by Xiaoyu Yao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3a43ac28
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3a43ac28
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3a43ac28
Branch: refs/heads/HDDS-4
Commit: 3a43ac2851f5dea4deb8a1dfebf9bf65fc57bd76
Parents: a3a1552
Author: Anu Engineer <ae...@apache.org>
Authored: Mon May 7 14:42:18 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Mon May 7 14:58:52 2018 -0700
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/util/Time.java | 12 +
.../hadoop/hdds/scm/XceiverClientManager.java | 17 +-
.../scm/client/ContainerOperationClient.java | 121 +++---
.../hdds/scm/storage/ChunkInputStream.java | 13 +-
.../hdds/scm/storage/ChunkOutputStream.java | 18 +-
.../org/apache/hadoop/hdds/client/BlockID.java | 59 +++
.../hadoop/hdds/scm/client/ScmClient.java | 46 +-
.../common/helpers/AllocatedBlock.java | 20 +-
.../container/common/helpers/ContainerInfo.java | 22 +-
.../common/helpers/DeleteBlockResult.java | 16 +-
.../scm/container/common/helpers/Pipeline.java | 18 +-
.../common/helpers/PipelineChannel.java | 2 +
.../scm/protocol/ScmBlockLocationProtocol.java | 12 -
.../StorageContainerLocationProtocol.java | 26 +-
...kLocationProtocolClientSideTranslatorPB.java | 46 +-
...rLocationProtocolClientSideTranslatorPB.java | 60 ++-
.../scm/storage/ContainerProtocolCalls.java | 66 ++-
.../apache/hadoop/ozone/common/BlockGroup.java | 26 +-
.../ozone/common/DeleteBlockGroupResult.java | 13 +-
.../ozone/container/common/helpers/KeyData.java | 41 +-
...kLocationProtocolServerSideTranslatorPB.java | 38 +-
...rLocationProtocolServerSideTranslatorPB.java | 30 +-
.../org/apache/hadoop/utils/LevelDBStore.java | 4 +-
.../apache/hadoop/utils/MetadataKeyFilters.java | 20 +-
.../org/apache/hadoop/utils/RocksDBStore.java | 1 +
.../main/proto/DatanodeContainerProtocol.proto | 82 ++--
.../main/proto/ScmBlockLocationProtocol.proto | 36 +-
.../StorageContainerLocationProtocol.proto | 16 +-
hadoop-hdds/common/src/main/proto/hdds.proto | 13 +-
.../container/common/helpers/ChunkUtils.java | 20 +-
.../container/common/helpers/ContainerData.java | 33 +-
.../common/helpers/ContainerReport.java | 19 +-
.../common/helpers/ContainerUtils.java | 14 +-
.../helpers/DeletedContainerBlocksSummary.java | 19 +-
.../container/common/helpers/FileUtils.java | 3 +-
.../container/common/helpers/KeyUtils.java | 6 +-
.../container/common/impl/ChunkManagerImpl.java | 76 ++--
.../common/impl/ContainerManagerImpl.java | 210 +++++-----
.../ozone/container/common/impl/Dispatcher.java | 121 +++---
.../container/common/impl/KeyManagerImpl.java | 66 ++-
.../RandomContainerDeletionChoosingPolicy.java | 4 +-
...NOrderedContainerDeletionChoosingPolicy.java | 4 +-
.../common/interfaces/ChunkManager.java | 19 +-
.../ContainerDeletionChoosingPolicy.java | 2 +-
.../common/interfaces/ContainerManager.java | 76 ++--
.../container/common/interfaces/KeyManager.java | 17 +-
.../background/BlockDeletingService.java | 8 +-
.../commandhandler/CloseContainerHandler.java | 9 +-
.../DeleteBlocksCommandHandler.java | 9 +-
.../states/endpoint/HeartbeatEndpointTask.java | 2 +-
.../server/ratis/ContainerStateMachine.java | 21 +-
.../container/common/utils/ContainerCache.java | 36 +-
.../commands/CloseContainerCommand.java | 14 +-
.../StorageContainerDatanodeProtocol.proto | 9 +-
.../ozone/container/common/ScmTestMock.java | 2 +-
.../hadoop/hdds/scm/block/BlockManager.java | 12 +-
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 171 ++------
.../block/DatanodeDeletedBlockTransactions.java | 6 +-
.../hadoop/hdds/scm/block/DeletedBlockLog.java | 6 +-
.../hdds/scm/block/DeletedBlockLogImpl.java | 24 +-
.../hdds/scm/container/ContainerMapping.java | 80 ++--
.../scm/container/ContainerStateManager.java | 12 +-
.../hadoop/hdds/scm/container/Mapping.java | 33 +-
.../scm/container/closer/ContainerCloser.java | 16 +-
.../container/replication/InProgressPool.java | 10 +-
.../hdds/scm/node/SCMNodePoolManager.java | 2 +-
.../hdds/scm/pipelines/PipelineManager.java | 15 +-
.../hdds/scm/pipelines/PipelineSelector.java | 8 +-
.../scm/pipelines/ratis/RatisManagerImpl.java | 2 +-
.../hdds/scm/server/SCMBlockProtocolServer.java | 20 +-
.../scm/server/SCMClientProtocolServer.java | 47 ++-
.../scm/server/StorageContainerManager.java | 9 +-
.../hadoop/hdds/scm/block/TestBlockManager.java | 34 +-
.../hdds/scm/block/TestDeletedBlockLog.java | 56 +--
.../scm/container/TestContainerMapping.java | 79 ++--
.../container/closer/TestContainerCloser.java | 25 +-
.../hdds/scm/node/TestContainerPlacement.java | 9 +-
.../ozone/container/common/TestEndPoint.java | 7 +-
.../replication/TestContainerSupervisor.java | 32 +-
.../ReplicationDatanodeStateManager.java | 11 +-
.../cli/container/CloseContainerHandler.java | 26 +-
.../cli/container/ContainerCommandHandler.java | 1 -
.../cli/container/CreateContainerHandler.java | 22 +-
.../cli/container/DeleteContainerHandler.java | 26 +-
.../scm/cli/container/InfoContainerHandler.java | 31 +-
.../scm/cli/container/ListContainerHandler.java | 12 +-
.../ozone/client/io/ChunkGroupInputStream.java | 25 +-
.../ozone/client/io/ChunkGroupOutputStream.java | 35 +-
.../client/io/OzoneContainerTranslation.java | 11 +-
.../ozone/ksm/helpers/KsmKeyLocationInfo.java | 41 +-
.../ksm/helpers/KsmKeyLocationInfoGroup.java | 2 +-
.../hadoop/ozone/web/handlers/UserArgs.java | 8 +-
.../main/proto/KeySpaceManagerProtocol.proto | 11 +-
.../container/TestContainerStateManager.java | 124 +++---
.../hadoop/ozone/TestContainerOperations.java | 10 +-
.../hadoop/ozone/TestMiniOzoneCluster.java | 3 +-
.../ozone/TestStorageContainerManager.java | 64 +--
.../TestStorageContainerManagerHelper.java | 33 +-
.../ozone/client/rpc/TestOzoneRpcClient.java | 10 +-
.../ozone/container/ContainerTestHelper.java | 180 ++++----
.../common/TestBlockDeletingService.java | 25 +-
.../TestContainerDeletionChoosingPolicy.java | 19 +-
.../common/impl/TestContainerPersistence.java | 419 +++++++++----------
.../TestCloseContainerHandler.java | 14 +-
.../container/metrics/TestContainerMetrics.java | 14 +-
.../container/ozoneimpl/TestOzoneContainer.java | 142 +++----
.../ozoneimpl/TestOzoneContainerRatis.java | 3 +-
.../container/server/TestContainerServer.java | 18 +-
.../ozone/ksm/TestContainerReportWithKeys.java | 8 +-
.../hadoop/ozone/ksm/TestKeySpaceManager.java | 8 -
.../ozone/ksm/TestKsmBlockVersioning.java | 2 +-
.../hadoop/ozone/scm/TestAllocateContainer.java | 24 +-
.../hadoop/ozone/scm/TestContainerSQLCli.java | 40 +-
.../ozone/scm/TestContainerSmallFile.java | 68 +--
.../org/apache/hadoop/ozone/scm/TestSCMCli.java | 210 ++++------
.../apache/hadoop/ozone/scm/TestSCMMetrics.java | 5 +-
.../ozone/scm/TestXceiverClientManager.java | 86 ++--
.../ozone/scm/TestXceiverClientMetrics.java | 19 +-
.../hadoop/ozone/web/client/TestKeys.java | 5 +-
.../ozone/ksm/KSMMetadataManagerImpl.java | 10 +-
.../hadoop/ozone/ksm/KeyDeletingService.java | 3 +-
.../apache/hadoop/ozone/ksm/KeyManagerImpl.java | 6 +-
.../hadoop/ozone/ksm/KeySpaceManager.java | 2 +-
.../hadoop/ozone/ksm/OpenKeyCleanupService.java | 3 +-
.../genesis/BenchMarkContainerStateMap.java | 10 +-
.../genesis/BenchMarkDatanodeDispatcher.java | 170 +++++---
.../ozone/genesis/BenchMarkRocksDbStore.java | 14 +-
127 files changed, 2059 insertions(+), 2402 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
index db5a567..42005f0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.util;
import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.TimeZone;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -34,6 +36,8 @@ public final class Time {
*/
private static final long NANOSECONDS_PER_MILLISECOND = 1000000;
+ private static final TimeZone UTC_ZONE = TimeZone.getTimeZone("UTC");
+
private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT =
new ThreadLocal<SimpleDateFormat>() {
@Override
@@ -82,4 +86,12 @@ public final class Time {
public static String formatTime(long millis) {
return DATE_FORMAT.get().format(millis);
}
+
+ /**
+ * Get the current UTC time in milliseconds.
+ * @return the current UTC time in milliseconds.
+ */
+ public static long getUtcTime() {
+ return Calendar.getInstance(UTC_ZONE).getTimeInMillis();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index 7585104..dcaa576 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -60,7 +60,7 @@ public class XceiverClientManager implements Closeable {
//TODO : change this to SCM configuration class
private final Configuration conf;
- private final Cache<String, XceiverClientSpi> clientCache;
+ private final Cache<Long, XceiverClientSpi> clientCache;
private final boolean useRatis;
private static XceiverClientMetrics metrics;
@@ -84,10 +84,10 @@ public class XceiverClientManager implements Closeable {
.expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
.maximumSize(maxSize)
.removalListener(
- new RemovalListener<String, XceiverClientSpi>() {
+ new RemovalListener<Long, XceiverClientSpi>() {
@Override
public void onRemoval(
- RemovalNotification<String, XceiverClientSpi>
+ RemovalNotification<Long, XceiverClientSpi>
removalNotification) {
synchronized (clientCache) {
// Mark the entry as evicted
@@ -99,7 +99,7 @@ public class XceiverClientManager implements Closeable {
}
@VisibleForTesting
- public Cache<String, XceiverClientSpi> getClientCache() {
+ public Cache<Long, XceiverClientSpi> getClientCache() {
return clientCache;
}
@@ -114,14 +114,14 @@ public class XceiverClientManager implements Closeable {
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
- public XceiverClientSpi acquireClient(Pipeline pipeline)
+ public XceiverClientSpi acquireClient(Pipeline pipeline, long containerID)
throws IOException {
Preconditions.checkNotNull(pipeline);
Preconditions.checkArgument(pipeline.getMachines() != null);
Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
synchronized (clientCache) {
- XceiverClientSpi info = getClient(pipeline);
+ XceiverClientSpi info = getClient(pipeline, containerID);
info.incrementReference();
return info;
}
@@ -139,11 +139,10 @@ public class XceiverClientManager implements Closeable {
}
}
- private XceiverClientSpi getClient(Pipeline pipeline)
+ private XceiverClientSpi getClient(Pipeline pipeline, long containerID)
throws IOException {
- String containerName = pipeline.getContainerName();
try {
- return clientCache.get(containerName,
+ return clientCache.get(containerID,
new Callable<XceiverClientSpi>() {
@Override
public XceiverClientSpi call() throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index 8f30a7f..15d197c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -86,15 +86,16 @@ public class ContainerOperationClient implements ScmClient {
* @inheritDoc
*/
@Override
- public Pipeline createContainer(String containerId, String owner)
+ public ContainerInfo createContainer(String owner)
throws IOException {
XceiverClientSpi client = null;
try {
- Pipeline pipeline =
+ ContainerInfo container =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerId, owner);
- client = xceiverClientManager.acquireClient(pipeline);
+ xceiverClientManager.getFactor(), owner);
+ Pipeline pipeline = container.getPipeline();
+ client = xceiverClientManager.acquireClient(pipeline, container.getContainerID());
// Allocated State means that SCM has allocated this pipeline in its
// namespace. The client needs to create the pipeline on the machines
@@ -104,10 +105,8 @@ public class ContainerOperationClient implements ScmClient {
if (pipeline.getLifeCycleState() == ALLOCATED) {
createPipeline(client, pipeline);
}
- // TODO : Container Client State needs to be updated.
- // TODO : Return ContainerInfo instead of Pipeline
- createContainer(containerId, client, pipeline);
- return pipeline;
+ createContainer(client, container.getContainerID());
+ return container;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
@@ -118,20 +117,19 @@ public class ContainerOperationClient implements ScmClient {
/**
* Create a container over pipeline specified by the SCM.
*
- * @param containerId - Container ID
- * @param client - Client to communicate with Datanodes
- * @param pipeline - A pipeline that is already created.
+ * @param client - Client to communicate with Datanodes.
+ * @param containerId - Container ID.
* @throws IOException
*/
- public void createContainer(String containerId, XceiverClientSpi client,
- Pipeline pipeline) throws IOException {
+ public void createContainer(XceiverClientSpi client,
+ long containerId) throws IOException {
String traceID = UUID.randomUUID().toString();
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.begin);
- ContainerProtocolCalls.createContainer(client, traceID);
+ ContainerProtocolCalls.createContainer(client, containerId, traceID);
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
@@ -142,8 +140,8 @@ public class ContainerOperationClient implements ScmClient {
// creation state.
if (LOG.isDebugEnabled()) {
LOG.debug("Created container " + containerId
- + " leader:" + pipeline.getLeader()
- + " machines:" + pipeline.getMachines());
+ + " leader:" + client.getPipeline().getLeader()
+ + " machines:" + client.getPipeline().getMachines());
}
}
@@ -168,20 +166,25 @@ public class ContainerOperationClient implements ScmClient {
// 2. Talk to Datanodes to create the pipeline.
//
// 3. update SCM that pipeline creation was successful.
- storageContainerLocationClient.notifyObjectStageChange(
- ObjectStageChangeRequestProto.Type.pipeline,
- pipeline.getPipelineName(),
- ObjectStageChangeRequestProto.Op.create,
- ObjectStageChangeRequestProto.Stage.begin);
+
+ // TODO: this has not been fully implemented on server side
+ // SCMClientProtocolServer#notifyObjectStageChange
+ // TODO: when implement the pipeline state machine, change
+ // the pipeline name (string) to pipeline id (long)
+ //storageContainerLocationClient.notifyObjectStageChange(
+ // ObjectStageChangeRequestProto.Type.pipeline,
+ // pipeline.getPipelineName(),
+ // ObjectStageChangeRequestProto.Op.create,
+ // ObjectStageChangeRequestProto.Stage.begin);
client.createPipeline(pipeline.getPipelineName(),
pipeline.getMachines());
- storageContainerLocationClient.notifyObjectStageChange(
- ObjectStageChangeRequestProto.Type.pipeline,
- pipeline.getPipelineName(),
- ObjectStageChangeRequestProto.Op.create,
- ObjectStageChangeRequestProto.Stage.complete);
+ //storageContainerLocationClient.notifyObjectStageChange(
+ // ObjectStageChangeRequestProto.Type.pipeline,
+ // pipeline.getPipelineName(),
+ // ObjectStageChangeRequestProto.Op.create,
+ // ObjectStageChangeRequestProto.Stage.complete);
// TODO : Should we change the state on the client side ??
// That makes sense, but it is not needed for the client to work.
@@ -193,16 +196,17 @@ public class ContainerOperationClient implements ScmClient {
* @inheritDoc
*/
@Override
- public Pipeline createContainer(HddsProtos.ReplicationType type,
- HddsProtos.ReplicationFactor factor,
- String containerId, String owner) throws IOException {
+ public ContainerInfo createContainer(HddsProtos.ReplicationType type,
+ HddsProtos.ReplicationFactor factor, String owner) throws IOException {
XceiverClientSpi client = null;
try {
// allocate container on SCM.
- Pipeline pipeline =
+ ContainerInfo container =
storageContainerLocationClient.allocateContainer(type, factor,
- containerId, owner);
- client = xceiverClientManager.acquireClient(pipeline);
+ owner);
+ Pipeline pipeline = container.getPipeline();
+ client = xceiverClientManager.acquireClient(pipeline,
+ container.getContainerID());
// Allocated State means that SCM has allocated this pipeline in its
// namespace. The client needs to create the pipeline on the machines
@@ -210,12 +214,11 @@ public class ContainerOperationClient implements ScmClient {
if (pipeline.getLifeCycleState() == ALLOCATED) {
createPipeline(client, pipeline);
}
-
- // TODO : Return ContainerInfo instead of Pipeline
// connect to pipeline leader and allocate container on leader datanode.
- client = xceiverClientManager.acquireClient(pipeline);
- createContainer(containerId, client, pipeline);
- return pipeline;
+ client = xceiverClientManager.acquireClient(pipeline,
+ container.getContainerID());
+ createContainer(client, container.getContainerID());
+ return container;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
@@ -258,18 +261,18 @@ public class ContainerOperationClient implements ScmClient {
* @throws IOException
*/
@Override
- public void deleteContainer(Pipeline pipeline, boolean force)
+ public void deleteContainer(long containerID, Pipeline pipeline, boolean force)
throws IOException {
XceiverClientSpi client = null;
try {
- client = xceiverClientManager.acquireClient(pipeline);
+ client = xceiverClientManager.acquireClient(pipeline, containerID);
String traceID = UUID.randomUUID().toString();
- ContainerProtocolCalls.deleteContainer(client, force, traceID);
+ ContainerProtocolCalls.deleteContainer(client, containerID, force, traceID);
storageContainerLocationClient
- .deleteContainer(pipeline.getContainerName());
+ .deleteContainer(containerID);
if (LOG.isDebugEnabled()) {
LOG.debug("Deleted container {}, leader: {}, machines: {} ",
- pipeline.getContainerName(),
+ containerID,
pipeline.getLeader(),
pipeline.getMachines());
}
@@ -284,11 +287,10 @@ public class ContainerOperationClient implements ScmClient {
* {@inheritDoc}
*/
@Override
- public List<ContainerInfo> listContainer(String startName,
- String prefixName, int count)
- throws IOException {
+ public List<ContainerInfo> listContainer(long startContainerID,
+ int count) throws IOException {
return storageContainerLocationClient.listContainer(
- startName, prefixName, count);
+ startContainerID, count);
}
/**
@@ -300,17 +302,17 @@ public class ContainerOperationClient implements ScmClient {
* @throws IOException
*/
@Override
- public ContainerData readContainer(Pipeline pipeline) throws IOException {
+ public ContainerData readContainer(long containerID,
+ Pipeline pipeline) throws IOException {
XceiverClientSpi client = null;
try {
- client = xceiverClientManager.acquireClient(pipeline);
+ client = xceiverClientManager.acquireClient(pipeline, containerID);
String traceID = UUID.randomUUID().toString();
ReadContainerResponseProto response =
- ContainerProtocolCalls.readContainer(client,
- pipeline.getContainerName(), traceID);
+ ContainerProtocolCalls.readContainer(client, containerID, traceID);
if (LOG.isDebugEnabled()) {
LOG.debug("Read container {}, leader: {}, machines: {} ",
- pipeline.getContainerName(),
+ containerID,
pipeline.getLeader(),
pipeline.getMachines());
}
@@ -329,7 +331,7 @@ public class ContainerOperationClient implements ScmClient {
* @throws IOException
*/
@Override
- public Pipeline getContainer(String containerId) throws
+ public ContainerInfo getContainer(long containerId) throws
IOException {
return storageContainerLocationClient.getContainer(containerId);
}
@@ -341,7 +343,8 @@ public class ContainerOperationClient implements ScmClient {
* @throws IOException
*/
@Override
- public void closeContainer(Pipeline pipeline) throws IOException {
+ public void closeContainer(long containerId, Pipeline pipeline)
+ throws IOException {
XceiverClientSpi client = null;
try {
LOG.debug("Close container {}", pipeline);
@@ -364,18 +367,16 @@ public class ContainerOperationClient implements ScmClient {
For now, take the #2 way.
*/
// Actually close the container on Datanode
- client = xceiverClientManager.acquireClient(pipeline);
+ client = xceiverClientManager.acquireClient(pipeline, containerId);
String traceID = UUID.randomUUID().toString();
- String containerId = pipeline.getContainerName();
-
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.close,
ObjectStageChangeRequestProto.Stage.begin);
- ContainerProtocolCalls.closeContainer(client, traceID);
+ ContainerProtocolCalls.closeContainer(client, containerId, traceID);
// Notify SCM to close the container
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
@@ -391,13 +392,13 @@ public class ContainerOperationClient implements ScmClient {
/**
* Get the the current usage information.
- * @param pipeline - Pipeline
+ * @param containerID - ID of the container.
* @return the size of the given container.
* @throws IOException
*/
@Override
- public long getContainerSize(Pipeline pipeline) throws IOException {
- // TODO : Pipeline can be null, handle it correctly.
+ public long getContainerSize(long containerID) throws IOException {
+ // TODO : Fix this, it currently returns the capacity but not the current usage.
long size = getContainerSizeB();
if (size == -1) {
throw new IOException("Container size unknown!");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index 9b8eaa9..c4c3362 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
.ReadChunkResponseProto;
+import org.apache.hadoop.hdds.client.BlockID;
import java.io.EOFException;
import java.io.IOException;
@@ -45,7 +46,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
private static final int EOF = -1;
- private final String key;
+ private final BlockID blockID;
private final String traceID;
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
@@ -58,15 +59,15 @@ public class ChunkInputStream extends InputStream implements Seekable {
/**
* Creates a new ChunkInputStream.
*
- * @param key chunk key
+ * @param blockID block ID of the chunk
* @param xceiverClientManager client manager that controls client
* @param xceiverClient client to perform container calls
* @param chunks list of chunks to read
* @param traceID container protocol call traceID
*/
- public ChunkInputStream(String key, XceiverClientManager xceiverClientManager,
+ public ChunkInputStream(BlockID blockID, XceiverClientManager xceiverClientManager,
XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) {
- this.key = key;
+ this.blockID = blockID;
this.traceID = traceID;
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
@@ -196,7 +197,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
final ReadChunkResponseProto readChunkResponse;
try {
readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
- chunks.get(chunkIndex), key, traceID);
+ chunks.get(chunkIndex), blockID, traceID);
} catch (IOException e) {
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
}
@@ -211,7 +212,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
|| pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1)
.getLen()) {
throw new EOFException(
- "EOF encountered pos: " + pos + " container key: " + key);
+ "EOF encountered pos: " + pos + " container key: " + blockID.getLocalID());
}
if (chunkIndex == -1) {
chunkIndex = Arrays.binarySearch(chunkOffset, pos);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
index b65df9f..325f110 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.KeyValue;
+import org.apache.hadoop.hdds.client.BlockID;
import java.io.IOException;
import java.io.OutputStream;
@@ -53,7 +54,7 @@ import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
*/
public class ChunkOutputStream extends OutputStream {
- private final String containerKey;
+ private final BlockID blockID;
private final String key;
private final String traceID;
private final KeyData.Builder containerKeyData;
@@ -67,25 +68,24 @@ public class ChunkOutputStream extends OutputStream {
/**
* Creates a new ChunkOutputStream.
*
- * @param containerKey container key
+ * @param blockID block ID
* @param key chunk key
* @param xceiverClientManager client manager that controls client
* @param xceiverClient client to perform container calls
* @param traceID container protocol call args
* @param chunkSize chunk size
*/
- public ChunkOutputStream(String containerKey, String key,
- XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
- String traceID, int chunkSize) {
- this.containerKey = containerKey;
+ public ChunkOutputStream(BlockID blockID, String key,
+ XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
+ String traceID, int chunkSize) {
+ this.blockID = blockID;
this.key = key;
this.traceID = traceID;
this.chunkSize = chunkSize;
KeyValue keyValue = KeyValue.newBuilder()
.setKey("TYPE").setValue("KEY").build();
this.containerKeyData = KeyData.newBuilder()
- .setContainerName(xceiverClient.getPipeline().getContainerName())
- .setName(containerKey)
+ .setBlockID(blockID.getProtobuf())
.addMetadata(keyValue);
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
@@ -217,7 +217,7 @@ public class ChunkOutputStream extends OutputStream {
.setLen(data.size())
.build();
try {
- writeChunk(xceiverClient, chunk, key, data, traceID);
+ writeChunk(xceiverClient, chunk, blockID, data, traceID);
} catch (IOException e) {
throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java
new file mode 100644
index 0000000..7236af7
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.client;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+/**
+ * BlockID of ozone (containerID + localID)
+ */
+public class BlockID {
+ private long containerID;
+ private long localID;
+
+ public BlockID(long containerID, long localID) {
+ this.containerID = containerID;
+ this.localID = localID;
+ }
+
+ public long getContainerID() {
+ return containerID;
+ }
+
+ public long getLocalID() {
+ return localID;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this).
+ append("containerID", containerID).
+ append("localID", localID).
+ toString();
+ }
+
+ public HddsProtos.BlockID getProtobuf() {
+ return HddsProtos.BlockID.newBuilder().
+ setContainerID(containerID).setLocalID(localID).build();
+ }
+
+ public static BlockID getFromProtobuf(HddsProtos.BlockID blockID) {
+ return new BlockID(blockID.getContainerID(),
+ blockID.getLocalID());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 0d4a299..dcf9fed 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -41,78 +41,76 @@ import java.util.List;
public interface ScmClient {
/**
* Creates a Container on SCM and returns the pipeline.
- * @param containerId - String container ID
- * @return Pipeline
+ * @return ContainerInfo
* @throws IOException
*/
- Pipeline createContainer(String containerId, String owner) throws IOException;
+ ContainerInfo createContainer(String owner) throws IOException;
/**
* Gets a container by Name -- Throws if the container does not exist.
- * @param containerId - String Container ID
+ * @param containerId - Container ID
* @return Pipeline
* @throws IOException
*/
- Pipeline getContainer(String containerId) throws IOException;
+ ContainerInfo getContainer(long containerId) throws IOException;
/**
- * Close a container by name.
+ * Close a container.
*
- * @param pipeline the container to be closed.
+ * @param containerId - ID of the container.
+ * @param pipeline - Pipeline where the container is located.
* @throws IOException
*/
- void closeContainer(Pipeline pipeline) throws IOException;
+ void closeContainer(long containerId, Pipeline pipeline) throws IOException;
/**
* Deletes an existing container.
+ * @param containerId - ID of the container.
* @param pipeline - Pipeline that represents the container.
* @param force - true to forcibly delete the container.
* @throws IOException
*/
- void deleteContainer(Pipeline pipeline, boolean force) throws IOException;
+ void deleteContainer(long containerId, Pipeline pipeline, boolean force) throws IOException;
/**
* Lists a range of containers and get their info.
*
- * @param startName start name, if null, start searching at the head.
- * @param prefixName prefix name, if null, then filter is disabled.
- * @param count count, if count < 0, the max size is unlimited.(
- * Usually the count will be replace with a very big
- * value instead of being unlimited in case the db is very big)
+ * @param startContainerID start containerID.
+ * @param count count must be > 0.
*
* @return a list of pipeline.
* @throws IOException
*/
- List<ContainerInfo> listContainer(String startName, String prefixName,
+ List<ContainerInfo> listContainer(long startContainerID,
int count) throws IOException;
/**
* Read meta data from an existing container.
- * @param pipeline - Pipeline that represents the container.
+ * @param containerID - ID of the container.
+ * @param pipeline - Pipeline where the container is located.
* @return ContainerInfo
* @throws IOException
*/
- ContainerData readContainer(Pipeline pipeline) throws IOException;
-
+ ContainerData readContainer(long containerID, Pipeline pipeline)
+ throws IOException;
/**
* Gets the container size -- Computed by SCM from Container Reports.
- * @param pipeline - Pipeline
+ * @param containerID - ID of the container.
* @return number of bytes used by this container.
* @throws IOException
*/
- long getContainerSize(Pipeline pipeline) throws IOException;
+ long getContainerSize(long containerID) throws IOException;
/**
* Creates a Container on SCM and returns the pipeline.
* @param type - Replication Type.
* @param replicationFactor - Replication Factor
- * @param containerId - Container ID
- * @return Pipeline
+ * @return ContainerInfo
* @throws IOException - in case of error.
*/
- Pipeline createContainer(HddsProtos.ReplicationType type,
- HddsProtos.ReplicationFactor replicationFactor, String containerId,
+ ContainerInfo createContainer(HddsProtos.ReplicationType type,
+ HddsProtos.ReplicationFactor replicationFactor,
String owner) throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
index d253b15..9b89469 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
@@ -18,13 +18,15 @@
package org.apache.hadoop.hdds.scm.container.common.helpers;
+import org.apache.hadoop.hdds.client.BlockID;
+
/**
* Allocated block wraps the result returned from SCM#allocateBlock which
* contains a Pipeline and the key.
*/
public final class AllocatedBlock {
private Pipeline pipeline;
- private String key;
+ private BlockID blockID;
// Indicates whether the client should create container before writing block.
private boolean shouldCreateContainer;
@@ -33,7 +35,7 @@ public final class AllocatedBlock {
*/
public static class Builder {
private Pipeline pipeline;
- private String key;
+ private BlockID blockID;
private boolean shouldCreateContainer;
public Builder setPipeline(Pipeline p) {
@@ -41,8 +43,8 @@ public final class AllocatedBlock {
return this;
}
- public Builder setKey(String k) {
- this.key = k;
+ public Builder setBlockID(BlockID blockID) {
+ this.blockID = blockID;
return this;
}
@@ -52,14 +54,14 @@ public final class AllocatedBlock {
}
public AllocatedBlock build() {
- return new AllocatedBlock(pipeline, key, shouldCreateContainer);
+ return new AllocatedBlock(pipeline, blockID, shouldCreateContainer);
}
}
- private AllocatedBlock(Pipeline pipeline, String key,
+ private AllocatedBlock(Pipeline pipeline, BlockID blockID,
boolean shouldCreateContainer) {
this.pipeline = pipeline;
- this.key = key;
+ this.blockID = blockID;
this.shouldCreateContainer = shouldCreateContainer;
}
@@ -67,8 +69,8 @@ public final class AllocatedBlock {
return pipeline;
}
- public String getKey() {
- return key;
+ public BlockID getBlockID() {
+ return blockID;
}
public boolean getCreateContainer() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
index 823a7fb..0bd4c26 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
@@ -43,11 +43,9 @@ public class ContainerInfo
// The wall-clock ms since the epoch at which the current state enters.
private long stateEnterTime;
private String owner;
- private String containerName;
private long containerID;
ContainerInfo(
long containerID,
- final String containerName,
HddsProtos.LifeCycleState state,
Pipeline pipeline,
long allocatedBytes,
@@ -56,7 +54,6 @@ public class ContainerInfo
long stateEnterTime,
String owner) {
this.containerID = containerID;
- this.containerName = containerName;
this.pipeline = pipeline;
this.allocatedBytes = allocatedBytes;
this.usedBytes = usedBytes;
@@ -82,7 +79,6 @@ public class ContainerInfo
builder.setState(info.getState());
builder.setStateEnterTime(info.getStateEnterTime());
builder.setOwner(info.getOwner());
- builder.setContainerName(info.getContainerName());
builder.setContainerID(info.getContainerID());
return builder.build();
}
@@ -91,10 +87,6 @@ public class ContainerInfo
return containerID;
}
- public String getContainerName() {
- return containerName;
- }
-
public HddsProtos.LifeCycleState getState() {
return state;
}
@@ -170,7 +162,6 @@ public class ContainerInfo
if (getOwner() != null) {
builder.setOwner(getOwner());
}
- builder.setContainerName(getContainerName());
return builder.build();
}
@@ -189,7 +180,6 @@ public class ContainerInfo
+ ", pipeline=" + pipeline
+ ", stateEnterTime=" + stateEnterTime
+ ", owner=" + owner
- + ", containerName='" + containerName
+ '}';
}
@@ -206,7 +196,7 @@ public class ContainerInfo
ContainerInfo that = (ContainerInfo) o;
return new EqualsBuilder()
- .append(pipeline.getContainerName(), that.pipeline.getContainerName())
+ .append(getContainerID(), that.getContainerID())
// TODO : Fix this later. If we add these factors some tests fail.
// So Commenting this to continue and will enforce this with
@@ -221,7 +211,7 @@ public class ContainerInfo
@Override
public int hashCode() {
return new HashCodeBuilder(11, 811)
- .append(pipeline.getContainerName())
+ .append(getContainerID())
.append(pipeline.getFactor())
.append(pipeline.getType())
.append(owner)
@@ -275,7 +265,6 @@ public class ContainerInfo
private long keys;
private long stateEnterTime;
private String owner;
- private String containerName;
private long containerID;
public Builder setContainerID(long id) {
@@ -319,14 +308,9 @@ public class ContainerInfo
return this;
}
- public Builder setContainerName(String container) {
- this.containerName = container;
- return this;
- }
-
public ContainerInfo build() {
return new
- ContainerInfo(containerID, containerName, state, pipeline,
+ ContainerInfo(containerID, state, pipeline,
allocated, used, keys, stateEnterTime, owner);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/DeleteBlockResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/DeleteBlockResult.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/DeleteBlockResult.java
index fd97eae..5f5aace 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/DeleteBlockResult.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/DeleteBlockResult.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.hdds.scm.container.common.helpers;
+import org.apache.hadoop.hdds.client.BlockID;
+
import static org.apache.hadoop.hdds.protocol.proto
.ScmBlockLocationProtocolProtos.DeleteScmBlockResult;
@@ -24,21 +26,21 @@ import static org.apache.hadoop.hdds.protocol.proto
* Class wraps storage container manager block deletion results.
*/
public class DeleteBlockResult {
- private String key;
+ private BlockID blockID;
private DeleteScmBlockResult.Result result;
- public DeleteBlockResult(final String key,
+ public DeleteBlockResult(final BlockID blockID,
final DeleteScmBlockResult.Result result) {
- this.key = key;
+ this.blockID = blockID;
this.result = result;
}
/**
- * Get key deleted.
- * @return key name.
+ * Get block id deleted.
+ * @return block id.
*/
- public String getKey() {
- return key;
+ public BlockID getBlockID() {
+ return blockID;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index 32d0a2d..8740838 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -57,7 +57,6 @@ public class Pipeline {
WRITER = mapper.writer(filters);
}
- private String containerName;
private PipelineChannel pipelineChannel;
/**
* Allows you to maintain private data on pipelines. This is not serialized
@@ -68,11 +67,9 @@ public class Pipeline {
/**
* Constructs a new pipeline data structure.
*
- * @param containerName - Container
* @param pipelineChannel - transport information for this container
*/
- public Pipeline(String containerName, PipelineChannel pipelineChannel) {
- this.containerName = containerName;
+ public Pipeline(PipelineChannel pipelineChannel) {
this.pipelineChannel = pipelineChannel;
data = null;
}
@@ -87,7 +84,7 @@ public class Pipeline {
Preconditions.checkNotNull(pipeline);
PipelineChannel pipelineChannel =
PipelineChannel.getFromProtoBuf(pipeline.getPipelineChannel());
- return new Pipeline(pipeline.getContainerName(), pipelineChannel);
+ return new Pipeline(pipelineChannel);
}
public HddsProtos.ReplicationFactor getFactor() {
@@ -146,21 +143,11 @@ public class Pipeline {
public HddsProtos.Pipeline getProtobufMessage() {
HddsProtos.Pipeline.Builder builder =
HddsProtos.Pipeline.newBuilder();
- builder.setContainerName(this.containerName);
builder.setPipelineChannel(this.pipelineChannel.getProtobufMessage());
return builder.build();
}
/**
- * Returns containerName if available.
- *
- * @return String.
- */
- public String getContainerName() {
- return containerName;
- }
-
- /**
* Returns private data that is set on this pipeline.
*
* @return blob, the user can interpret it any way they like.
@@ -223,7 +210,6 @@ public class Pipeline {
pipelineChannel.getDatanodes().keySet().stream()
.forEach(id -> b.
append(id.endsWith(pipelineChannel.getLeaderID()) ? "*" + id : id));
- b.append("] container:").append(containerName);
b.append(" name:").append(getPipelineName());
if (getType() != null) {
b.append(" type:").append(getType().toString());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java
index ebd52e9..655751d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java
@@ -40,6 +40,8 @@ public class PipelineChannel {
private ReplicationType type;
private ReplicationFactor factor;
private String name;
+ // TODO: change to long based id
+ //private long id;
public PipelineChannel(String leaderID, LifeCycleState lifeCycleState,
ReplicationType replicationType, ReplicationFactor replicationFactor,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
index f100fc7..c8d4a80 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import java.io.IOException;
import java.util.List;
-import java.util.Set;
/**
* ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes
@@ -35,17 +34,6 @@ import java.util.Set;
public interface ScmBlockLocationProtocol {
/**
- * Find the set of nodes to read/write a block, as
- * identified by the block key. This method supports batch lookup by
- * passing multiple keys.
- *
- * @param keys batch of block keys to find
- * @return allocated blocks for each block key
- * @throws IOException if there is any failure
- */
- Set<AllocatedBlock> getBlockLocations(Set<String> keys) throws IOException;
-
- /**
* Asks SCM where a block should be allocated. SCM responds with the
* set of datanodes that should be used creating this block.
* @param size - size of the block.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index a60fbb2..e8d85e0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -38,19 +38,20 @@ public interface StorageContainerLocationProtocol {
* set of datanodes that should be used creating this container.
*
*/
- Pipeline allocateContainer(HddsProtos.ReplicationType replicationType,
- HddsProtos.ReplicationFactor factor, String containerName, String owner)
+ ContainerInfo allocateContainer(HddsProtos.ReplicationType replicationType,
+ HddsProtos.ReplicationFactor factor, String owner)
throws IOException;
/**
* Ask SCM the location of the container. SCM responds with a group of
* nodes where this container and its replicas are located.
*
- * @param containerName - Name of the container.
- * @return Pipeline - the pipeline where container locates.
+ * @param containerID - ID of the container.
+ * @return ContainerInfo - the container info such as where the pipeline
+ * is located.
* @throws IOException
*/
- Pipeline getContainer(String containerName) throws IOException;
+ ContainerInfo getContainer(long containerID) throws IOException;
/**
* Ask SCM a list of containers with a range of container names
@@ -59,8 +60,7 @@ public interface StorageContainerLocationProtocol {
* use prefix name to filter the result. the max size of the
* searching range cannot exceed the value of count.
*
- * @param startName start name, if null, start searching at the head.
- * @param prefixName prefix name, if null, then filter is disabled.
+ * @param startContainerID start container ID.
* @param count count, if count < 0, the max size is unlimited.(
* Usually the count will be replace with a very big
* value instead of being unlimited in case the db is very big)
@@ -68,18 +68,18 @@ public interface StorageContainerLocationProtocol {
* @return a list of container.
* @throws IOException
*/
- List<ContainerInfo> listContainer(String startName, String prefixName,
- int count) throws IOException;
+ List<ContainerInfo> listContainer(long startContainerID, int count)
+ throws IOException;
/**
* Deletes a container in SCM.
*
- * @param containerName
+ * @param containerID
* @throws IOException
* if failed to delete the container mapping from db store
* or container doesn't exist.
*/
- void deleteContainer(String containerName) throws IOException;
+ void deleteContainer(long containerID) throws IOException;
/**
* Queries a list of Node Statuses.
@@ -94,12 +94,12 @@ public interface StorageContainerLocationProtocol {
* or containers on datanodes.
* Container will be in Operational state after that.
* @param type object type
- * @param name object name
+ * @param id object id
* @param op operation type (e.g., create, close, delete)
* @param stage creation stage
*/
void notifyObjectStageChange(
- ObjectStageChangeRequestProto.Type type, String name,
+ ObjectStageChangeRequestProto.Type type, long id,
ObjectStageChangeRequestProto.Op op,
ObjectStageChangeRequestProto.Stage stage) throws IOException;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index 0012f3e..aed0fb7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -17,10 +17,10 @@
package org.apache.hadoop.hdds.scm.protocolPB;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
@@ -35,13 +35,7 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.DeleteScmKeyBlocksResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
- .GetScmBlockLocationsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
- .GetScmBlockLocationsResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.KeyBlocks;
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
- .ScmLocatedBlockProto;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
@@ -52,7 +46,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -82,41 +75,6 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
}
/**
- * Find the set of nodes to read/write a block, as
- * identified by the block key. This method supports batch lookup by
- * passing multiple keys.
- *
- * @param keys batch of block keys to find
- * @return allocated blocks for each block key
- * @throws IOException if there is any failure
- */
- @Override
- public Set<AllocatedBlock> getBlockLocations(Set<String> keys)
- throws IOException {
- GetScmBlockLocationsRequestProto.Builder req =
- GetScmBlockLocationsRequestProto.newBuilder();
- for (String key : keys) {
- req.addKeys(key);
- }
- final GetScmBlockLocationsResponseProto resp;
- try {
- resp = rpcProxy.getScmBlockLocations(NULL_RPC_CONTROLLER,
- req.build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
- Set<AllocatedBlock> locatedBlocks =
- Sets.newLinkedHashSetWithExpectedSize(resp.getLocatedBlocksCount());
- for (ScmLocatedBlockProto locatedBlock : resp.getLocatedBlocksList()) {
- locatedBlocks.add(new AllocatedBlock.Builder()
- .setKey(locatedBlock.getKey())
- .setPipeline(Pipeline.getFromProtoBuf(locatedBlock.getPipeline()))
- .build());
- }
- return locatedBlocks;
- }
-
- /**
* Asks SCM where a block should be allocated. SCM responds with the
* set of datanodes that should be used creating this block.
* @param size - size of the block.
@@ -144,7 +102,7 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
response.getErrorMessage() : "Allocate block failed.");
}
AllocatedBlock.Builder builder = new AllocatedBlock.Builder()
- .setKey(response.getKey())
+ .setBlockID(BlockID.getFromProtobuf(response.getBlockID()))
.setPipeline(Pipeline.getFromProtoBuf(response.getPipeline()))
.setShouldCreateContainer(response.getCreateContainer());
return builder.build();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 3638f63..bba4e17 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -17,7 +17,6 @@
package org.apache.hadoop.hdds.scm.protocolPB;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -92,20 +91,14 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
* supports replication factor of either 1 or 3.
* @param type - Replication Type
* @param factor - Replication Count
- * @param containerName - Name
* @return
* @throws IOException
*/
@Override
- public Pipeline allocateContainer(HddsProtos.ReplicationType type,
- HddsProtos.ReplicationFactor factor, String
- containerName, String owner) throws IOException {
+ public ContainerInfo allocateContainer(HddsProtos.ReplicationType type,
+ HddsProtos.ReplicationFactor factor, String owner) throws IOException {
- Preconditions.checkNotNull(containerName, "Container Name cannot be Null");
- Preconditions.checkState(!containerName.isEmpty(), "Container name cannot" +
- " be empty");
ContainerRequestProto request = ContainerRequestProto.newBuilder()
- .setContainerName(containerName)
.setReplicationFactor(factor)
.setReplicationType(type)
.setOwner(owner)
@@ -121,22 +114,20 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
throw new IOException(response.hasErrorMessage() ?
response.getErrorMessage() : "Allocate container failed.");
}
- return Pipeline.getFromProtoBuf(response.getPipeline());
+ return ContainerInfo.fromProtobuf(response.getContainerInfo());
}
- public Pipeline getContainer(String containerName) throws IOException {
- Preconditions.checkNotNull(containerName,
- "Container Name cannot be Null");
- Preconditions.checkState(!containerName.isEmpty(),
- "Container name cannot be empty");
+ public ContainerInfo getContainer(long containerID) throws IOException {
+ Preconditions.checkState(containerID >= 0,
+ "Container ID cannot be negative");
GetContainerRequestProto request = GetContainerRequestProto
.newBuilder()
- .setContainerName(containerName)
+ .setContainerID(containerID)
.build();
try {
GetContainerResponseProto response =
rpcProxy.getContainer(NULL_RPC_CONTROLLER, request);
- return Pipeline.getFromProtoBuf(response.getPipeline());
+ return ContainerInfo.fromProtobuf(response.getContainerInfo());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@@ -146,16 +137,15 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
* {@inheritDoc}
*/
@Override
- public List<ContainerInfo> listContainer(String startName, String prefixName,
- int count) throws IOException {
+ public List<ContainerInfo> listContainer(long startContainerID, int count)
+ throws IOException {
+ Preconditions.checkState(startContainerID >= 0,
+ "Container ID cannot be negative.");
+ Preconditions.checkState(count > 0,
+ "Container count must be greater than 0.");
SCMListContainerRequestProto.Builder builder = SCMListContainerRequestProto
.newBuilder();
- if (prefixName != null) {
- builder.setPrefixName(prefixName);
- }
- if (startName != null) {
- builder.setStartName(startName);
- }
+ builder.setStartContainerID(startContainerID);
builder.setCount(count);
SCMListContainerRequestProto request = builder.build();
@@ -177,17 +167,17 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
* Ask SCM to delete a container by name. SCM will remove
* the container mapping in its database.
*
- * @param containerName
+ * @param containerID
* @throws IOException
*/
@Override
- public void deleteContainer(String containerName)
+ public void deleteContainer(long containerID)
throws IOException {
- Preconditions.checkState(!Strings.isNullOrEmpty(containerName),
- "Container name cannot be null or empty");
+ Preconditions.checkState(containerID >= 0,
+ "Container ID cannot be negative");
SCMDeleteContainerRequestProto request = SCMDeleteContainerRequestProto
.newBuilder()
- .setContainerName(containerName)
+ .setContainerID(containerID)
.build();
try {
rpcProxy.deleteContainer(NULL_RPC_CONTROLLER, request);
@@ -226,21 +216,21 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
/**
* Notify from client that creates object on datanodes.
* @param type object type
- * @param name object name
+ * @param id object id
* @param op operation type (e.g., create, close, delete)
* @param stage object creation stage : begin/complete
*/
@Override
public void notifyObjectStageChange(
- ObjectStageChangeRequestProto.Type type, String name,
+ ObjectStageChangeRequestProto.Type type, long id,
ObjectStageChangeRequestProto.Op op,
ObjectStageChangeRequestProto.Stage stage) throws IOException {
- Preconditions.checkState(!Strings.isNullOrEmpty(name),
- "Object name cannot be null or empty");
+ Preconditions.checkState(id >= 0,
+ "Object id cannot be negative.");
ObjectStageChangeRequestProto request =
ObjectStageChangeRequestProto.newBuilder()
.setType(type)
- .setName(name)
+ .setId(id)
.setOp(op)
.setStage(stage)
.build();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 1559816..970e932 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.KeyValue;
+import org.apache.hadoop.hdds.client.BlockID;
import java.io.IOException;
@@ -79,7 +80,6 @@ public final class ContainerProtocolCalls {
KeyData containerKeyData, String traceID) throws IOException {
GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto
.newBuilder()
- .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
.setKeyData(containerKeyData);
String id = xceiverClient.getPipeline().getLeader().getUuidString();
ContainerCommandRequestProto request = ContainerCommandRequestProto
@@ -106,7 +106,6 @@ public final class ContainerProtocolCalls {
KeyData containerKeyData, String traceID) throws IOException {
PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto
.newBuilder()
- .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
.setKeyData(containerKeyData);
String id = xceiverClient.getPipeline().getLeader().getUuidString();
ContainerCommandRequestProto request = ContainerCommandRequestProto
@@ -125,18 +124,16 @@ public final class ContainerProtocolCalls {
*
* @param xceiverClient client to perform call
* @param chunk information about chunk to read
- * @param key the key name
+ * @param blockID ID of the block
* @param traceID container protocol call args
* @return container protocol read chunk response
* @throws IOException if there is an I/O error while performing the call
*/
public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient,
- ChunkInfo chunk, String key, String traceID)
- throws IOException {
+ ChunkInfo chunk, BlockID blockID, String traceID) throws IOException {
ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
.newBuilder()
- .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
- .setKeyName(key)
+ .setBlockID(blockID.getProtobuf())
.setChunkData(chunk);
String id = xceiverClient.getPipeline().getLeader().getUuidString();
ContainerCommandRequestProto request = ContainerCommandRequestProto
@@ -156,18 +153,17 @@ public final class ContainerProtocolCalls {
*
* @param xceiverClient client to perform call
* @param chunk information about chunk to write
- * @param key the key name
+ * @param blockID ID of the block
* @param data the data of the chunk to write
* @param traceID container protocol call args
* @throws IOException if there is an I/O error while performing the call
*/
public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
- String key, ByteString data, String traceID)
+ BlockID blockID, ByteString data, String traceID)
throws IOException {
WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
.newBuilder()
- .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
- .setKeyName(key)
+ .setBlockID(blockID.getProtobuf())
.setChunkData(chunk)
.setData(data);
String id = xceiverClient.getPipeline().getLeader().getUuidString();
@@ -189,30 +185,29 @@ public final class ContainerProtocolCalls {
* than 1 MB.
*
* @param client - client that communicates with the container.
- * @param containerName - Name of the container
- * @param key - Name of the Key
+ * @param blockID - ID of the block
* @param data - Data to be written into the container.
* @param traceID - Trace ID for logging purpose.
* @throws IOException
*/
public static void writeSmallFile(XceiverClientSpi client,
- String containerName, String key, byte[] data, String traceID)
+ BlockID blockID, byte[] data, String traceID)
throws IOException {
KeyData containerKeyData =
- KeyData.newBuilder().setContainerName(containerName).setName(key)
+ KeyData.newBuilder().setBlockID(blockID.getProtobuf())
.build();
PutKeyRequestProto.Builder createKeyRequest =
PutKeyRequestProto.newBuilder()
- .setPipeline(client.getPipeline().getProtobufMessage())
.setKeyData(containerKeyData);
KeyValue keyValue =
KeyValue.newBuilder().setKey("OverWriteRequested").setValue("true")
.build();
ChunkInfo chunk =
- ChunkInfo.newBuilder().setChunkName(key + "_chunk").setOffset(0)
- .setLen(data.length).addMetadata(keyValue).build();
+ ChunkInfo.newBuilder().setChunkName(blockID.getLocalID()
+ + "_chunk").setOffset(0).setLen(data.length).
+ addMetadata(keyValue).build();
PutSmallFileRequestProto putSmallFileRequest =
PutSmallFileRequestProto.newBuilder().setChunkInfo(chunk)
@@ -234,17 +229,18 @@ public final class ContainerProtocolCalls {
/**
* createContainer call that creates a container on the datanode.
* @param client - client
+ * @param containerID - ID of container
* @param traceID - traceID
* @throws IOException
*/
- public static void createContainer(XceiverClientSpi client, String traceID)
- throws IOException {
+ public static void createContainer(XceiverClientSpi client, long containerID,
+ String traceID) throws IOException {
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
ContainerProtos.CreateContainerRequestProto
.newBuilder();
ContainerProtos.ContainerData.Builder containerData = ContainerProtos
.ContainerData.newBuilder();
- containerData.setName(client.getPipeline().getContainerName());
+ containerData.setContainerID(containerID);
createRequest.setPipeline(client.getPipeline().getProtobufMessage());
createRequest.setContainerData(containerData.build());
@@ -268,12 +264,11 @@ public final class ContainerProtocolCalls {
* @param traceID
* @throws IOException
*/
- public static void deleteContainer(XceiverClientSpi client,
+ public static void deleteContainer(XceiverClientSpi client, long containerID,
boolean force, String traceID) throws IOException {
ContainerProtos.DeleteContainerRequestProto.Builder deleteRequest =
ContainerProtos.DeleteContainerRequestProto.newBuilder();
- deleteRequest.setName(client.getPipeline().getContainerName());
- deleteRequest.setPipeline(client.getPipeline().getProtobufMessage());
+ deleteRequest.setContainerID(containerID);
deleteRequest.setForceDelete(force);
String id = client.getPipeline().getLeader().getUuidString();
ContainerCommandRequestProto.Builder request =
@@ -291,14 +286,15 @@ public final class ContainerProtocolCalls {
* Close a container.
*
* @param client
+ * @param containerID
* @param traceID
* @throws IOException
*/
- public static void closeContainer(XceiverClientSpi client, String traceID)
- throws IOException {
+ public static void closeContainer(XceiverClientSpi client,
+ long containerID, String traceID) throws IOException {
ContainerProtos.CloseContainerRequestProto.Builder closeRequest =
ContainerProtos.CloseContainerRequestProto.newBuilder();
- closeRequest.setPipeline(client.getPipeline().getProtobufMessage());
+ closeRequest.setContainerID(containerID);
String id = client.getPipeline().getLeader().getUuidString();
ContainerCommandRequestProto.Builder request =
@@ -320,11 +316,11 @@ public final class ContainerProtocolCalls {
* @throws IOException
*/
public static ReadContainerResponseProto readContainer(
- XceiverClientSpi client, String containerName,
+ XceiverClientSpi client, long containerID,
String traceID) throws IOException {
ReadContainerRequestProto.Builder readRequest =
ReadContainerRequestProto.newBuilder();
- readRequest.setName(containerName);
+ readRequest.setContainerID(containerID);
readRequest.setPipeline(client.getPipeline().getProtobufMessage());
String id = client.getPipeline().getLeader().getUuidString();
ContainerCommandRequestProto.Builder request =
@@ -340,25 +336,23 @@ public final class ContainerProtocolCalls {
}
/**
- * Reads the data given the container name and key.
+ * Reads the data given the blockID
*
* @param client
- * @param containerName - name of the container
- * @param key - key
+ * @param blockID - ID of the block
* @param traceID - trace ID
* @return GetSmallFileResponseProto
* @throws IOException
*/
public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
- String containerName, String key, String traceID) throws IOException {
+ BlockID blockID, String traceID) throws IOException {
KeyData containerKeyData = KeyData
.newBuilder()
- .setContainerName(containerName)
- .setName(key).build();
+ .setBlockID(blockID.getProtobuf())
+ .build();
GetKeyRequestProto.Builder getKey = GetKeyRequestProto
.newBuilder()
- .setPipeline(client.getPipeline().getProtobufMessage())
.setKeyData(containerKeyData);
ContainerProtos.GetSmallFileRequestProto getSmallFileRequest =
GetSmallFileRequestProto
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
index 38ce6cc..7a5403f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
@@ -17,9 +17,12 @@
package org.apache.hadoop.ozone.common;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.KeyBlocks;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -28,13 +31,13 @@ import java.util.List;
public final class BlockGroup {
private String groupID;
- private List<String> blockIDs;
- private BlockGroup(String groupID, List<String> blockIDs) {
+ private List<BlockID> blockIDs;
+ private BlockGroup(String groupID, List<BlockID> blockIDs) {
this.groupID = groupID;
this.blockIDs = blockIDs;
}
- public List<String> getBlockIDList() {
+ public List<BlockID> getBlockIDList() {
return blockIDs;
}
@@ -43,8 +46,11 @@ public final class BlockGroup {
}
public KeyBlocks getProto() {
- return KeyBlocks.newBuilder().setKey(groupID)
- .addAllBlocks(blockIDs).build();
+ KeyBlocks.Builder kbb = KeyBlocks.newBuilder();
+ for (BlockID block : blockIDs) {
+ kbb.addBlocks(block.getProtobuf());
+ }
+ return kbb.setKey(groupID).build();
}
/**
@@ -53,8 +59,12 @@ public final class BlockGroup {
* @return a group of blocks.
*/
public static BlockGroup getFromProto(KeyBlocks proto) {
+ List<BlockID> blockIDs = new ArrayList<>();
+ for (HddsProtos.BlockID block : proto.getBlocksList()) {
+ blockIDs.add(new BlockID(block.getContainerID(), block.getLocalID()));
+ }
return BlockGroup.newBuilder().setKeyName(proto.getKey())
- .addAllBlockIDs(proto.getBlocksList()).build();
+ .addAllBlockIDs(blockIDs).build();
}
public static Builder newBuilder() {
@@ -67,14 +77,14 @@ public final class BlockGroup {
public static class Builder {
private String groupID;
- private List<String> blockIDs;
+ private List<BlockID> blockIDs;
public Builder setKeyName(String blockGroupID) {
this.groupID = blockGroupID;
return this;
}
- public Builder addAllBlockIDs(List<String> keyBlocks) {
+ public Builder addAllBlockIDs(List<BlockID> keyBlocks) {
this.blockIDs = keyBlocks;
return this;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java
index ec54ac5..892b695 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.common;
+import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.DeleteScmBlockResult;
@@ -52,7 +53,7 @@ public class DeleteBlockGroupResult {
new ArrayList<>(blockResultList.size());
for (DeleteBlockResult result : blockResultList) {
DeleteScmBlockResult proto = DeleteScmBlockResult.newBuilder()
- .setKey(result.getKey())
+ .setBlockID(result.getBlockID().getProtobuf())
.setResult(result.getResult()).build();
resultProtoList.add(proto);
}
@@ -63,8 +64,8 @@ public class DeleteBlockGroupResult {
List<DeleteScmBlockResult> results) {
List<DeleteBlockResult> protoResults = new ArrayList<>(results.size());
for (DeleteScmBlockResult result : results) {
- protoResults.add(new DeleteBlockResult(result.getKey(),
- result.getResult()));
+ protoResults.add(new DeleteBlockResult(BlockID.getFromProtobuf(
+ result.getBlockID()), result.getResult()));
}
return protoResults;
}
@@ -87,10 +88,10 @@ public class DeleteBlockGroupResult {
/**
* @return A list of deletion failed block IDs.
*/
- public List<String> getFailedBlocks() {
- List<String> failedBlocks = blockResultList.stream()
+ public List<BlockID> getFailedBlocks() {
+ List<BlockID> failedBlocks = blockResultList.stream()
.filter(result -> result.getResult() != Result.success)
- .map(DeleteBlockResult::getKey).collect(Collectors.toList());
+ .map(DeleteBlockResult::getBlockID).collect(Collectors.toList());
return failedBlocks;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java
index be546c7..c3de5ed 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.helpers;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.client.BlockID;
import java.io.IOException;
import java.util.Collections;
@@ -30,8 +31,7 @@ import java.util.TreeMap;
* Helper class to convert Protobuf to Java classes.
*/
public class KeyData {
- private final String containerName;
- private final String keyName;
+ private final BlockID blockID;
private final Map<String, String> metadata;
/**
@@ -44,12 +44,10 @@ public class KeyData {
/**
* Constructs a KeyData Object.
*
- * @param containerName
- * @param keyName
+ * @param blockID
*/
- public KeyData(String containerName, String keyName) {
- this.containerName = containerName;
- this.keyName = keyName;
+ public KeyData(BlockID blockID) {
+ this.blockID = blockID;
this.metadata = new TreeMap<>();
}
@@ -62,7 +60,7 @@ public class KeyData {
*/
public static KeyData getFromProtoBuf(ContainerProtos.KeyData data) throws
IOException {
- KeyData keyData = new KeyData(data.getContainerName(), data.getName());
+ KeyData keyData = new KeyData(BlockID.getFromProtobuf(data.getBlockID()));
for (int x = 0; x < data.getMetadataCount(); x++) {
keyData.addMetadata(data.getMetadata(x).getKey(),
data.getMetadata(x).getValue());
@@ -78,8 +76,7 @@ public class KeyData {
public ContainerProtos.KeyData getProtoBufMessage() {
ContainerProtos.KeyData.Builder builder =
ContainerProtos.KeyData.newBuilder();
- builder.setContainerName(this.containerName);
- builder.setName(this.getKeyName());
+ builder.setBlockID(this.blockID.getProtobuf());
builder.addAllChunks(this.chunks);
for (Map.Entry<String, String> entry : metadata.entrySet()) {
HddsProtos.KeyValue.Builder keyValBuilder =
@@ -135,19 +132,27 @@ public class KeyData {
}
/**
- * Returns container Name.
- * @return String.
+ * Returns container ID.
+ * @return long.
*/
- public String getContainerName() {
- return containerName;
+ public long getContainerID() {
+ return blockID.getContainerID();
}
/**
- * Returns KeyName.
- * @return String.
+ * Returns LocalID.
+ * @return long.
*/
- public String getKeyName() {
- return keyName;
+ public long getLocalID() {
+ return blockID.getLocalID();
+ }
+
+ /**
+ * Return Block ID.
+ * @return BlockID.
+ */
+ public BlockID getBlockID() {
+ return blockID;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
index fa79341..37a1309 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.ozone.protocolPB;
-import com.google.common.collect.Sets;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -38,18 +37,11 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.DeleteScmKeyBlocksRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.DeleteScmKeyBlocksResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
- .GetScmBlockLocationsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
- .GetScmBlockLocationsResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
- .ScmLocatedBlockProto;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import java.io.IOException;
import java.util.List;
-import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -73,34 +65,6 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
this.impl = impl;
}
-
- @Override
- public GetScmBlockLocationsResponseProto getScmBlockLocations(
- RpcController controller, GetScmBlockLocationsRequestProto req)
- throws ServiceException {
- Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
- req.getKeysCount());
- for (String key : req.getKeysList()) {
- keys.add(key);
- }
- final Set<AllocatedBlock> blocks;
- try {
- blocks = impl.getBlockLocations(keys);
- } catch (IOException ex) {
- throw new ServiceException(ex);
- }
- GetScmBlockLocationsResponseProto.Builder resp =
- GetScmBlockLocationsResponseProto.newBuilder();
- for (AllocatedBlock block: blocks) {
- ScmLocatedBlockProto.Builder locatedBlock =
- ScmLocatedBlockProto.newBuilder()
- .setKey(block.getKey())
- .setPipeline(block.getPipeline().getProtobufMessage());
- resp.addLocatedBlocks(locatedBlock.build());
- }
- return resp.build();
- }
-
@Override
public AllocateScmBlockResponseProto allocateScmBlock(
RpcController controller, AllocateScmBlockRequestProto request)
@@ -112,7 +76,7 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
if (allocatedBlock != null) {
return
AllocateScmBlockResponseProto.newBuilder()
- .setKey(allocatedBlock.getKey())
+ .setBlockID(allocatedBlock.getBlockID().getProtobuf())
.setPipeline(allocatedBlock.getPipeline().getProtobufMessage())
.setCreateContainer(allocatedBlock.getCreateContainer())
.setErrorCode(AllocateScmBlockResponseProto.Error.success)
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org