You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/05/07 22:00:03 UTC
[6/7] hadoop git commit: HDDS-1. Remove SCM Block DB. Contributed by
Xiaoyu Yao.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 4974268..70a0e8a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -23,7 +23,6 @@ import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -83,11 +82,10 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
public ContainerResponseProto allocateContainer(RpcController unused,
ContainerRequestProto request) throws ServiceException {
try {
- Pipeline pipeline = impl.allocateContainer(request.getReplicationType(),
- request.getReplicationFactor(), request.getContainerName(),
- request.getOwner());
+ ContainerInfo container = impl.allocateContainer(request.getReplicationType(),
+ request.getReplicationFactor(), request.getOwner());
return ContainerResponseProto.newBuilder()
- .setPipeline(pipeline.getProtobufMessage())
+ .setContainerInfo(container.getProtobuf())
.setErrorCode(ContainerResponseProto.Error.success)
.build();
@@ -101,9 +99,9 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
RpcController controller, GetContainerRequestProto request)
throws ServiceException {
try {
- Pipeline pipeline = impl.getContainer(request.getContainerName());
+ ContainerInfo container = impl.getContainer(request.getContainerID());
return GetContainerResponseProto.newBuilder()
- .setPipeline(pipeline.getProtobufMessage())
+ .setContainerInfo(container.getProtobuf())
.build();
} catch (IOException e) {
throw new ServiceException(e);
@@ -114,23 +112,17 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
public SCMListContainerResponseProto listContainer(RpcController controller,
SCMListContainerRequestProto request) throws ServiceException {
try {
- String startName = null;
- String prefixName = null;
+ long startContainerID = 0;
int count = -1;
// Arguments check.
- if (request.hasPrefixName()) {
+ if (request.hasStartContainerID()) {
// End container name is given.
- prefixName = request.getPrefixName();
+ startContainerID = request.getStartContainerID();
}
- if (request.hasStartName()) {
- // End container name is given.
- startName = request.getStartName();
- }
-
count = request.getCount();
List<ContainerInfo> containerList =
- impl.listContainer(startName, prefixName, count);
+ impl.listContainer(startContainerID, count);
SCMListContainerResponseProto.Builder builder =
SCMListContainerResponseProto.newBuilder();
for (ContainerInfo container : containerList) {
@@ -147,7 +139,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
RpcController controller, SCMDeleteContainerRequestProto request)
throws ServiceException {
try {
- impl.deleteContainer(request.getContainerName());
+ impl.deleteContainer(request.getContainerID());
return SCMDeleteContainerResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
@@ -178,7 +170,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
RpcController controller, ObjectStageChangeRequestProto request)
throws ServiceException {
try {
- impl.notifyObjectStageChange(request.getType(), request.getName(),
+ impl.notifyObjectStageChange(request.getType(), request.getId(),
request.getOp(), request.getStage());
return ObjectStageChangeResponseProto.newBuilder().build();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
index 83ca83d..13b9180 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
@@ -76,7 +76,9 @@ public class LevelDBStore implements MetadataStore {
}
private void openDB(File dbPath, Options options) throws IOException {
- dbPath.getParentFile().mkdirs();
+ if (dbPath.getParentFile().mkdirs()) {
+ LOG.debug("Db path {} created.", dbPath.getParentFile());
+ }
db = JniDBFactory.factory.open(dbPath, options);
if (LOG.isDebugEnabled()) {
LOG.debug("LevelDB successfully opened");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
index 3ff0a94..d3a2943 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
@@ -17,8 +17,8 @@
*/
package org.apache.hadoop.utils;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConsts;
/**
@@ -94,8 +94,8 @@ public final class MetadataKeyFilters {
if (Strings.isNullOrEmpty(keyPrefix)) {
accept = true;
} else {
- if (currentKey != null &&
- DFSUtil.bytes2String(currentKey).startsWith(keyPrefix)) {
+ byte [] prefixBytes = keyPrefix.getBytes();
+ if (currentKey != null && prefixMatch(prefixBytes, currentKey)) {
keysHinted++;
accept = true;
} else {
@@ -114,5 +114,19 @@ public final class MetadataKeyFilters {
public int getKeysHintedNum() {
return keysHinted;
}
+
+ private boolean prefixMatch(byte[] prefix, byte[] key) {
+ Preconditions.checkNotNull(prefix);
+ Preconditions.checkNotNull(key);
+ if (key.length < prefix.length) {
+ return false;
+ }
+ for (int i = 0; i < prefix.length; i++) {
+ if (key[i] != prefix[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
index a60e98d..0dfca20 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
@@ -367,6 +367,7 @@ public class RocksDBStore implements MetadataStore {
public void close() throws IOException {
if (statMBeanName != null) {
MBeans.unregister(statMBeanName);
+ statMBeanName = null;
}
if (db != null) {
db.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index a6270ef..e7494ee 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -197,17 +197,15 @@ message ContainerCommandResponseProto {
}
message ContainerData {
- required string name = 1;
+ required int64 containerID = 1;
repeated KeyValue metadata = 2;
optional string dbPath = 3;
optional string containerPath = 4;
- optional string hash = 6;
- optional int64 bytesUsed = 7;
- optional int64 size = 8;
- optional int64 keyCount = 9;
- //TODO: change required after we switch container ID from string to long
- optional int64 containerID = 10;
- optional LifeCycleState state = 11 [default = OPEN];
+ optional string hash = 5;
+ optional int64 bytesUsed = 6;
+ optional int64 size = 7;
+ optional int64 keyCount = 8;
+ optional LifeCycleState state = 9 [default = OPEN];
}
message ContainerMeta {
@@ -226,7 +224,7 @@ message CreateContainerResponseProto {
message ReadContainerRequestProto {
required Pipeline pipeline = 1;
- required string name = 2;
+ required int64 containerID = 2;
}
message ReadContainerResponseProto {
@@ -243,19 +241,16 @@ message UpdateContainerResponseProto {
}
message DeleteContainerRequestProto {
- required Pipeline pipeline = 1;
- required string name = 2;
- optional bool forceDelete = 3 [default = false];
+ required int64 containerID = 1;
+ optional bool forceDelete = 2 [default = false];
}
message DeleteContainerResponseProto {
}
message ListContainerRequestProto {
- required Pipeline pipeline = 1;
- optional string prefix = 2;
- required uint32 count = 3; // Max Results to return
- optional string prevKey = 4; // if this is not set query from start.
+ required int64 startContainerID = 1;
+ optional uint32 count = 2; // Max Results to return
}
message ListContainerResponseProto {
@@ -263,34 +258,31 @@ message ListContainerResponseProto {
}
message CloseContainerRequestProto {
- required Pipeline pipeline = 1;
+ required int64 containerID = 1;
}
message CloseContainerResponseProto {
- optional Pipeline pipeline = 1;
optional string hash = 2;
+ optional int64 containerID = 3;
}
message KeyData {
- required string containerName = 1;
- required string name = 2;
- optional int64 flags = 3; // for future use.
- repeated KeyValue metadata = 4;
- repeated ChunkInfo chunks = 5;
+ required BlockID blockID = 1;
+ optional int64 flags = 2; // for future use.
+ repeated KeyValue metadata = 3;
+ repeated ChunkInfo chunks = 4;
}
// Key Messages.
message PutKeyRequestProto {
- required Pipeline pipeline = 1;
- required KeyData keyData = 2;
+ required KeyData keyData = 1;
}
message PutKeyResponseProto {
}
message GetKeyRequestProto {
- required Pipeline pipeline = 1;
- required KeyData keyData = 2;
+ required KeyData keyData = 1;
}
message GetKeyResponseProto {
@@ -299,17 +291,15 @@ message GetKeyResponseProto {
message DeleteKeyRequestProto {
- required Pipeline pipeline = 1;
- required string name = 2;
+ required BlockID blockID = 1;
}
message DeleteKeyResponseProto {
}
message ListKeyRequestProto {
- required Pipeline pipeline = 1;
- optional string prefix = 2; // if specified returns keys that match prefix.
- required string prevKey = 3;
+ required int64 containerID = 1;
+ optional int64 startLocalID = 2;
required uint32 count = 4;
}
@@ -335,31 +325,28 @@ enum Stage {
}
message WriteChunkRequestProto {
- required Pipeline pipeline = 1;
- required string keyName = 2;
- required ChunkInfo chunkData = 3;
- optional bytes data = 4;
- optional Stage stage = 5 [default = COMBINED];
+ required BlockID blockID = 1;
+ required ChunkInfo chunkData = 2;
+ optional bytes data = 3;
+ optional Stage stage = 4 [default = COMBINED];
}
message WriteChunkResponseProto {
}
message ReadChunkRequestProto {
- required Pipeline pipeline = 1;
- required string keyName = 2;
- required ChunkInfo chunkData = 3;
+ required BlockID blockID = 1;
+ required ChunkInfo chunkData = 2;
}
message ReadChunkResponseProto {
- required Pipeline pipeline = 1;
+ required BlockID blockID = 1;
required ChunkInfo chunkData = 2;
required bytes data = 3;
}
message DeleteChunkRequestProto {
- required Pipeline pipeline = 1;
- required string keyName = 2;
+ required BlockID blockID = 1;
required ChunkInfo chunkData = 3;
}
@@ -367,10 +354,9 @@ message DeleteChunkResponseProto {
}
message ListChunkRequestProto {
- required Pipeline pipeline = 1;
- required string keyName = 2;
- required string prevChunkName = 3;
- required uint32 count = 4;
+ required BlockID blockID = 1;
+ required string prevChunkName = 2;
+ required uint32 count = 3;
}
message ListChunkResponseProto {
@@ -400,7 +386,7 @@ message GetSmallFileResponseProto {
}
message CopyContainerRequestProto {
- required string containerName = 1;
+ required int64 containerID = 1;
required uint64 readOffset = 2;
optional uint64 len = 3;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
index 38d2e16..7bea82a 100644
--- a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
@@ -33,28 +33,6 @@ import "hdds.proto";
// SCM Block protocol
-/**
- * keys - batch of block keys to find
- */
-message GetScmBlockLocationsRequestProto {
- repeated string keys = 1;
-}
-
-/**
- * locatedBlocks - for each requested hash, nodes that currently host the
- * container for that object key hash
- */
-message GetScmBlockLocationsResponseProto {
- repeated ScmLocatedBlockProto locatedBlocks = 1;
-}
-
-/**
- * Holds the nodes that currently host the blocks for a key.
- */
-message ScmLocatedBlockProto {
- required string key = 1;
- required hadoop.hdds.Pipeline pipeline = 2;
-}
/**
* Request send to SCM asking allocate block of specified size.
@@ -84,7 +62,7 @@ message DeleteScmKeyBlocksRequestProto {
*/
message KeyBlocks {
required string key = 1;
- repeated string blocks = 2;
+ repeated BlockID blocks = 2;
}
/**
@@ -112,7 +90,7 @@ message DeleteScmBlockResult {
unknownFailure = 4;
}
required Result result = 1;
- required string key = 2;
+ required BlockID blockID = 2;
}
/**
@@ -126,7 +104,7 @@ message AllocateScmBlockResponseProto {
unknownFailure = 4;
}
required Error errorCode = 1;
- required string key = 2;
+ required BlockID blockID = 2;
required hadoop.hdds.Pipeline pipeline = 3;
required bool createContainer = 4;
optional string errorMessage = 5;
@@ -139,14 +117,6 @@ message AllocateScmBlockResponseProto {
service ScmBlockLocationProtocolService {
/**
- * Find the set of nodes that currently host the block, as
- * identified by the key. This method supports batch lookup by
- * passing multiple keys.
- */
- rpc getScmBlockLocations(GetScmBlockLocationsRequestProto)
- returns (GetScmBlockLocationsResponseProto);
-
- /**
* Creates a block entry in SCM.
*/
rpc allocateScmBlock(AllocateScmBlockRequestProto)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
index d7540a3..090e6eb 100644
--- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -35,7 +35,6 @@ import "hdds.proto";
* Request send to SCM asking where the container should be created.
*/
message ContainerRequestProto {
- required string containerName = 1;
// Ozone only support replciation of either 1 or 3.
required ReplicationFactor replicationFactor = 2;
required ReplicationType replicationType = 3;
@@ -53,30 +52,29 @@ message ContainerResponseProto {
errorContainerMissing = 3;
}
required Error errorCode = 1;
- required Pipeline pipeline = 2;
+ required SCMContainerInfo containerInfo = 2;
optional string errorMessage = 3;
}
message GetContainerRequestProto {
- required string containerName = 1;
+ required int64 containerID = 1;
}
message GetContainerResponseProto {
- required Pipeline pipeline = 1;
+ required SCMContainerInfo containerInfo = 1;
}
message SCMListContainerRequestProto {
required uint32 count = 1;
- optional string startName = 2;
- optional string prefixName = 3;
-}
+ optional uint64 startContainerID = 2;
+ }
message SCMListContainerResponseProto {
repeated SCMContainerInfo containers = 1;
}
message SCMDeleteContainerRequestProto {
- required string containerName = 1;
+ required int64 containerID = 1;
}
message SCMDeleteContainerResponseProto {
@@ -97,7 +95,7 @@ message ObjectStageChangeRequestProto {
begin = 1;
complete = 2;
}
- required string name = 1;
+ required int64 id = 1;
required Type type = 2;
required Op op= 3;
required Stage stage = 4;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/proto/hdds.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index 0b650b4..6ea5727 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -50,7 +50,6 @@ message PipelineChannel {
// A pipeline is composed of PipelineChannel (Ratis/StandAlone) that back a
// container.
message Pipeline {
- required string containerName = 1;
required PipelineChannel pipelineChannel = 2;
}
@@ -135,8 +134,7 @@ enum LifeCycleEvent {
}
message SCMContainerInfo {
- // TODO : Remove the container name from pipeline.
- required string containerName = 1;
+ required int64 containerID = 1;
required LifeCycleState state = 2;
required Pipeline pipeline = 3;
// This is not total size of container, but space allocated by SCM for
@@ -146,7 +144,6 @@ message SCMContainerInfo {
required uint64 numberOfKeys = 6;
optional int64 stateEnterTime = 7;
required string owner = 8;
- required int64 containerID = 9;
}
message GetScmInfoRequestProto {
@@ -168,3 +165,11 @@ enum ReplicationFactor {
ONE = 1;
THREE = 3;
}
+
+/**
+ * Block ID that uniquely identify a block by SCM.
+ */
+message BlockID {
+ required int64 containerID = 1;
+ required int64 localID = 2;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
index 68bf442..8c5609d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
@@ -105,18 +104,17 @@ public final class ChunkUtils {
* Validates chunk data and returns a file object to Chunk File that we are
* expected to write data to.
*
- * @param pipeline - pipeline.
* @param data - container data.
* @param info - chunk info.
* @return File
* @throws StorageContainerException
*/
- public static File validateChunk(Pipeline pipeline, ContainerData data,
+ public static File validateChunk(ContainerData data,
ChunkInfo info) throws StorageContainerException {
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
- File chunkFile = getChunkFile(pipeline, data, info);
+ File chunkFile = getChunkFile(data, info);
if (ChunkUtils.isOverWriteRequested(chunkFile, info)) {
if (!ChunkUtils.isOverWritePermitted(info)) {
log.error("Rejecting write chunk request. Chunk overwrite " +
@@ -132,21 +130,21 @@ public final class ChunkUtils {
/**
* Validates that Path to chunk file exists.
*
- * @param pipeline - Container Info.
* @param data - Container Data
* @param info - Chunk info
* @return - File.
* @throws StorageContainerException
*/
- public static File getChunkFile(Pipeline pipeline, ContainerData data,
+ public static File getChunkFile(ContainerData data,
ChunkInfo info) throws StorageContainerException {
+ Preconditions.checkNotNull(data, "Container data can't be null");
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
- if (data == null) {
- log.error("Invalid container Name: {}", pipeline.getContainerName());
- throw new StorageContainerException("Unable to find the container Name:" +
+ if (data.getContainerID() < 0) {
+ log.error("Invalid container id: {}", data.getContainerID());
+ throw new StorageContainerException("Unable to find the container id:" +
" " +
- pipeline.getContainerName(), CONTAINER_NOT_FOUND);
+ data.getContainerID(), CONTAINER_NOT_FOUND);
}
File dataDir = ContainerUtils.getDataDirectory(data).toFile();
@@ -335,7 +333,7 @@ public final class ChunkUtils {
ContainerProtos.ReadChunkResponseProto.newBuilder();
response.setChunkData(info.getProtoBufMessage());
response.setData(ByteString.copyFrom(data));
- response.setPipeline(msg.getReadChunk().getPipeline());
+ response.setBlockID(msg.getReadChunk().getBlockID());
ContainerProtos.ContainerCommandResponseProto.Builder builder =
ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
index c29374c..c20282a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
@@ -40,7 +40,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class ContainerData {
- private final String containerName;
private final Map<String, String> metadata;
private String dbPath; // Path to Level DB Store.
// Path to Physical file system where container and checksum are stored.
@@ -48,18 +47,18 @@ public class ContainerData {
private String hash;
private AtomicLong bytesUsed;
private long maxSize;
- private Long containerID;
+ private long containerID;
private HddsProtos.LifeCycleState state;
/**
* Constructs a ContainerData Object.
*
- * @param containerName - Name
+ * @param containerID - ID
+ * @param conf - Configuration
*/
- public ContainerData(String containerName, Long containerID,
+ public ContainerData(long containerID,
Configuration conf) {
this.metadata = new TreeMap<>();
- this.containerName = containerName;
this.maxSize = conf.getLong(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT) * OzoneConsts.GB;
this.bytesUsed = new AtomicLong(0L);
@@ -76,7 +75,7 @@ public class ContainerData {
public static ContainerData getFromProtBuf(
ContainerProtos.ContainerData protoData, Configuration conf)
throws IOException {
- ContainerData data = new ContainerData(protoData.getName(),
+ ContainerData data = new ContainerData(
protoData.getContainerID(), conf);
for (int x = 0; x < protoData.getMetadataCount(); x++) {
data.addMetadata(protoData.getMetadata(x).getKey(),
@@ -117,7 +116,6 @@ public class ContainerData {
public ContainerProtos.ContainerData getProtoBufMessage() {
ContainerProtos.ContainerData.Builder builder = ContainerProtos
.ContainerData.newBuilder();
- builder.setName(this.getContainerName());
builder.setContainerID(this.getContainerID());
if (this.getDBPath() != null) {
@@ -157,15 +155,6 @@ public class ContainerData {
}
/**
- * Returns the name of the container.
- *
- * @return - name
- */
- public String getContainerName() {
- return containerName;
- }
-
- /**
* Adds metadata.
*/
public void addMetadata(String key, String value) throws IOException {
@@ -231,9 +220,11 @@ public class ContainerData {
*
* @return String Name.
*/
- public String getName() {
- return getContainerName();
- }
+ // TODO: check the ContainerCache class to see if we are using the ContainerID instead.
+ /*
+ public String getName() {
+ return getContainerID();
+ }*/
/**
* Get container file path.
@@ -255,7 +246,7 @@ public class ContainerData {
* Get container ID.
* @return - container ID.
*/
- public synchronized Long getContainerID() {
+ public synchronized long getContainerID() {
return containerID;
}
@@ -284,7 +275,7 @@ public class ContainerData {
// Some thing brain dead for now. name + Time stamp of when we get the close
// container message.
- setHash(DigestUtils.sha256Hex(this.getContainerName() +
+ setHash(DigestUtils.sha256Hex(this.getContainerID() +
Long.toString(Time.monotonicNow())));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
index 50d2da3..19634f4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
*/
public class ContainerReport {
private static final int UNKNOWN = -1;
- private final String containerName;
private final String finalhash;
private long size;
private long keyCount;
@@ -51,11 +50,11 @@ public class ContainerReport {
/**
* Constructs the ContainerReport.
*
- * @param containerName - Container Name.
+ * @param containerID - Container ID.
* @param finalhash - Final Hash.
*/
- public ContainerReport(String containerName, String finalhash) {
- this.containerName = containerName;
+ public ContainerReport(long containerID, String finalhash) {
+ this.containerID = containerID;
this.finalhash = finalhash;
this.size = UNKNOWN;
this.keyCount = UNKNOWN;
@@ -74,7 +73,7 @@ public class ContainerReport {
*/
public static ContainerReport getFromProtoBuf(ContainerInfo info) {
Preconditions.checkNotNull(info);
- ContainerReport report = new ContainerReport(info.getContainerName(),
+ ContainerReport report = new ContainerReport(info.getContainerID(),
info.getFinalhash());
if (info.hasSize()) {
report.setSize(info.getSize());
@@ -103,15 +102,6 @@ public class ContainerReport {
}
/**
- * Gets the container name.
- *
- * @return - Name
- */
- public String getContainerName() {
- return containerName;
- }
-
- /**
* Returns the final signature for this container.
*
* @return - hash
@@ -203,7 +193,6 @@ public class ContainerReport {
*/
public ContainerInfo getProtoBufMessage() {
return ContainerInfo.newBuilder()
- .setContainerName(this.getContainerName())
.setKeyCount(this.getKeyCount())
.setSize(this.getSize())
.setUsed(this.getBytesUsed())
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index 1818188..e244354 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -184,6 +184,12 @@ public final class ContainerUtils {
removeExtension(containerFile.getName())).toString();
}
+ public static long getContainerIDFromFile(File containerFile) {
+ Preconditions.checkNotNull(containerFile);
+ String containerID = getContainerNameFromFile(containerFile);
+ return Long.parseLong(containerID);
+ }
+
/**
* Verifies that this in indeed a new container.
*
@@ -289,8 +295,8 @@ public final class ContainerUtils {
*/
public static File getMetadataFile(ContainerData containerData,
Path location) {
- return location.resolve(containerData
- .getContainerName().concat(CONTAINER_META))
+ return location.resolve(Long.toString(containerData
+ .getContainerID()).concat(CONTAINER_META))
.toFile();
}
@@ -303,8 +309,8 @@ public final class ContainerUtils {
*/
public static File getContainerFile(ContainerData containerData,
Path location) {
- return location.resolve(containerData
- .getContainerName().concat(CONTAINER_EXTENSION))
+ return location.resolve(Long.toString(containerData
+ .getContainerID()).concat(CONTAINER_EXTENSION))
.toFile();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
index ade162a..9d0ec95 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.helpers;
import com.google.common.collect.Maps;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.util.StringUtils;
import java.util.List;
import java.util.Map;
@@ -37,7 +38,7 @@ public final class DeletedContainerBlocksSummary {
// value : the number of blocks need to be deleted in this container
// if the message contains multiple entries for same block,
// blocks will be merged
- private final Map<String, Integer> blockSummary;
+ private final Map<Long, Integer> blockSummary;
// total number of blocks in this message
private int numOfBlocks;
@@ -47,14 +48,14 @@ public final class DeletedContainerBlocksSummary {
blockSummary = Maps.newHashMap();
blocks.forEach(entry -> {
txSummary.put(entry.getTxID(), entry.getCount());
- if (blockSummary.containsKey(entry.getContainerName())) {
- blockSummary.put(entry.getContainerName(),
- blockSummary.get(entry.getContainerName())
- + entry.getBlockIDCount());
+ if (blockSummary.containsKey(entry.getContainerID())) {
+ blockSummary.put(entry.getContainerID(),
+ blockSummary.get(entry.getContainerID())
+ + entry.getLocalIDCount());
} else {
- blockSummary.put(entry.getContainerName(), entry.getBlockIDCount());
+ blockSummary.put(entry.getContainerID(), entry.getLocalIDCount());
}
- numOfBlocks += entry.getBlockIDCount();
+ numOfBlocks += entry.getLocalIDCount();
});
}
@@ -93,9 +94,9 @@ public final class DeletedContainerBlocksSummary {
.append("TimesProceed=")
.append(blks.getCount())
.append(", ")
- .append(blks.getContainerName())
+ .append(blks.getContainerID())
.append(" : [")
- .append(String.join(",", blks.getBlockIDList())).append("]")
+ .append(StringUtils.join(',', blks.getLocalIDList())).append("]")
.append("\n");
}
return sb.toString();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
index 566db02..ec27452 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
@@ -65,7 +65,8 @@ public final class FileUtils {
ContainerProtos.ReadChunkResponseProto.newBuilder();
readChunkresponse.setChunkData(info.getProtoBufMessage());
readChunkresponse.setData(ByteString.copyFrom(data));
- readChunkresponse.setPipeline(msg.getGetSmallFile().getKey().getPipeline());
+ readChunkresponse.setBlockID(msg.getGetSmallFile().getKey().
+ getKeyData().getBlockID());
ContainerProtos.GetSmallFileResponseProto.Builder getSmallFile =
ContainerProtos.GetSmallFileResponseProto.newBuilder();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
index 33eb911..dbd5772 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
@@ -63,11 +63,11 @@ public final class KeyUtils {
ContainerCache cache = ContainerCache.getInstance(conf);
Preconditions.checkNotNull(cache);
try {
- return cache.getDB(container.getContainerName(), container.getDBPath());
+ return cache.getDB(container.getContainerID(), container.getDBPath());
} catch (IOException ex) {
String message =
String.format("Unable to open DB. DB Name: %s, Path: %s. ex: %s",
- container.getContainerName(), container.getDBPath(), ex.getMessage());
+ container.getContainerID(), container.getDBPath(), ex.getMessage());
throw new StorageContainerException(message, UNABLE_TO_READ_METADATA_DB);
}
}
@@ -83,7 +83,7 @@ public final class KeyUtils {
Preconditions.checkNotNull(container);
ContainerCache cache = ContainerCache.getInstance(conf);
Preconditions.checkNotNull(cache);
- cache.removeDB(container.getContainerName());
+ cache.removeDB(container.getContainerID());
}
/**
* Shutdown all DB Handles.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
index 457c417..3505196 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
@@ -19,11 +19,11 @@ package org.apache.hadoop.ozone.container.common.impl;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
@@ -66,13 +66,12 @@ public class ChunkManagerImpl implements ChunkManager {
/**
* writes a given chunk.
*
- * @param pipeline - Name and the set of machines that make this container.
- * @param keyName - Name of the Key.
+ * @param blockID - ID of the block.
* @param info - ChunkInfo.
* @throws StorageContainerException
*/
@Override
- public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info,
+ public void writeChunk(BlockID blockID, ChunkInfo info,
byte[] data, ContainerProtos.Stage stage)
throws StorageContainerException {
// we don't want container manager to go away while we are writing chunks.
@@ -80,13 +79,13 @@ public class ChunkManagerImpl implements ChunkManager {
// TODO : Take keyManager Write lock here.
try {
- Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
- String containerName = pipeline.getContainerName();
- Preconditions.checkNotNull(containerName,
- "Container name cannot be null");
+ Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
+ long containerID = blockID.getContainerID();
+ Preconditions.checkState(containerID >= 0,
+ "Container ID cannot be negative");
ContainerData container =
- containerManager.readContainer(containerName);
- File chunkFile = ChunkUtils.validateChunk(pipeline, container, info);
+ containerManager.readContainer(containerID);
+ File chunkFile = ChunkUtils.validateChunk(container, info);
File tmpChunkFile = getTmpChunkFile(chunkFile, info);
LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file",
@@ -96,16 +95,16 @@ public class ChunkManagerImpl implements ChunkManager {
ChunkUtils.writeData(tmpChunkFile, info, data);
break;
case COMMIT_DATA:
- commitChunk(tmpChunkFile, chunkFile, containerName, info.getLen());
+ commitChunk(tmpChunkFile, chunkFile, containerID, info.getLen());
break;
case COMBINED:
// directly write to the chunk file
long oldSize = chunkFile.length();
ChunkUtils.writeData(chunkFile, info, data);
long newSize = chunkFile.length();
- containerManager.incrBytesUsed(containerName, newSize - oldSize);
- containerManager.incrWriteCount(containerName);
- containerManager.incrWriteBytes(containerName, info.getLen());
+ containerManager.incrBytesUsed(containerID, newSize - oldSize);
+ containerManager.incrWriteCount(containerID);
+ containerManager.incrWriteBytes(containerID, info.getLen());
break;
default:
throw new IOException("Can not identify write operation.");
@@ -136,22 +135,21 @@ public class ChunkManagerImpl implements ChunkManager {
// Commit the chunk by renaming the temporary chunk file to chunk file
private void commitChunk(File tmpChunkFile, File chunkFile,
- String containerName, long chunkLen) throws IOException {
+ long containerID, long chunkLen) throws IOException {
long sizeDiff = tmpChunkFile.length() - chunkFile.length();
// It is safe to replace here as the earlier chunk if existing should be
// caught as part of validateChunk
Files.move(tmpChunkFile.toPath(), chunkFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
- containerManager.incrBytesUsed(containerName, sizeDiff);
- containerManager.incrWriteCount(containerName);
- containerManager.incrWriteBytes(containerName, chunkLen);
+ containerManager.incrBytesUsed(containerID, sizeDiff);
+ containerManager.incrWriteCount(containerID);
+ containerManager.incrWriteBytes(containerID, chunkLen);
}
/**
* reads the data defined by a chunk.
*
- * @param pipeline - container pipeline.
- * @param keyName - Name of the Key
+ * @param blockID - ID of the block.
* @param info - ChunkInfo.
* @return byte array
* @throws StorageContainerException
@@ -159,20 +157,20 @@ public class ChunkManagerImpl implements ChunkManager {
* TODO: Explore if we need to do that for ozone.
*/
@Override
- public byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info)
+ public byte[] readChunk(BlockID blockID, ChunkInfo info)
throws StorageContainerException {
containerManager.readLock();
try {
- Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
- String containerName = pipeline.getContainerName();
- Preconditions.checkNotNull(containerName,
- "Container name cannot be null");
+ Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
+ long containerID = blockID.getContainerID();
+ Preconditions.checkState(containerID >= 0,
+ "Container ID cannot be negative");
ContainerData container =
- containerManager.readContainer(containerName);
- File chunkFile = ChunkUtils.getChunkFile(pipeline, container, info);
+ containerManager.readContainer(containerID);
+ File chunkFile = ChunkUtils.getChunkFile(container, info);
ByteBuffer data = ChunkUtils.readData(chunkFile, info);
- containerManager.incrReadCount(containerName);
- containerManager.incrReadBytes(containerName, chunkFile.length());
+ containerManager.incrReadCount(containerID);
+ containerManager.incrReadBytes(containerID, chunkFile.length());
return data.array();
} catch (ExecutionException | NoSuchAlgorithmException e) {
LOG.error("read data failed. error: {}", e);
@@ -191,25 +189,25 @@ public class ChunkManagerImpl implements ChunkManager {
/**
* Deletes a given chunk.
*
- * @param pipeline - Pipeline.
- * @param keyName - Key Name
+ * @param blockID - ID of the block.
* @param info - Chunk Info
* @throws StorageContainerException
*/
@Override
- public void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info)
+ public void deleteChunk(BlockID blockID, ChunkInfo info)
throws StorageContainerException {
containerManager.readLock();
try {
- Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
- String containerName = pipeline.getContainerName();
- Preconditions.checkNotNull(containerName,
- "Container name cannot be null");
- File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager
- .readContainer(containerName), info);
+ Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
+ long containerID = blockID.getContainerID();
+ Preconditions.checkState(containerID >= 0,
+ "Container ID cannot be negative");
+
+ File chunkFile = ChunkUtils.getChunkFile(containerManager
+ .readContainer(containerID), info);
if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
FileUtil.fullyDelete(chunkFile);
- containerManager.decrBytesUsed(containerName, chunkFile.length());
+ containerManager.decrBytesUsed(containerID, chunkFile.length());
} else {
LOG.error("Not Supported Operation. Trying to delete a " +
"chunk that is in shared file. chunk info : " + info.toString());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
index 5e7375c..1893b3b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
@@ -24,7 +24,6 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -113,7 +112,9 @@ public class ContainerManagerImpl implements ContainerManager {
static final Logger LOG =
LoggerFactory.getLogger(ContainerManagerImpl.class);
- private final ConcurrentSkipListMap<String, ContainerStatus>
+ // TODO: consider primitive collection like eclipse-collections
+ // to avoid autoboxing overhead
+ private final ConcurrentSkipListMap<Long, ContainerStatus>
containerMap = new ConcurrentSkipListMap<>();
// Use a non-fair RW lock for better throughput, we may revisit this decision
@@ -229,6 +230,7 @@ public class ContainerManagerImpl implements ContainerManager {
Preconditions.checkNotNull(keyName,
"Container Name to container key mapping is null");
+ long containerID = Long.parseLong(keyName);
try {
String containerFileName = containerName.concat(CONTAINER_EXTENSION);
String metaFileName = containerName.concat(CONTAINER_META);
@@ -249,7 +251,7 @@ public class ContainerManagerImpl implements ContainerManager {
// when loading the info we get a null, this often means last time
// SCM was ending up at some middle phase causing that the metadata
// was not populated. Such containers are marked as inactive.
- containerMap.put(keyName, new ContainerStatus(null));
+ containerMap.put(containerID, new ContainerStatus(null));
return;
}
containerData = ContainerData.getFromProtBuf(containerDataProto, conf);
@@ -263,7 +265,7 @@ public class ContainerManagerImpl implements ContainerManager {
// Hopefully SCM will ask us to delete this container and rebuild it.
LOG.error("Invalid SHA found for container data. Name :{}"
+ "cowardly refusing to read invalid data", containerName);
- containerMap.put(keyName, new ContainerStatus(null));
+ containerMap.put(containerID, new ContainerStatus(null));
return;
}
@@ -295,7 +297,7 @@ public class ContainerManagerImpl implements ContainerManager {
}).sum();
containerStatus.setBytesUsed(bytesUsed);
- containerMap.put(keyName, containerStatus);
+ containerMap.put(containerID, containerStatus);
} catch (IOException | NoSuchAlgorithmException ex) {
LOG.error("read failed for file: {} ex: {}", containerName,
ex.getMessage());
@@ -303,7 +305,7 @@ public class ContainerManagerImpl implements ContainerManager {
// TODO : Add this file to a recovery Queue.
// Remember that this container is busted and we cannot use it.
- containerMap.put(keyName, new ContainerStatus(null));
+ containerMap.put(containerID, new ContainerStatus(null));
throw new StorageContainerException("Unable to read container info",
UNABLE_TO_READ_METADATA_DB);
} finally {
@@ -316,18 +318,17 @@ public class ContainerManagerImpl implements ContainerManager {
/**
* Creates a container with the given name.
*
- * @param pipeline -- Nodes which make up this container.
* @param containerData - Container Name and metadata.
* @throws StorageContainerException - Exception
*/
@Override
- public void createContainer(Pipeline pipeline, ContainerData containerData)
+ public void createContainer(ContainerData containerData)
throws StorageContainerException {
Preconditions.checkNotNull(containerData, "Container data cannot be null");
writeLock();
try {
- if (containerMap.containsKey(containerData.getName())) {
- LOG.debug("container already exists. {}", containerData.getName());
+ if (containerMap.containsKey(containerData.getContainerID())) {
+ LOG.debug("container already exists. {}", containerData.getContainerID());
throw new StorageContainerException("container already exists.",
CONTAINER_EXISTS);
}
@@ -399,7 +400,7 @@ public class ContainerManagerImpl implements ContainerManager {
location);
File metadataFile = ContainerUtils.getMetadataFile(containerData,
location);
- String containerName = containerData.getContainerName();
+ String containerName = Long.toString(containerData.getContainerID());
if(!overwrite) {
ContainerUtils.verifyIsNewContainer(containerFile, metadataFile);
@@ -446,7 +447,7 @@ public class ContainerManagerImpl implements ContainerManager {
LOG.error("Creation of container failed. Name: {}, we might need to " +
"cleanup partially created artifacts. ",
- containerData.getContainerName(), ex);
+ containerData.getContainerID(), ex);
throw new StorageContainerException("Container creation failed. ",
ex, CONTAINER_INTERNAL_ERROR);
} finally {
@@ -459,45 +460,45 @@ public class ContainerManagerImpl implements ContainerManager {
/**
* Deletes an existing container.
*
- * @param pipeline - nodes that make this container.
- * @param containerName - name of the container.
+ * @param containerID - ID of the container.
* @param forceDelete - whether this container should be deleted forcibly.
* @throws StorageContainerException
*/
@Override
- public void deleteContainer(Pipeline pipeline, String containerName,
+ public void deleteContainer(long containerID,
boolean forceDelete) throws StorageContainerException {
- Preconditions.checkNotNull(containerName, "Container name cannot be null");
- Preconditions.checkState(containerName.length() > 0,
- "Container name length cannot be zero.");
+ Preconditions.checkState(containerID >= 0,
+ "Container ID cannot be negative.");
writeLock();
try {
- if (isOpen(pipeline.getContainerName())) {
+ if (isOpen(containerID)) {
throw new StorageContainerException(
"Deleting an open container is not allowed.",
UNCLOSED_CONTAINER_IO);
}
- ContainerStatus status = containerMap.get(containerName);
+ ContainerStatus status = containerMap.get(containerID);
if (status == null) {
- LOG.debug("No such container. Name: {}", containerName);
- throw new StorageContainerException("No such container. Name : " +
- containerName, CONTAINER_NOT_FOUND);
+ LOG.debug("No such container. ID: {}", containerID);
+ throw new StorageContainerException("No such container. ID : " +
+ containerID, CONTAINER_NOT_FOUND);
}
if (status.getContainer() == null) {
- LOG.debug("Invalid container data. Name: {}", containerName);
+ LOG.debug("Invalid container data. ID: {}", containerID);
throw new StorageContainerException("Invalid container data. Name : " +
- containerName, CONTAINER_NOT_FOUND);
+ containerID, CONTAINER_NOT_FOUND);
}
ContainerUtils.removeContainer(status.getContainer(), conf, forceDelete);
- containerMap.remove(containerName);
+ containerMap.remove(containerID);
} catch (StorageContainerException e) {
throw e;
} catch (IOException e) {
// TODO : An I/O error during delete can leave partial artifacts on the
// disk. We will need the cleaner thread to cleanup this information.
- LOG.error("Failed to cleanup container. Name: {}", containerName, e);
- throw new StorageContainerException(containerName, e, IO_EXCEPTION);
+ String errMsg = String.format("Failed to cleanup container. ID: %d",
+ containerID);
+ LOG.error(errMsg, e);
+ throw new StorageContainerException(errMsg, e, IO_EXCEPTION);
} finally {
writeUnlock();
}
@@ -511,25 +512,29 @@ public class ContainerManagerImpl implements ContainerManager {
* time. It is possible that using this iteration you can miss certain
* container from the listing.
*
- * @param prefix - Return keys that match this prefix.
+ * @param startContainerID - Return containers with ID >= startContainerID.
* @param count - how many to return
- * @param prevKey - Previous Key Value or empty String.
* @param data - Actual containerData
* @throws StorageContainerException
*/
@Override
- public void listContainer(String prefix, long count, String prevKey,
+ public void listContainer(long startContainerID, long count,
List<ContainerData> data) throws StorageContainerException {
- // TODO : Support list with Prefix and PrevKey
Preconditions.checkNotNull(data,
"Internal assertion: data cannot be null");
+ Preconditions.checkState(startContainerID >= 0,
+ "Start container ID cannot be negative");
+ Preconditions.checkState(count > 0,
+ "max number of containers returned " +
+ "must be positive");
+
readLock();
try {
- ConcurrentNavigableMap<String, ContainerStatus> map;
- if (prevKey == null || prevKey.isEmpty()) {
+ ConcurrentNavigableMap<Long, ContainerStatus> map;
+ if (startContainerID == 0) {
map = containerMap.tailMap(containerMap.firstKey(), true);
} else {
- map = containerMap.tailMap(prevKey, false);
+ map = containerMap.tailMap(startContainerID, false);
}
int currentCount = 0;
@@ -549,24 +554,23 @@ public class ContainerManagerImpl implements ContainerManager {
/**
* Get metadata about a specific container.
*
- * @param containerName - Name of the container
+ * @param containerID - ID of the container
* @return ContainerData - Container Data.
* @throws StorageContainerException
*/
@Override
- public ContainerData readContainer(String containerName) throws
- StorageContainerException {
- Preconditions.checkNotNull(containerName, "Container name cannot be null");
- Preconditions.checkState(containerName.length() > 0,
- "Container name length cannot be zero.");
- if (!containerMap.containsKey(containerName)) {
- throw new StorageContainerException("Unable to find the container. Name: "
- + containerName, CONTAINER_NOT_FOUND);
+ public ContainerData readContainer(long containerID)
+ throws StorageContainerException {
+ Preconditions.checkState(containerID >= 0,
+ "Container ID cannot be negative.");
+ if (!containerMap.containsKey(containerID)) {
+ throw new StorageContainerException("Unable to find the container. ID: "
+ + containerID, CONTAINER_NOT_FOUND);
}
- ContainerData cData = containerMap.get(containerName).getContainer();
+ ContainerData cData = containerMap.get(containerID).getContainer();
if (cData == null) {
- throw new StorageContainerException("Invalid container data. Name: "
- + containerName, CONTAINER_INTERNAL_ERROR);
+ throw new StorageContainerException("Invalid container data. ID: "
+ + containerID, CONTAINER_INTERNAL_ERROR);
}
return cData;
}
@@ -575,13 +579,13 @@ public class ContainerManagerImpl implements ContainerManager {
* Closes a open container, if it is already closed or does not exist a
* StorageContainerException is thrown.
*
- * @param containerName - Name of the container.
+ * @param containerID - ID of the container.
* @throws StorageContainerException
*/
@Override
- public void closeContainer(String containerName)
+ public void closeContainer(long containerID)
throws StorageContainerException, NoSuchAlgorithmException {
- ContainerData containerData = readContainer(containerName);
+ ContainerData containerData = readContainer(containerID);
containerData.closeContainer();
writeContainerInfo(containerData, true);
MetadataStore db = KeyUtils.getDB(containerData, conf);
@@ -602,15 +606,13 @@ public class ContainerManagerImpl implements ContainerManager {
// issues.
ContainerStatus status = new ContainerStatus(containerData);
- containerMap.put(containerName, status);
+ containerMap.put(containerID, status);
}
@Override
- public void updateContainer(Pipeline pipeline, String containerName,
- ContainerData data, boolean forceUpdate)
- throws StorageContainerException {
- Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
- Preconditions.checkNotNull(containerName, "Container name cannot be null");
+ public void updateContainer(long containerID, ContainerData data,
+ boolean forceUpdate) throws StorageContainerException {
+ Preconditions.checkState(containerID >= 0, "Container ID cannot be negative.");
Preconditions.checkNotNull(data, "Container data cannot be null");
FileOutputStream containerStream = null;
DigestOutputStream dos = null;
@@ -618,9 +620,9 @@ public class ContainerManagerImpl implements ContainerManager {
File containerFileBK = null, containerFile = null;
boolean deleted = false;
- if(!containerMap.containsKey(containerName)) {
+ if(!containerMap.containsKey(containerID)) {
throw new StorageContainerException("Container doesn't exist. Name :"
- + containerName, CONTAINER_NOT_FOUND);
+ + containerID, CONTAINER_NOT_FOUND);
}
try {
@@ -633,7 +635,7 @@ public class ContainerManagerImpl implements ContainerManager {
try {
Path location = locationManager.getContainerPath();
- ContainerData orgData = containerMap.get(containerName).getContainer();
+ ContainerData orgData = containerMap.get(containerID).getContainer();
if (orgData == null) {
// updating a invalid container
throw new StorageContainerException("Update a container with invalid" +
@@ -642,7 +644,7 @@ public class ContainerManagerImpl implements ContainerManager {
if (!forceUpdate && !orgData.isOpen()) {
throw new StorageContainerException(
- "Update a closed container is not allowed. Name: " + containerName,
+ "Update a closed container is not allowed. ID: " + containerID,
UNSUPPORTED_REQUEST);
}
@@ -652,7 +654,7 @@ public class ContainerManagerImpl implements ContainerManager {
if (!forceUpdate) {
if (!containerFile.exists() || !containerFile.canWrite()) {
throw new StorageContainerException(
- "Container file not exists or corrupted. Name: " + containerName,
+ "Container file not exists or corrupted. ID: " + containerID,
CONTAINER_INTERNAL_ERROR);
}
@@ -672,7 +674,7 @@ public class ContainerManagerImpl implements ContainerManager {
// Update the in-memory map
ContainerStatus newStatus = new ContainerStatus(data);
- containerMap.replace(containerName, newStatus);
+ containerMap.replace(containerID, newStatus);
} catch (IOException e) {
// Restore the container file from backup
if(containerFileBK != null && containerFileBK.exists() && deleted) {
@@ -683,8 +685,8 @@ public class ContainerManagerImpl implements ContainerManager {
CONTAINER_INTERNAL_ERROR);
} else {
throw new StorageContainerException(
- "Failed to restore container data from the backup. Name: "
- + containerName, CONTAINER_INTERNAL_ERROR);
+ "Failed to restore container data from the backup. ID: "
+ + containerID, CONTAINER_INTERNAL_ERROR);
}
} else {
throw new StorageContainerException(
@@ -711,22 +713,22 @@ public class ContainerManagerImpl implements ContainerManager {
/**
* Checks if a container exists.
*
- * @param containerName - Name of the container.
+ * @param containerID - ID of the container.
* @return true if the container is open false otherwise.
* @throws StorageContainerException - Throws Exception if we are not able to
* find the container.
*/
@Override
- public boolean isOpen(String containerName) throws StorageContainerException {
- final ContainerStatus status = containerMap.get(containerName);
+ public boolean isOpen(long containerID) throws StorageContainerException {
+ final ContainerStatus status = containerMap.get(containerID);
if (status == null) {
throw new StorageContainerException(
- "Container status not found: " + containerName, CONTAINER_NOT_FOUND);
+ "Container status not found: " + containerID, CONTAINER_NOT_FOUND);
}
final ContainerData cData = status.getContainer();
if (cData == null) {
throw new StorageContainerException(
- "Container not found: " + containerName, CONTAINER_NOT_FOUND);
+ "Container not found: " + containerID, CONTAINER_NOT_FOUND);
}
return cData.isOpen();
}
@@ -746,7 +748,7 @@ public class ContainerManagerImpl implements ContainerManager {
@VisibleForTesting
- public ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() {
+ public ConcurrentSkipListMap<Long, ContainerStatus> getContainerMap() {
return containerMap;
}
@@ -901,7 +903,7 @@ public class ContainerManagerImpl implements ContainerManager {
for (ContainerStatus container: containers) {
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
- ciBuilder.setContainerName(container.getContainer().getContainerName())
+ ciBuilder.setContainerID(container.getContainer().getContainerID())
.setSize(container.getContainer().getMaxSize())
.setUsed(container.getContainer().getBytesUsed())
.setKeyCount(container.getContainer().getKeyCount())
@@ -966,7 +968,7 @@ public class ContainerManagerImpl implements ContainerManager {
}
@Override
- public void incrPendingDeletionBlocks(int numBlocks, String containerId) {
+ public void incrPendingDeletionBlocks(int numBlocks, long containerId) {
writeLock();
try {
ContainerStatus status = containerMap.get(containerId);
@@ -977,7 +979,7 @@ public class ContainerManagerImpl implements ContainerManager {
}
@Override
- public void decrPendingDeletionBlocks(int numBlocks, String containerId) {
+ public void decrPendingDeletionBlocks(int numBlocks, long containerId) {
writeLock();
try {
ContainerStatus status = containerMap.get(containerId);
@@ -990,35 +992,35 @@ public class ContainerManagerImpl implements ContainerManager {
/**
* Increase the read count of the container.
*
- * @param containerName - Name of the container.
+ * @param containerId - ID of the container.
*/
@Override
- public void incrReadCount(String containerName) {
- ContainerStatus status = containerMap.get(containerName);
+ public void incrReadCount(long containerId) {
+ ContainerStatus status = containerMap.get(containerId);
status.incrReadCount();
}
- public long getReadCount(String containerName) {
- ContainerStatus status = containerMap.get(containerName);
+ public long getReadCount(long containerId) {
+ ContainerStatus status = containerMap.get(containerId);
return status.getReadCount();
}
/**
* Increse the read counter for bytes read from the container.
*
- * @param containerName - Name of the container.
+ * @param containerId - ID of the container.
* @param readBytes - bytes read from the container.
*/
@Override
- public void incrReadBytes(String containerName, long readBytes) {
- ContainerStatus status = containerMap.get(containerName);
+ public void incrReadBytes(long containerId, long readBytes) {
+ ContainerStatus status = containerMap.get(containerId);
status.incrReadBytes(readBytes);
}
- public long getReadBytes(String containerName) {
+ public long getReadBytes(long containerId) {
readLock();
try {
- ContainerStatus status = containerMap.get(containerName);
+ ContainerStatus status = containerMap.get(containerId);
return status.getReadBytes();
} finally {
readUnlock();
@@ -1028,76 +1030,76 @@ public class ContainerManagerImpl implements ContainerManager {
/**
* Increase the write count of the container.
*
- * @param containerName - Name of the container.
+ * @param containerId - Name of the container.
*/
@Override
- public void incrWriteCount(String containerName) {
- ContainerStatus status = containerMap.get(containerName);
+ public void incrWriteCount(long containerId) {
+ ContainerStatus status = containerMap.get(containerId);
status.incrWriteCount();
}
- public long getWriteCount(String containerName) {
- ContainerStatus status = containerMap.get(containerName);
+ public long getWriteCount(long containerId) {
+ ContainerStatus status = containerMap.get(containerId);
return status.getWriteCount();
}
/**
* Increse the write counter for bytes write into the container.
*
- * @param containerName - Name of the container.
+ * @param containerId - ID of the container.
* @param writeBytes - bytes write into the container.
*/
@Override
- public void incrWriteBytes(String containerName, long writeBytes) {
- ContainerStatus status = containerMap.get(containerName);
+ public void incrWriteBytes(long containerId, long writeBytes) {
+ ContainerStatus status = containerMap.get(containerId);
status.incrWriteBytes(writeBytes);
}
- public long getWriteBytes(String containerName) {
- ContainerStatus status = containerMap.get(containerName);
+ public long getWriteBytes(long containerId) {
+ ContainerStatus status = containerMap.get(containerId);
return status.getWriteBytes();
}
/**
* Increase the bytes used by the container.
*
- * @param containerName - Name of the container.
+ * @param containerId - ID of the container.
* @param used - additional bytes used by the container.
* @return the current bytes used.
*/
@Override
- public long incrBytesUsed(String containerName, long used) {
- ContainerStatus status = containerMap.get(containerName);
+ public long incrBytesUsed(long containerId, long used) {
+ ContainerStatus status = containerMap.get(containerId);
return status.incrBytesUsed(used);
}
/**
* Decrease the bytes used by the container.
*
- * @param containerName - Name of the container.
+ * @param containerId - ID of the container.
* @param used - additional bytes reclaimed by the container.
* @return the current bytes used.
*/
@Override
- public long decrBytesUsed(String containerName, long used) {
- ContainerStatus status = containerMap.get(containerName);
+ public long decrBytesUsed(long containerId, long used) {
+ ContainerStatus status = containerMap.get(containerId);
return status.decrBytesUsed(used);
}
- public long getBytesUsed(String containerName) {
- ContainerStatus status = containerMap.get(containerName);
+ public long getBytesUsed(long containerId) {
+ ContainerStatus status = containerMap.get(containerId);
return status.getBytesUsed();
}
/**
* Get the number of keys in the container.
*
- * @param containerName - Name of the container.
+ * @param containerId - ID of the container.
* @return the current key count.
*/
@Override
- public long getNumKeys(String containerName) {
- ContainerStatus status = containerMap.get(containerName);
+ public long getNumKeys(long containerId) {
+ ContainerStatus status = containerMap.get(containerId);
return status.getNumKeys(); }
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
index d319565..46bd842 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
@@ -186,7 +187,7 @@ public class Dispatcher implements ContainerDispatcher {
} catch (IOException ex) {
LOG.warn("Container operation failed. " +
"Container: {} Operation: {} trace ID: {} Error: {}",
- msg.getCreateContainer().getContainerData().getName(),
+ msg.getCreateContainer().getContainerData().getContainerID(),
msg.getCmdType().name(),
msg.getTraceID(),
ex.toString(), ex);
@@ -230,7 +231,7 @@ public class Dispatcher implements ContainerDispatcher {
} catch (IOException ex) {
LOG.warn("Container operation failed. " +
"Container: {} Operation: {} trace ID: {} Error: {}",
- msg.getCreateContainer().getContainerData().getName(),
+ msg.getCreateContainer().getContainerData().getContainerID(),
msg.getCmdType().name(),
msg.getTraceID(),
ex.toString(), ex);
@@ -273,7 +274,7 @@ public class Dispatcher implements ContainerDispatcher {
} catch (IOException ex) {
LOG.warn("Container operation failed. " +
"Container: {} Operation: {} trace ID: {} Error: {}",
- msg.getCreateContainer().getContainerData().getName(),
+ msg.getCreateContainer().getContainerData().getContainerID(),
msg.getCmdType().name(),
msg.getTraceID(),
ex.toString(), ex);
@@ -318,17 +319,14 @@ public class Dispatcher implements ContainerDispatcher {
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
}
-
- Pipeline pipeline = Pipeline.getFromProtoBuf(
- msg.getUpdateContainer().getPipeline());
- String containerName = msg.getUpdateContainer()
- .getContainerData().getName();
+ long containerID = msg.getUpdateContainer()
+ .getContainerData().getContainerID();
ContainerData data = ContainerData.getFromProtBuf(
msg.getUpdateContainer().getContainerData(), conf);
boolean forceUpdate = msg.getUpdateContainer().getForceUpdate();
- this.containerManager.updateContainer(
- pipeline, containerName, data, forceUpdate);
+ this.containerManager.updateContainer(containerID,
+ data, forceUpdate);
return ContainerUtils.getContainerResponse(msg);
}
@@ -349,8 +347,9 @@ public class Dispatcher implements ContainerDispatcher {
return ContainerUtils.malformedRequest(msg);
}
- String name = msg.getReadContainer().getName();
- ContainerData container = this.containerManager.readContainer(name);
+ long containerID = msg.getReadContainer().getContainerID();
+ ContainerData container = this.containerManager.
+ readContainer(containerID);
return ContainerUtils.getReadContainerResponse(msg, container);
}
@@ -370,12 +369,9 @@ public class Dispatcher implements ContainerDispatcher {
return ContainerUtils.malformedRequest(msg);
}
- Pipeline pipeline = Pipeline.getFromProtoBuf(
- msg.getDeleteContainer().getPipeline());
- Preconditions.checkNotNull(pipeline);
- String name = msg.getDeleteContainer().getName();
+ long containerID = msg.getDeleteContainer().getContainerID();
boolean forceDelete = msg.getDeleteContainer().getForceDelete();
- this.containerManager.deleteContainer(pipeline, name, forceDelete);
+ this.containerManager.deleteContainer(containerID, forceDelete);
return ContainerUtils.getContainerResponse(msg);
}
@@ -401,7 +397,7 @@ public class Dispatcher implements ContainerDispatcher {
msg.getCreateContainer().getPipeline());
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
- this.containerManager.createContainer(pipeline, cData);
+ this.containerManager.createContainer(cData);
return ContainerUtils.getContainerResponse(msg);
}
@@ -420,14 +416,12 @@ public class Dispatcher implements ContainerDispatcher {
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
}
- Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getCloseContainer()
- .getPipeline());
- Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
- if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+ long containerID = msg.getCloseContainer().getContainerID();
+ if (!this.containerManager.isOpen(containerID)) {
throw new StorageContainerException("Attempting to close a closed " +
"container.", CLOSED_CONTAINER_IO);
}
- this.containerManager.closeContainer(pipeline.getContainerName());
+ this.containerManager.closeContainer(containerID);
return ContainerUtils.getContainerResponse(msg);
} catch (NoSuchAlgorithmException e) {
throw new StorageContainerException("No such Algorithm", e,
@@ -449,11 +443,9 @@ public class Dispatcher implements ContainerDispatcher {
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
}
- String keyName = msg.getWriteChunk().getKeyName();
- Pipeline pipeline = Pipeline.getFromProtoBuf(
- msg.getWriteChunk().getPipeline());
- Preconditions.checkNotNull(pipeline);
- if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+ BlockID blockID = BlockID.getFromProtobuf(
+ msg.getWriteChunk().getBlockID());
+ if (!this.containerManager.isOpen(blockID.getContainerID())) {
throw new StorageContainerException("Write to closed container.",
CLOSED_CONTAINER_IO);
}
@@ -469,7 +461,7 @@ public class Dispatcher implements ContainerDispatcher {
}
this.containerManager.getChunkManager()
- .writeChunk(pipeline, keyName, chunkInfo,
+ .writeChunk(blockID, chunkInfo,
data, msg.getWriteChunk().getStage());
return ChunkUtils.getChunkResponse(msg);
@@ -489,17 +481,13 @@ public class Dispatcher implements ContainerDispatcher {
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
}
-
- String keyName = msg.getReadChunk().getKeyName();
- Pipeline pipeline = Pipeline.getFromProtoBuf(
- msg.getReadChunk().getPipeline());
- Preconditions.checkNotNull(pipeline);
-
+ BlockID blockID = BlockID.getFromProtobuf(
+ msg.getReadChunk().getBlockID());
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getReadChunk()
.getChunkData());
Preconditions.checkNotNull(chunkInfo);
- byte[] data = this.containerManager.getChunkManager().readChunk(pipeline,
- keyName, chunkInfo);
+ byte[] data = this.containerManager.getChunkManager().
+ readChunk(blockID, chunkInfo);
metrics.incContainerBytesStats(Type.ReadChunk, data.length);
return ChunkUtils.getReadChunkResponse(msg, data, chunkInfo);
}
@@ -519,11 +507,10 @@ public class Dispatcher implements ContainerDispatcher {
return ContainerUtils.malformedRequest(msg);
}
- String keyName = msg.getDeleteChunk().getKeyName();
- Pipeline pipeline = Pipeline.getFromProtoBuf(
- msg.getDeleteChunk().getPipeline());
- Preconditions.checkNotNull(pipeline);
- if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+ BlockID blockID = BlockID.getFromProtobuf(msg.getDeleteChunk()
+ .getBlockID());
+ long containerID = blockID.getContainerID();
+ if (!this.containerManager.isOpen(containerID)) {
throw new StorageContainerException("Write to closed container.",
CLOSED_CONTAINER_IO);
}
@@ -531,7 +518,7 @@ public class Dispatcher implements ContainerDispatcher {
.getChunkData());
Preconditions.checkNotNull(chunkInfo);
- this.containerManager.getChunkManager().deleteChunk(pipeline, keyName,
+ this.containerManager.getChunkManager().deleteChunk(blockID,
chunkInfo);
return ChunkUtils.getChunkResponse(msg);
}
@@ -550,15 +537,16 @@ public class Dispatcher implements ContainerDispatcher {
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
}
- Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getPutKey().getPipeline());
- Preconditions.checkNotNull(pipeline);
- if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+ BlockID blockID = BlockID.getFromProtobuf(
+ msg.getPutKey().getKeyData().getBlockID());
+ long containerID = blockID.getContainerID();
+ if (!this.containerManager.isOpen(containerID)) {
throw new StorageContainerException("Write to closed container.",
CLOSED_CONTAINER_IO);
}
KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData());
Preconditions.checkNotNull(keyData);
- this.containerManager.getKeyManager().putKey(pipeline, keyData);
+ this.containerManager.getKeyManager().putKey(keyData);
long numBytes = keyData.getProtoBufMessage().toByteArray().length;
metrics.incContainerBytesStats(Type.PutKey, numBytes);
return KeyUtils.getKeyResponse(msg);
@@ -601,17 +589,15 @@ public class Dispatcher implements ContainerDispatcher {
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
}
- Pipeline pipeline =
- Pipeline.getFromProtoBuf(msg.getDeleteKey().getPipeline());
- Preconditions.checkNotNull(pipeline);
- if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+ BlockID blockID = BlockID.getFromProtobuf(msg.getDeleteKey()
+ .getBlockID());
+ Preconditions.checkNotNull(blockID);
+ long containerID = blockID.getContainerID();
+ if (!this.containerManager.isOpen(containerID)) {
throw new StorageContainerException("Write to closed container.",
CLOSED_CONTAINER_IO);
}
- String keyName = msg.getDeleteKey().getName();
- Preconditions.checkNotNull(keyName);
- Preconditions.checkState(!keyName.isEmpty());
- this.containerManager.getKeyManager().deleteKey(pipeline, keyName);
+ this.containerManager.getKeyManager().deleteKey(blockID);
return KeyUtils.getKeyResponse(msg);
}
@@ -632,12 +618,11 @@ public class Dispatcher implements ContainerDispatcher {
}
try {
- Pipeline pipeline =
- Pipeline.getFromProtoBuf(msg.getPutSmallFile()
- .getKey().getPipeline());
+ BlockID blockID = BlockID.getFromProtobuf(msg.
+ getPutSmallFile().getKey().getKeyData().getBlockID());
+ long containerID = blockID.getContainerID();
- Preconditions.checkNotNull(pipeline);
- if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+ if (!this.containerManager.isOpen(containerID)) {
throw new StorageContainerException("Write to closed container.",
CLOSED_CONTAINER_IO);
}
@@ -648,12 +633,12 @@ public class Dispatcher implements ContainerDispatcher {
byte[] data = msg.getPutSmallFile().getData().toByteArray();
metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
- this.containerManager.getChunkManager().writeChunk(pipeline, keyData
- .getKeyName(), chunkInfo, data, ContainerProtos.Stage.COMBINED);
+ this.containerManager.getChunkManager().writeChunk(blockID,
+ chunkInfo, data, ContainerProtos.Stage.COMBINED);
List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
chunks.add(chunkInfo.getProtoBufMessage());
keyData.setChunks(chunks);
- this.containerManager.getKeyManager().putKey(pipeline, keyData);
+ this.containerManager.getKeyManager().putKey(keyData);
return FileUtils.getPutFileResponse(msg);
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, msg);
@@ -680,12 +665,7 @@ public class Dispatcher implements ContainerDispatcher {
return ContainerUtils.malformedRequest(msg);
}
try {
- Pipeline pipeline =
- Pipeline.getFromProtoBuf(msg.getGetSmallFile()
- .getKey().getPipeline());
-
long bytes = 0;
- Preconditions.checkNotNull(pipeline);
KeyData keyData = KeyData.getFromProtoBuf(msg.getGetSmallFile()
.getKey().getKeyData());
KeyData data = this.containerManager.getKeyManager().getKey(keyData);
@@ -694,9 +674,8 @@ public class Dispatcher implements ContainerDispatcher {
bytes += chunk.getSerializedSize();
ByteString current =
ByteString.copyFrom(this.containerManager.getChunkManager()
- .readChunk(
- pipeline, keyData.getKeyName(), ChunkInfo.getFromProtoBuf(
- chunk)));
+ .readChunk(keyData.getBlockID(),
+ ChunkInfo.getFromProtoBuf(chunk)));
dataBuf = dataBuf.concat(current);
c = chunk;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org