You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/08/23 19:39:14 UTC
hadoop git commit: HDFS-11888. Ozone: SCM: use state machine for open
containers allocated for key/blocks. Contributed by Xiaoyu Yao.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 076bd5d64 -> 8333602a9
HDFS-11888. Ozone: SCM: use state machine for open containers allocated for key/blocks. Contributed by Xiaoyu Yao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8333602a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8333602a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8333602a
Branch: refs/heads/HDFS-7240
Commit: 8333602a9967b3c598adc74544152dd99ea5869b
Parents: 076bd5d
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Wed Aug 23 12:37:09 2017 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Wed Aug 23 12:37:09 2017 -0700
----------------------------------------------------------------------
.../ozone/client/io/ChunkGroupOutputStream.java | 43 ++--
.../org/apache/hadoop/scm/ScmConfigKeys.java | 2 +-
.../scm/client/ContainerOperationClient.java | 19 ++
.../common/helpers/BlockContainerInfo.java | 45 ++++
.../container/common/helpers/ContainerInfo.java | 109 +++++++++
.../StorageContainerLocationProtocol.java | 18 +-
...rLocationProtocolClientSideTranslatorPB.java | 28 ++-
.../src/main/proto/Ozone.proto | 49 +++-
.../StorageContainerLocationProtocol.proto | 26 +-
...rLocationProtocolServerSideTranslatorPB.java | 18 +-
.../ozone/scm/StorageContainerManager.java | 36 ++-
.../ozone/scm/block/BlockManagerImpl.java | 241 ++++++++++++++-----
.../org/apache/hadoop/ozone/scm/cli/SQLCLI.java | 13 +-
.../ozone/scm/container/ContainerMapping.java | 150 ++++++++++--
.../hadoop/ozone/scm/container/Mapping.java | 22 +-
.../ozone/scm/exceptions/SCMException.java | 1 +
.../StorageContainerDatanodeProtocol.proto | 6 +-
.../ozone/client/rpc/TestOzoneRpcClient.java | 27 ++-
.../scm/container/TestContainerMapping.java | 19 +-
.../ozone/scm/node/TestContainerPlacement.java | 2 +-
20 files changed, 708 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 2cc12f4..ca81324 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@@ -37,8 +38,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
/**
* Maintaining a list of ChunkInputStream. Write based on offset.
@@ -60,10 +59,6 @@ public class ChunkGroupOutputStream extends OutputStream {
private long totalSize;
private long byteOffset;
- //This has to be removed once HDFS-11888 is resolved.
- //local cache which will have list of created container names.
- private static Set<String> containersCreated = new HashSet<>();
-
public ChunkGroupOutputStream() {
this.streamEntries = new ArrayList<>();
this.currentStreamIndex = 0;
@@ -293,27 +288,21 @@ public class ChunkGroupOutputStream extends OutputStream {
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(pipeline);
// create container if needed
- // TODO : should be subKeyInfo.getShouldCreateContainer(), but for now
- //The following change has to reverted once HDFS-11888 is fixed.
- if(!containersCreated.contains(containerName)) {
- synchronized (containerName.intern()) {
- //checking again, there is a chance that some other thread has
- // created it.
- if (!containersCreated.contains(containerName)) {
- LOG.debug("Need to create container {}.", containerName);
- try {
- ContainerProtocolCalls.createContainer(xceiverClient, requestId);
- } catch (StorageContainerException ex) {
- if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
- //container already exist.
- LOG.debug("Container {} already exists.", containerName);
- } else {
- LOG.error("Container creation failed for {}.",
- containerName, ex);
- throw ex;
- }
- }
- containersCreated.add(containerName);
+ if (subKeyInfo.getShouldCreateContainer()) {
+ try {
+ // Block manager sets the container creation stage begin.
+ ContainerProtocolCalls.createContainer(xceiverClient, requestId);
+ storageContainerLocationClient.notifyObjectCreationStage(
+ NotifyObjectCreationStageRequestProto.Type.container,
+ containerName,
+ NotifyObjectCreationStageRequestProto.Stage.complete);
+ } catch (StorageContainerException ex) {
+ if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
+ //container already exist, this should never happen
+ LOG.debug("Container {} already exists.", containerName);
+ } else {
+ LOG.error("Container creation failed for {}.", containerName, ex);
+ throw ex;
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index 73d81f8..0b081a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -187,7 +187,7 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE =
"ozone.scm.container.provision_batch_size";
- public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 1;
+ public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 5;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
index a90cff4..3c1a266 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.scm.client;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.XceiverClientManager;
@@ -86,12 +87,20 @@ public class ContainerOperationClient implements ScmClient {
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
+ storageContainerLocationClient.notifyObjectCreationStage(
+ NotifyObjectCreationStageRequestProto.Type.container,
+ containerId,
+ NotifyObjectCreationStageRequestProto.Stage.begin);
ContainerProtocolCalls.createContainer(client, traceID);
if (LOG.isDebugEnabled()) {
LOG.debug("Created container " + containerId
+ " leader:" + pipeline.getLeader()
+ " machines:" + pipeline.getMachines());
}
+ storageContainerLocationClient.notifyObjectCreationStage(
+ NotifyObjectCreationStageRequestProto.Type.container,
+ containerId,
+ NotifyObjectCreationStageRequestProto.Stage.complete);
return pipeline;
} finally {
if (client != null) {
@@ -116,11 +125,21 @@ public class ContainerOperationClient implements ScmClient {
// connect to pipeline leader and allocate container on leader datanode.
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
+ storageContainerLocationClient.notifyObjectCreationStage(
+ NotifyObjectCreationStageRequestProto.Type.container,
+ containerId,
+ NotifyObjectCreationStageRequestProto.Stage.begin);
+
ContainerProtocolCalls.createContainer(client, traceID);
LOG.info("Created container " + containerId +
" leader:" + pipeline.getLeader() +
" machines:" + pipeline.getMachines() +
" replication factor:" + factor);
+
+ storageContainerLocationClient.notifyObjectCreationStage(
+ NotifyObjectCreationStageRequestProto.Type.container,
+ containerId,
+ NotifyObjectCreationStageRequestProto.Stage.complete);
return pipeline;
} finally {
if (client != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java
new file mode 100644
index 0000000..73c8814
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.scm.container.common.helpers;
+
+/**
+ * Class wraps container + allocated info for containers managed by block svc.
+ */
+public class BlockContainerInfo extends ContainerInfo{
+ private long allocated;
+
+ public BlockContainerInfo(ContainerInfo container, long used) {
+ super(container);
+ this.allocated = used;
+ }
+
+ public long addAllocated(long size) {
+ allocated += size;
+ return allocated;
+ }
+
+ public long subtractAllocated(long size) {
+ allocated -= size;
+ return allocated;
+ }
+
+ public long getAllocated() {
+ return this.allocated;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java
new file mode 100644
index 0000000..16bc0db
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.scm.container.common.helpers;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.util.Time;
+
+/**
+ * Class wraps ozone container info.
+ */
+public class ContainerInfo {
+ private OzoneProtos.LifeCycleState state;
+ private Pipeline pipeline;
+ // The wall-clock ms since the epoch at which the current state enters.
+ private long stateEnterTime;
+
+ ContainerInfo(OzoneProtos.LifeCycleState state, Pipeline pipeline,
+ long stateEnterTime) {
+ this.pipeline = pipeline;
+ this.state = state;
+ this.stateEnterTime = stateEnterTime;
+ }
+
+ public ContainerInfo(ContainerInfo container) {
+ this.pipeline = container.getPipeline();
+ this.state = container.getState();
+ this.stateEnterTime = container.getStateEnterTime();
+ }
+
+ /**
+ * Update the current container state and state enter time to now.
+ * @param state
+ */
+ public void setState(OzoneProtos.LifeCycleState state) {
+ this.state = state;
+ this.stateEnterTime = Time.monotonicNow();
+ }
+
+ public OzoneProtos.LifeCycleState getState() {
+ return state;
+ }
+
+ public long getStateEnterTime() {
+ return stateEnterTime;
+ }
+
+ public Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ public OzoneProtos.SCMContainerInfo getProtobuf() {
+ OzoneProtos.SCMContainerInfo.Builder builder =
+ OzoneProtos.SCMContainerInfo.newBuilder();
+ builder.setPipeline(getPipeline().getProtobufMessage());
+ builder.setState(state);
+ builder.setStateEnterTime(stateEnterTime);
+ return builder.build();
+ }
+
+ public static ContainerInfo fromProtobuf(
+ OzoneProtos.SCMContainerInfo info) {
+ ContainerInfo.Builder builder = new ContainerInfo.Builder();
+ builder.setPipeline(Pipeline.getFromProtoBuf(info.getPipeline()));
+ builder.setState(info.getState());
+ builder.setStateEnterTime(info.getStateEnterTime());
+ return builder.build();
+ }
+
+ public static class Builder {
+ private OzoneProtos.LifeCycleState state;
+ private Pipeline pipeline;
+ private long stateEnterTime;
+
+ public Builder setState(OzoneProtos.LifeCycleState state) {
+ this.state = state;
+ return this;
+ }
+
+ public Builder setPipeline(Pipeline pipeline) {
+ this.pipeline = pipeline;
+ return this;
+ }
+
+ public Builder setStateEnterTime(long stateEnterTime) {
+ this.stateEnterTime = stateEnterTime;
+ return this;
+ }
+
+ public ContainerInfo build() {
+ return new ContainerInfo(state, pipeline, stateEnterTime);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
index ea0893e..94134d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
@@ -35,8 +36,8 @@ public interface StorageContainerLocationProtocol {
*
*/
Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType,
- OzoneProtos.ReplicationFactor factor,
- String containerName) throws IOException;
+ OzoneProtos.ReplicationFactor factor, String containerName)
+ throws IOException;
/**
* Ask SCM the location of the container. SCM responds with a group of
@@ -86,6 +87,18 @@ public interface StorageContainerLocationProtocol {
OzoneProtos.QueryScope queryScope, String poolName) throws IOException;
/**
+ * Notify from client when begin or finish creating objects like pipeline
+ * or containers on datanodes.
+ * Container will be in Operational state after that.
+ * @param type object type
+ * @param name object name
+ * @param stage creation stage
+ */
+ void notifyObjectCreationStage(
+ NotifyObjectCreationStageRequestProto.Type type, String name,
+ NotifyObjectCreationStageRequestProto.Stage stage) throws IOException;
+
+ /**
* Creates a replication pipeline of a specified type.
* @param type - replication type
* @param factor - factor 1 or 3
@@ -95,5 +108,4 @@ public interface StorageContainerLocationProtocol {
Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool)
throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 93cd0cf..8dc1c6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -35,9 +35,9 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolPr
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
-
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable;
@@ -208,6 +208,32 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
}
/**
+ * Notify from client that creates object on datanodes.
+ * @param type object type
+ * @param name object name
+ * @param stage object creation stage : begin/complete
+ */
+ @Override
+ public void notifyObjectCreationStage(
+ NotifyObjectCreationStageRequestProto.Type type,
+ String name,
+ NotifyObjectCreationStageRequestProto.Stage stage) throws IOException {
+ Preconditions.checkState(!Strings.isNullOrEmpty(name),
+ "Object name cannot be null or empty");
+ NotifyObjectCreationStageRequestProto request =
+ NotifyObjectCreationStageRequestProto.newBuilder()
+ .setType(type)
+ .setName(name)
+ .setStage(stage)
+ .build();
+ try {
+ rpcProxy.notifyObjectCreationStage(NULL_RPC_CONTROLLER, request);
+ } catch(ServiceException e){
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ /**
* Creates a replication pipeline of a specified type.
*
* @param replicationType - replication type
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
index 50926c2..36c3736 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
@@ -35,7 +35,7 @@ message Pipeline {
required string leaderID = 1;
repeated DatanodeIDProto members = 2;
required string containerName = 3;
- optional LifeCycleStates state = 4 [default = OPERATIONAL];
+ optional LifeCycleState state = 4 [default = OPEN];
}
message KeyValue {
@@ -72,6 +72,45 @@ message NodePool {
repeated Node nodes = 1;
}
+/**
+ * LifeCycleState for SCM object creation state machine:
+ * ->Allocated: allocated on SCM but clean has not started creating it yet.
+ * ->Creating: allocated and assigned to client to create but not ack-ed yet.
+ * ->Open: allocated on SCM and created on datanodes and ack-ed by a client.
+ * ->Close: container closed due to space all used or error?
+ * ->Timeout -> container failed to create on datanodes or ack-ed by client.
+ * ->Deleting(TBD) -> container will be deleted after timeout
+ * 1. ALLOCATE-ed containers on SCM can't serve key/block related operation
+ * until ACK-ed explicitly which changes the state to OPEN.
+ * 2. Only OPEN/CLOSED containers can serve key/block related operation.
+ * 3. ALLOCATE-ed containers that are not ACK-ed timely will be TIMEOUT and
+ * CLEANUP asynchronously.
+ */
+
+enum LifeCycleState {
+ ALLOCATED = 1;
+ CREATING = 2; // Used for container allocated/created by different client.
+ OPEN =3; // Mostly an update to SCM via HB or client call.
+ CLOSED = 4; // !!State after this has not been used yet.
+ DELETING = 5;
+ DELETED = 6; // object is deleted.
+}
+
+enum LifeCycleEvent {
+ BEGIN_CREATE = 1; // A request to client to create this object
+ COMPLETE_CREATE = 2;
+ CLOSE = 3; // !!Event after this has not been used yet.
+ UPDATE = 4;
+ TIMEOUT = 5; // creation has timed out from SCM's View.
+ DELETE = 6;
+ CLEANUP = 7;
+}
+
+message SCMContainerInfo {
+ required LifeCycleState state = 1;
+ required Pipeline pipeline = 2;
+ optional int64 stateEnterTime = 3;
+}
enum ReplicationType {
RATIS = 1;
@@ -79,15 +118,7 @@ enum ReplicationType {
CHAINED = 3;
}
-
enum ReplicationFactor {
ONE = 1;
THREE = 3;
-}
-
-enum LifeCycleStates {
- CLIENT_CREATE = 1; // A request to client to create this object
- OPERATIONAL = 2; // Mostly an update to SCM via HB or client call.
- TIMED_OUT = 3; // creation has timed out from SCM's View.
- DELETED = 4; // object is deleted.
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
index 30c7166..550f6a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
@@ -82,6 +82,24 @@ message DeleteContainerResponseProto {
// Empty response
}
+message NotifyObjectCreationStageRequestProto {
+ enum Type {
+ container = 1;
+ pipeline = 2;
+ }
+ enum Stage {
+ begin = 1;
+ complete = 2;
+ }
+ required string name = 1;
+ required Type type = 2;
+ required Stage stage = 3;
+}
+
+message NotifyObjectCreationStageResponseProto {
+ // Empty response
+}
+
/*
NodeQueryRequest sends a request to SCM asking to send a list of nodes that
match the NodeState that we are requesting.
@@ -160,7 +178,12 @@ service StorageContainerLocationProtocolService {
*/
rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto);
- /*
+ /**
+ * Notify from client when begin or finish creating container or pipeline on datanodes.
+ */
+ rpc notifyObjectCreationStage(NotifyObjectCreationStageRequestProto) returns (NotifyObjectCreationStageResponseProto);
+
+ /*
* Apis that Manage Pipelines.
*
* Pipelines are abstractions offered by SCM and Datanode that allows users
@@ -175,5 +198,4 @@ service StorageContainerLocationProtocolService {
*/
rpc allocatePipeline(PipelineRequestProto)
returns (PipelineResponseProto);
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 628de42..f12aafb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
-import static org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
@@ -39,9 +39,10 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolPr
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.DeleteContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
-
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
@@ -160,6 +161,19 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
}
@Override
+ public NotifyObjectCreationStageResponseProto notifyObjectCreationStage(
+ RpcController controller, NotifyObjectCreationStageRequestProto request)
+ throws ServiceException {
+ try {
+ impl.notifyObjectCreationStage(request.getType(), request.getName(),
+ request.getStage());
+ return NotifyObjectCreationStageResponseProto.newBuilder().build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
public PipelineResponseProto allocatePipeline(
RpcController controller, PipelineRequestProto request)
throws ServiceException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index e320983..fa14ad0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
@@ -65,6 +66,7 @@ import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
@@ -373,7 +375,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
@Override
public Pipeline getContainer(String containerName) throws IOException {
checkAdminAccess();
- return scmContainerManager.getContainer(containerName);
+ return scmContainerManager.getContainer(containerName).getPipeline();
}
/**
@@ -425,6 +427,34 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
}
/**
+ * Notify from client when begin/finish creating container/pipeline objects
+ * on datanodes.
+ * @param type
+ * @param name
+ * @param stage
+ */
+ @Override
+ public void notifyObjectCreationStage(
+ NotifyObjectCreationStageRequestProto.Type type, String name,
+ NotifyObjectCreationStageRequestProto.Stage stage) throws IOException {
+
+ if (type == NotifyObjectCreationStageRequestProto.Type.container) {
+ ContainerInfo info = scmContainerManager.getContainer(name);
+ LOG.info("Container {} current state {} new stage {}", name,
+ info.getState(), stage);
+ if (stage == NotifyObjectCreationStageRequestProto.Stage.begin) {
+ scmContainerManager.updateContainerState(name,
+ OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
+ } else {
+ scmContainerManager.updateContainerState(name,
+ OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
+ }
+ } else if (type == NotifyObjectCreationStageRequestProto.Type.pipeline) {
+ // TODO: pipeline state update will be addressed in future patch.
+ }
+ }
+
+ /**
* Creates a replication pipeline of a specified type.
*/
@Override
@@ -503,7 +533,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
*
* @param containerName - Name of the container.
* @param replicationFactor - replication factor.
- * @return Pipeline.
+ * @return pipeline
* @throws IOException
*/
@Override
@@ -512,7 +542,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
throws IOException {
checkAdminAccess();
return scmContainerManager.allocateContainer(replicationType,
- replicationFactor, containerName);
+ replicationFactor, containerName).getPipeline();
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index e000ccc..5730589 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.scm.block;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.metrics2.util.MBeans;
@@ -29,6 +30,8 @@ import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.utils.BatchOperation;
@@ -83,8 +86,11 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
private final long containerSize;
private final long cacheSize;
- private final MetadataStore openContainerStore;
- private Map<String, Long> openContainers;
+ // Track all containers owned by block service.
+ private final MetadataStore containerStore;
+
+ private Map<OzoneProtos.LifeCycleState,
+ Map<String, BlockContainerInfo>> containers;
private final int containerProvisionBatchSize;
private final Random rand;
private final ObjectName mxBean;
@@ -121,14 +127,13 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// Load store of all open contains for block allocation
File openContainsDbPath = new File(scmMetaDataDir, OPEN_CONTAINERS_DB);
- openContainerStore = MetadataStoreBuilder.newBuilder()
+ containerStore = MetadataStoreBuilder.newBuilder()
.setConf(conf)
.setDbFile(openContainsDbPath)
.setCacheSize(this.cacheSize * OzoneConsts.MB)
.build();
- openContainers = new ConcurrentHashMap<>();
- loadOpenContainers();
+ loadAllocatedContainers();
this.containerProvisionBatchSize = conf.getInt(
ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
@@ -141,20 +146,39 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// TODO: close full (or almost full) containers with a separate thread.
/**
- * Load open containers from persistent store.
+ * Load allocated containers from persistent store.
* @throws IOException
*/
- private void loadOpenContainers() throws IOException {
+ private void loadAllocatedContainers() throws IOException {
+ // Pre-allocate empty map entry by state to avoid null check
+ containers = new ConcurrentHashMap<>();
+ for (OzoneProtos.LifeCycleState state :
+ OzoneProtos.LifeCycleState.values()) {
+ containers.put(state, new ConcurrentHashMap());
+ }
try {
- openContainerStore.iterate(null, (key, value) -> {
+ containerStore.iterate(null, (key, value) -> {
try {
String containerName = DFSUtil.bytes2String(key);
Long containerUsed = Long.parseLong(DFSUtil.bytes2String(value));
- openContainers.put(containerName, containerUsed);
- LOG.debug("Loading open container: {} used : {}", containerName,
- containerUsed);
+ ContainerInfo containerInfo =
+ containerManager.getContainer(containerName);
+ // TODO: remove the container from block manager's container DB
+ // Most likely the allocated container is timeout and cleaned up
+ // by SCM, we should clean up correspondingly instead of just skip it.
+ if (containerInfo == null) {
+ LOG.warn("Container {} allocated by block service" +
+ "can't be found in SCM", containerName);
+ return true;
+ }
+ Map<String, BlockContainerInfo> containersByState =
+ containers.get(containerInfo.getState());
+ containersByState.put(containerName,
+ new BlockContainerInfo(containerInfo, containerUsed));
+ LOG.debug("Loading allocated container: {} used : {} state: {}",
+ containerName, containerUsed, containerInfo.getState());
} catch (Exception e) {
- LOG.warn("Failed loading open container, continue next...");
+ LOG.warn("Failed loading allocated container, continue next...");
}
return true;
});
@@ -166,25 +190,26 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
}
/**
- * Pre-provision specified count of containers for block creation.
- * @param count - number of containers to create.
- * @return list of container names created.
+ * Pre allocate specified count of containers for block creation.
+ * @param count - number of containers to allocate.
+ * @return list of container names allocated.
* @throws IOException
*/
- private List<String> provisionContainers(int count) throws IOException {
+ private List<String> allocateContainers(int count) throws IOException {
List<String> results = new ArrayList();
lock.lock();
try {
for (int i = 0; i < count; i++) {
String containerName = UUID.randomUUID().toString();
+ ContainerInfo containerInfo = null;
try {
// TODO: Fix this later when Ratis is made the Default.
- Pipeline pipeline = containerManager.allocateContainer(
+ containerInfo = containerManager.allocateContainer(
OzoneProtos.ReplicationType.STAND_ALONE,
OzoneProtos.ReplicationFactor.ONE,
containerName);
- if (pipeline == null) {
+ if (containerInfo == null) {
LOG.warn("Unable to allocate container.");
continue;
}
@@ -192,8 +217,12 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
LOG.warn("Unable to allocate container: " + ex);
continue;
}
- openContainers.put(containerName, 0L);
- openContainerStore.put(DFSUtil.string2Bytes(containerName),
+ Map<String, BlockContainerInfo> containersByState =
+ containers.get(OzoneProtos.LifeCycleState.ALLOCATED);
+ Preconditions.checkNotNull(containersByState);
+ containersByState.put(containerName,
+ new BlockContainerInfo(containerInfo, 0));
+ containerStore.put(DFSUtil.string2Bytes(containerName),
DFSUtil.string2Bytes(Long.toString(0L)));
results.add(containerName);
}
@@ -204,6 +233,76 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
}
/**
+ * Filter container by states and size.
+ * @param state the state of the container.
+ * @param size the minimal available size of the container
+ * @return allocated containers satisfy both state and size.
+ */
+ private List <String> filterContainers(OzoneProtos.LifeCycleState state,
+ long size) {
+ Map<String, BlockContainerInfo> containersByState =
+ this.containers.get(state);
+ return containersByState.entrySet().parallelStream()
+ .filter(e -> ((e.getValue().getAllocated() + size < containerSize)))
+ .map(e -> e.getKey())
+ .collect(Collectors.toList());
+ }
+
+ private BlockContainerInfo getContainer(OzoneProtos.LifeCycleState state,
+ String name) {
+ Map<String, BlockContainerInfo> containersByState = this.containers.get(state);
+ return containersByState.get(name);
+ }
+
+ // Relies on the caller such as allocateBlock() to hold the lock
+ // to ensure containers map consistent.
+ private void updateContainer(OzoneProtos.LifeCycleState oldState, String name,
+ OzoneProtos.LifeCycleState newState) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Update container {} from state {} to state {}",
+ name, oldState, newState);
+ }
+ Map<String, BlockContainerInfo> containersInOldState =
+ this.containers.get(oldState);
+ BlockContainerInfo containerInfo = containersInOldState.get(name);
+ Preconditions.checkNotNull(containerInfo);
+ containersInOldState.remove(name);
+ Map<String, BlockContainerInfo> containersInNewState =
+ this.containers.get(newState);
+ containersInNewState.put(name, containerInfo);
+ }
+
+ // Refresh containers that have been allocated.
+ // We may not need to track all the states, just the creating/open/close
+ // should be enough for now.
+ private void refreshContainers() {
+ Map<String, BlockContainerInfo> containersByState =
+ this.containers.get(OzoneProtos.LifeCycleState.ALLOCATED);
+ for (String containerName: containersByState.keySet()) {
+ try {
+ ContainerInfo containerInfo =
+ containerManager.getContainer(containerName);
+ if (containerInfo == null) {
+ // TODO: clean up containers that has been deleted on SCM but
+ // TODO: still in ALLOCATED state in block manager.
+ LOG.debug("Container {} allocated by block service" +
+ "can't be found in SCM", containerName);
+ continue;
+ }
+ if (containerInfo.getState() == OzoneProtos.LifeCycleState.OPEN) {
+ updateContainer(OzoneProtos.LifeCycleState.ALLOCATED, containerName,
+ containerInfo.getState());
+ }
+ // TODO: check containers in other state and refresh as needed.
+ // TODO: ALLOCATED container that is timeout and DELETED. (unit test)
+ // TODO: OPEN container that is CLOSE.
+ } catch (IOException ex) {
+ LOG.debug("Failed to get container info for: {}", containerName);
+ }
+ }
+ }
+
+ /**
* Allocates a new block for a given size.
*
* SCM choose one of the open containers and returns that as the location for
@@ -215,8 +314,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
*/
@Override
public AllocatedBlock allocateBlock(final long size) throws IOException {
- boolean createContainer;
- Pipeline pipeline;
+ boolean createContainer = false;
if (size < 0 || size > containerSize) {
throw new SCMException("Unsupported block size",
INVALID_BLOCK_SIZE);
@@ -228,37 +326,29 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
lock.lock();
try {
+ refreshContainers();
List<String> candidates;
- if (openContainers.size() == 0) {
- try {
- candidates = provisionContainers(containerProvisionBatchSize);
- } catch (IOException ex) {
- throw new SCMException("Unable to allocate container for the block",
- FAILED_TO_ALLOCATE_CONTAINER);
- }
- createContainer = true;
- } else {
- candidates = openContainers.entrySet().parallelStream()
- .filter(e -> (e.getValue() + size < containerSize))
- .map(e -> e.getKey())
- .collect(Collectors.toList());
- createContainer = false;
- }
-
+ candidates = filterContainers(OzoneProtos.LifeCycleState.OPEN, size);
if (candidates.size() == 0) {
- try {
- candidates = provisionContainers(containerProvisionBatchSize);
- } catch (IOException ex) {
- throw new SCMException("Unable to allocate container for the block",
- FAILED_TO_ALLOCATE_CONTAINER);
+ candidates = filterContainers(OzoneProtos.LifeCycleState.ALLOCATED,
+ size);
+ if (candidates.size() == 0) {
+ try {
+ candidates = allocateContainers(containerProvisionBatchSize);
+ } catch (IOException ex) {
+ LOG.error("Unable to allocate container for the block.");
+ throw new SCMException("Unable to allocate container for the block",
+ FAILED_TO_ALLOCATE_CONTAINER);
+ }
+ }
+ // now we should have some candidates in ALLOCATE state
+ if (candidates.size() == 0) {
+ throw new SCMException("Fail to find any container to allocate block " +
+ "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
}
}
- if (candidates.size() == 0) {
- throw new SCMException("Fail to find any container to allocate block " +
- "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
- }
-
+ // Candidates list now should include only ALLOCATE or OPEN containers
int randomIdx = rand.nextInt(candidates.size());
String containerName = candidates.get(randomIdx);
if (LOG.isDebugEnabled()) {
@@ -266,28 +356,46 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
candidates.toString(), containerName);
}
- pipeline = containerManager.getContainer(containerName);
- if (pipeline == null) {
+ ContainerInfo containerInfo =
+ containerManager.getContainer(containerName);
+ if (containerInfo == null) {
LOG.debug("Unable to find container for the block");
throw new SCMException("Unable to find container to allocate block",
FAILED_TO_FIND_CONTAINER);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Candidate {} state {}", containerName,
+ containerInfo.getState());
+ }
+ // Container must be either OPEN or ALLOCATE state
+ if (containerInfo.getState() == OzoneProtos.LifeCycleState.ALLOCATED) {
+ createContainer = true;
+ }
+
// TODO: make block key easier to debug (e.g., seq no)
// Allocate key for the block
String blockKey = UUID.randomUUID().toString();
AllocatedBlock.Builder abb = new AllocatedBlock.Builder()
- .setKey(blockKey).setPipeline(pipeline)
+ .setKey(blockKey).setPipeline(containerInfo.getPipeline())
.setShouldCreateContainer(createContainer);
- if (pipeline.getMachines().size() > 0) {
+ if (containerInfo.getPipeline().getMachines().size() > 0) {
blockStore.put(DFSUtil.string2Bytes(blockKey),
DFSUtil.string2Bytes(containerName));
// update the container usage information
- Long newUsed = openContainers.get(containerName) + size;
- openContainers.put(containerName, newUsed);
- openContainerStore.put(DFSUtil.string2Bytes(containerName),
- DFSUtil.string2Bytes(Long.toString(newUsed)));
+ BlockContainerInfo containerInfoUpdate =
+ getContainer(containerInfo.getState(), containerName);
+ Preconditions.checkNotNull(containerInfoUpdate);
+ containerInfoUpdate.addAllocated(size);
+ containerStore.put(DFSUtil.string2Bytes(containerName),
+ DFSUtil.string2Bytes(Long.toString(containerInfoUpdate.getAllocated())));
+ if (createContainer) {
+ OzoneProtos.LifeCycleState newState =
+ containerManager.updateContainerState(containerName,
+ OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
+ updateContainer(containerInfo.getState(), containerName, newState);
+ }
return abb.build();
}
} finally {
@@ -312,8 +420,16 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
throw new SCMException("Specified block key does not exist. key : " +
key, FAILED_TO_FIND_BLOCK);
}
- return containerManager.getContainer(
- DFSUtil.bytes2String(containerBytes));
+ String containerName = DFSUtil.bytes2String(containerBytes);
+ ContainerInfo containerInfo = containerManager.getContainer(
+ containerName);
+ if (containerInfo == null) {
+ LOG.debug("Container {} allocated by block service" +
+ "can't be found in SCM", containerName);
+ throw new SCMException("Unable to find container for the block",
+ SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+ }
+ return containerInfo.getPipeline();
} finally {
lock.unlock();
}
@@ -338,8 +454,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
throw new SCMException("Specified block key does not exist. key : " +
key, FAILED_TO_FIND_BLOCK);
}
+ // TODO: track the block size info so that we can reclaim the container
+ // TODO: used space when the block is deleted.
BatchOperation batch = new BatchOperation();
- containerManager.getContainer(DFSUtil.bytes2String(containerBytes));
String deletedKeyName = getDeletedKeyName(key);
// Add a tombstone for the deleted key
batch.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
@@ -370,8 +487,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
if (blockStore != null) {
blockStore.close();
}
- if (openContainerStore != null) {
- openContainerStore.close();
+ if (containerStore != null) {
+ containerStore.close();
}
MBeans.unregister(mxBean);
@@ -379,6 +496,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
@Override
public int getOpenContainersNo() {
- return openContainers.size();
+ return containers.get(OzoneProtos.LifeCycleState.OPEN).size();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
index 9e4053e..d674b64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.scm.cli;
+import com.google.common.base.Preconditions;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@@ -33,8 +34,10 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.Buck
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Pipeline;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.utils.MetadataStore;
@@ -482,10 +485,14 @@ public class SQLCLI extends Configured implements Tool {
HashSet<String> uuidChecked = new HashSet<>();
dbStore.iterate(null, (key, value) -> {
String containerName = new String(key, encoding);
- Pipeline pipeline = null;
- pipeline = Pipeline.parseFrom(value);
+ ContainerInfo containerInfo = null;
+ containerInfo = ContainerInfo.fromProtobuf(
+ OzoneProtos.SCMContainerInfo.PARSER.parseFrom(value));
+ Preconditions.checkNotNull(containerInfo);
try {
- insertContainerDB(conn, containerName, pipeline, uuidChecked);
+ //TODO: include container state to sqllite schema
+ insertContainerDB(conn, containerName,
+ containerInfo.getPipeline().getProtobufMessage(), uuidChecked);
return true;
} catch (SQLException e) {
throw new IOException(e);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
index 8daa5d4..3cf78d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
@@ -20,13 +20,17 @@ package org.apache.hadoop.ozone.scm.container;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.apache.hadoop.utils.MetadataStore;
@@ -38,8 +42,10 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -60,6 +66,9 @@ public class ContainerMapping implements Mapping {
private final MetadataStore containerStore;
private final PipelineSelector pipelineSelector;
+ private final StateMachine<OzoneProtos.LifeCycleState,
+ OzoneProtos.LifeCycleEvent> stateMachine;
+
/**
* Constructs a mapping class that creates mapping between container names and
* pipelines.
@@ -88,31 +97,80 @@ public class ContainerMapping implements Mapping {
.build();
this.lock = new ReentrantLock();
+
this.pipelineSelector = new PipelineSelector(nodeManager, conf);
+
+ // Initialize the container state machine.
+ Set<OzoneProtos.LifeCycleState> finalStates = new HashSet();
+ finalStates.add(OzoneProtos.LifeCycleState.OPEN);
+ finalStates.add(OzoneProtos.LifeCycleState.CLOSED);
+ finalStates.add(OzoneProtos.LifeCycleState.DELETED);
+
+ this.stateMachine = new StateMachine<>(
+ OzoneProtos.LifeCycleState.ALLOCATED, finalStates);
+ initializeStateMachine();
}
+ // Client-driven Create State Machine
+ // States: <ALLOCATED>------------->CREATING----------------->[OPEN]
+ // Events: (BEGIN_CREATE) | (COMPLETE_CREATE)
+ // |
+ // |(TIMEOUT)
+ // V
+ // DELETING----------------->[DELETED]
+ // (CLEANUP)
+
+ // SCM Open/Close State Machine
+ // States: OPEN------------------>[CLOSED]
+ // Events: (CLOSE)
+
+ // Delete State Machine
+ // States: OPEN------------------>DELETING------------------>[DELETED]
+ // Events: (DELETE) (CLEANUP)
+ private void initializeStateMachine() {
+ stateMachine.addTransition(OzoneProtos.LifeCycleState.ALLOCATED,
+ OzoneProtos.LifeCycleState.CREATING,
+ OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
+
+ stateMachine.addTransition(OzoneProtos.LifeCycleState.CREATING,
+ OzoneProtos.LifeCycleState.OPEN,
+ OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
+
+ stateMachine.addTransition(OzoneProtos.LifeCycleState.OPEN,
+ OzoneProtos.LifeCycleState.CLOSED,
+ OzoneProtos.LifeCycleEvent.CLOSE);
+ stateMachine.addTransition(OzoneProtos.LifeCycleState.OPEN,
+ OzoneProtos.LifeCycleState.DELETING,
+ OzoneProtos.LifeCycleEvent.DELETE);
+
+ stateMachine.addTransition(OzoneProtos.LifeCycleState.DELETING,
+ OzoneProtos.LifeCycleState.DELETED,
+ OzoneProtos.LifeCycleEvent.CLEANUP);
+
+ // Creating timeout -> Deleting
+ stateMachine.addTransition(OzoneProtos.LifeCycleState.CREATING,
+ OzoneProtos.LifeCycleState.DELETING,
+ OzoneProtos.LifeCycleEvent.TIMEOUT);
+ }
/**
- * Returns the Pipeline from the container name.
- *
- * @param containerName - Name
- * @return - Pipeline that makes up this container.
+ * {@inheritDoc}
*/
@Override
- public Pipeline getContainer(final String containerName) throws IOException {
- Pipeline pipeline;
+ public ContainerInfo getContainer(final String containerName) throws IOException {
+ ContainerInfo containerInfo;
lock.lock();
try {
- byte[] pipelineBytes =
+ byte[] containerBytes =
containerStore.get(containerName.getBytes(encoding));
- if (pipelineBytes == null) {
+ if (containerBytes == null) {
throw new SCMException("Specified key does not exist. key : " +
containerName, SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
- pipeline = Pipeline.getFromProtoBuf(
- OzoneProtos.Pipeline.PARSER.parseFrom(pipelineBytes));
- return pipeline;
+ containerInfo = ContainerInfo.fromProtobuf(
+ OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes));
+ return containerInfo;
} finally {
lock.unlock();
}
@@ -138,10 +196,13 @@ public class ContainerMapping implements Mapping {
containerStore.getRangeKVs(startKey, count, prefixFilter);
// Transform the values into the pipelines.
+ // TODO: return list of ContainerInfo instead of pipelines.
+ // TODO: filter by container state
for (Map.Entry<byte[], byte[]> entry : range) {
- Pipeline pipeline = Pipeline.getFromProtoBuf(
- OzoneProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
- pipelineList.add(pipeline);
+ ContainerInfo containerInfo = ContainerInfo.fromProtobuf(
+ OzoneProtos.SCMContainerInfo.PARSER.parseFrom(entry.getValue()));
+ Preconditions.checkNotNull(containerInfo);
+ pipelineList.add(containerInfo.getPipeline());
}
} finally {
lock.unlock();
@@ -158,12 +219,12 @@ public class ContainerMapping implements Mapping {
* @throws IOException - Exception
*/
@Override
- public Pipeline allocateContainer(OzoneProtos.ReplicationType type,
+ public ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor replicationFactor,
final String containerName) throws IOException {
Preconditions.checkNotNull(containerName);
Preconditions.checkState(!containerName.isEmpty());
- Pipeline pipeline = null;
+ ContainerInfo containerInfo = null;
if (!nodeManager.isOutOfNodeChillMode()) {
throw new SCMException("Unable to create container while in chill mode",
SCMException.ResultCodes.CHILL_MODE_EXCEPTION);
@@ -171,20 +232,25 @@ public class ContainerMapping implements Mapping {
lock.lock();
try {
- byte[] pipelineBytes =
+ byte[] containerBytes =
containerStore.get(containerName.getBytes(encoding));
- if (pipelineBytes != null) {
+ if (containerBytes != null) {
throw new SCMException("Specified container already exists. key : " +
containerName, SCMException.ResultCodes.CONTAINER_EXISTS);
}
- pipeline = pipelineSelector.getReplicationPipeline(type,
+ Pipeline pipeline = pipelineSelector.getReplicationPipeline(type,
replicationFactor, containerName);
+ containerInfo = new ContainerInfo.Builder()
+ .setState(OzoneProtos.LifeCycleState.ALLOCATED)
+ .setPipeline(pipeline)
+ .setStateEnterTime(Time.monotonicNow())
+ .build();
containerStore.put(containerName.getBytes(encoding),
- pipeline.getProtobufMessage().toByteArray());
+ containerInfo.getProtobuf().toByteArray());
} finally {
lock.unlock();
}
- return pipeline;
+ return containerInfo;
}
/**
@@ -199,9 +265,9 @@ public class ContainerMapping implements Mapping {
lock.lock();
try {
byte[] dbKey = containerName.getBytes(encoding);
- byte[] pipelineBytes =
+ byte[] containerBytes =
containerStore.get(dbKey);
- if (pipelineBytes == null) {
+ if(containerBytes == null) {
throw new SCMException("Failed to delete container "
+ containerName + ", reason : container doesn't exist.",
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
@@ -213,6 +279,44 @@ public class ContainerMapping implements Mapping {
}
/**
+ * {@inheritDoc}
+ * Used by client to update container state on SCM.
+ */
+ @Override
+ public OzoneProtos.LifeCycleState updateContainerState(String containerName,
+ OzoneProtos.LifeCycleEvent event) throws IOException {
+ ContainerInfo containerInfo;
+ lock.lock();
+ try {
+ byte[] dbKey = containerName.getBytes(encoding);
+ byte[] containerBytes =
+ containerStore.get(dbKey);
+ if(containerBytes == null) {
+ throw new SCMException("Failed to update container state"
+ + containerName + ", reason : container doesn't exist.",
+ SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+ }
+ containerInfo = ContainerInfo.fromProtobuf(
+ OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes));
+
+ OzoneProtos.LifeCycleState newState;
+ try {
+ newState = stateMachine.getNextState(containerInfo.getState(), event);
+ } catch (InvalidStateTransitionException ex) {
+ throw new SCMException("Failed to update container state"
+ + containerName + ", reason : invalid state transition from state: "
+ + containerInfo.getState() + " upon event: " + event + ".",
+ SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE);
+ }
+ containerInfo.setState(newState);
+ containerStore.put(dbKey, containerInfo.getProtobuf().toByteArray());
+ return newState;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
* Closes this stream and releases any system resources associated with it. If
* the stream is already closed then invoking this method has no effect.
* <p>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
index 1ef3572..7cf3f96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.scm.container;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable;
@@ -30,13 +31,13 @@ import java.util.List;
*/
public interface Mapping extends Closeable {
/**
- * Returns the Pipeline from the container name.
+ * Returns the ContainerInfo from the container name.
*
* @param containerName - Name
- * @return - Pipeline that makes up this container.
+ * @return - ContainerInfo such as creation state and the pipeline.
* @throws IOException
*/
- Pipeline getContainer(String containerName) throws IOException;
+ ContainerInfo getContainer(String containerName) throws IOException;
/**
* Returns pipelines under certain conditions.
@@ -57,16 +58,15 @@ public interface Mapping extends Closeable {
List<Pipeline> listContainer(String startName, String prefixName, int count)
throws IOException;
-
/**
* Allocates a new container for a given keyName and replication factor.
*
* @param containerName - Name.
* @param replicationFactor - replication factor of the container.
- * @return - Pipeline that makes up this container.
+ * @return - Container Info.
* @throws IOException
*/
- Pipeline allocateContainer(OzoneProtos.ReplicationType type,
+ ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor replicationFactor,
String containerName) throws IOException;
@@ -77,4 +77,14 @@ public interface Mapping extends Closeable {
* @throws IOException
*/
void deleteContainer(String containerName) throws IOException;
+
+ /**
+ * Update container state.
+ * @param containerName - Container Name
+ * @param event - container life cycle event
+ * @return - new container state
+ * @throws IOException
+ */
+ OzoneProtos.LifeCycleState updateContainerState(String containerName,
+ OzoneProtos.LifeCycleEvent event) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
index f60bdc6..4e8470d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
@@ -106,6 +106,7 @@ public class SCMException extends IOException {
CHILL_MODE_EXCEPTION,
FAILED_TO_LOAD_OPEN_CONTAINER,
FAILED_TO_ALLOCATE_CONTAINER,
+ FAILED_TO_CHANGE_CONTAINER_STATE,
CONTAINER_EXISTS,
FAILED_TO_FIND_CONTAINER,
FAILED_TO_FIND_CONTAINER_WITH_SAPCE,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index aa52979..8400ee0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -51,7 +51,7 @@ message SCMHeartbeatRequestProto {
optional ReportState containerReportState = 3;
}
-enum ContainerState {
+enum DatanodeContainerState {
closed = 0;
open = 1;
}
@@ -76,7 +76,7 @@ SCM database, This information allows SCM to startup faster and avoid having
all container info in memory all the time.
*/
message ContainerPersistanceProto {
- required ContainerState state = 1;
+ required DatanodeContainerState state = 1;
required hadoop.hdfs.ozone.Pipeline pipeline = 2;
required ContainerInfo info = 3;
}
@@ -89,8 +89,6 @@ message NodeContianerMapping {
repeated string contianerName = 1;
}
-
-
/**
A container report contains the following information.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index 837d1b7..501475b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -319,20 +319,25 @@ public class TestOzoneRpcClient {
throws IOException, OzoneException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
- String keyName = UUID.randomUUID().toString();
+
String value = "sample value";
ozClient.createVolume(volumeName);
ozClient.createBucket(volumeName, bucketName);
- OzoneOutputStream out = ozClient.createKey(volumeName, bucketName,
- keyName, value.getBytes().length);
- out.write(value.getBytes());
- out.close();
- OzoneKey key = ozClient.getKeyDetails(volumeName, bucketName, keyName);
- Assert.assertEquals(keyName, key.getKeyName());
- OzoneInputStream is = ozClient.getKey(volumeName, bucketName, keyName);
- byte[] fileContent = new byte[value.getBytes().length];
- is.read(fileContent);
- Assert.assertEquals(value, new String(fileContent));
+
+ for (int i = 0; i < 10; i++) {
+ String keyName = UUID.randomUUID().toString();
+
+ OzoneOutputStream out = ozClient.createKey(volumeName, bucketName,
+ keyName, value.getBytes().length);
+ out.write(value.getBytes());
+ out.close();
+ OzoneKey key = ozClient.getKeyDetails(volumeName, bucketName, keyName);
+ Assert.assertEquals(keyName, key.getKeyName());
+ OzoneInputStream is = ozClient.getKey(volumeName, bucketName, keyName);
+ byte[] fileContent = new byte[value.getBytes().length];
+ is.read(fileContent);
+ Assert.assertEquals(value, new String(fileContent));
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
index cc7c9ff..79e6af6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
@@ -80,11 +81,11 @@ public class TestContainerMapping {
@Test
public void testallocateContainer() throws Exception {
- Pipeline pipeline = mapping.allocateContainer(
+ ContainerInfo containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
UUID.randomUUID().toString());
- Assert.assertNotNull(pipeline);
+ Assert.assertNotNull(containerInfo);
}
@Test
@@ -97,13 +98,15 @@ public class TestContainerMapping {
*/
Set<String> pipelineList = new TreeSet<>();
for (int x = 0; x < 30; x++) {
- Pipeline pipeline = mapping.allocateContainer(
+ ContainerInfo containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
UUID.randomUUID().toString());
- Assert.assertNotNull(pipeline);
- pipelineList.add(pipeline.getLeader().getDatanodeUuid());
+ Assert.assertNotNull(containerInfo);
+ Assert.assertNotNull(containerInfo.getPipeline());
+ pipelineList.add(containerInfo.getPipeline().getLeader()
+ .getDatanodeUuid());
}
Assert.assertTrue(pipelineList.size() > 5);
}
@@ -113,9 +116,9 @@ public class TestContainerMapping {
String containerName = UUID.randomUUID().toString();
Pipeline pipeline = mapping.allocateContainer(
xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerName);
+ xceiverClientManager.getFactor(), containerName).getPipeline();
Assert.assertNotNull(pipeline);
- Pipeline newPipeline = mapping.getContainer(containerName);
+ Pipeline newPipeline = mapping.getContainer(containerName).getPipeline();
Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(),
newPipeline.getLeader().getDatanodeUuid());
}
@@ -125,7 +128,7 @@ public class TestContainerMapping {
String containerName = UUID.randomUUID().toString();
Pipeline pipeline = mapping.allocateContainer(
xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), containerName);
+ xceiverClientManager.getFactor(), containerName).getPipeline();
Assert.assertNotNull(pipeline);
thrown.expectMessage("Specified container already exists.");
mapping.allocateContainer(xceiverClientManager.getType(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
index 66e5c1e..430d34b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
@@ -154,7 +154,7 @@ public class TestContainerPlacement {
String container1 = UUID.randomUUID().toString();
Pipeline pipeline1 = containerManager.allocateContainer(
xceiverClientManager.getType(),
- xceiverClientManager.getFactor(), container1);
+ xceiverClientManager.getFactor(), container1).getPipeline();
assertEquals(xceiverClientManager.getFactor().getNumber(),
pipeline1.getMachines().size());
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org