Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/07/03 21:12:09 UTC
[3/3] hadoop git commit: HDDS-175. Refactor ContainerInfo to remove Pipeline object from it. Contributed by Ajay Kumar.
HDDS-175. Refactor ContainerInfo to remove Pipeline object from it. Contributed by Ajay Kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7ca4f0ce
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7ca4f0ce
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7ca4f0ce
Branch: refs/heads/trunk
Commit: 7ca4f0cefa220c752920822c8d16469ab3b09b37
Parents: 93ac01c
Author: Anu Engineer <ae...@apache.org>
Authored: Tue Jul 3 13:30:19 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue Jul 3 14:11:52 2018 -0700
----------------------------------------------------------------------
.../scm/client/ContainerOperationClient.java | 109 +++++++++---
.../hadoop/hdds/scm/client/ScmClient.java | 38 ++++-
.../container/common/helpers/ContainerInfo.java | 167 +++++++++++++------
.../common/helpers/ContainerWithPipeline.java | 131 +++++++++++++++
.../StorageContainerLocationProtocol.java | 13 +-
...rLocationProtocolClientSideTranslatorPB.java | 26 ++-
...rLocationProtocolServerSideTranslatorPB.java | 25 ++-
.../StorageContainerLocationProtocol.proto | 15 +-
hadoop-hdds/common/src/main/proto/hdds.proto | 9 +-
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 80 +++++----
.../block/DatanodeDeletedBlockTransactions.java | 11 +-
.../container/CloseContainerEventHandler.java | 26 ++-
.../hdds/scm/container/ContainerMapping.java | 128 +++++++++++---
.../scm/container/ContainerStateManager.java | 30 +++-
.../hadoop/hdds/scm/container/Mapping.java | 26 ++-
.../scm/container/closer/ContainerCloser.java | 15 +-
.../scm/container/states/ContainerStateMap.java | 7 +-
.../hdds/scm/pipelines/PipelineManager.java | 27 ++-
.../hdds/scm/pipelines/PipelineSelector.java | 16 ++
.../scm/pipelines/ratis/RatisManagerImpl.java | 1 +
.../standalone/StandaloneManagerImpl.java | 1 +
.../scm/server/SCMClientProtocolServer.java | 14 +-
.../hdds/scm/block/TestDeletedBlockLog.java | 15 +-
.../TestCloseContainerEventHandler.java | 31 ++--
.../scm/container/TestContainerMapping.java | 27 +--
.../container/closer/TestContainerCloser.java | 18 +-
.../hdds/scm/node/TestContainerPlacement.java | 6 +-
.../cli/container/CloseContainerHandler.java | 10 +-
.../cli/container/DeleteContainerHandler.java | 9 +-
.../scm/cli/container/InfoContainerHandler.java | 11 +-
.../ozone/client/io/ChunkGroupInputStream.java | 15 +-
.../ozone/client/io/ChunkGroupOutputStream.java | 9 +-
.../hadoop/ozone/protocolPB/OzonePBHelper.java | 30 ++++
.../container/TestContainerStateManager.java | 161 ++++++++++--------
.../hadoop/ozone/TestContainerOperations.java | 11 +-
.../ozone/TestStorageContainerManager.java | 6 +-
.../TestStorageContainerManagerHelper.java | 10 +-
.../ozone/client/rpc/TestOzoneRpcClient.java | 4 +-
.../TestCloseContainerByPipeline.java | 21 +--
.../ozone/ksm/TestContainerReportWithKeys.java | 2 +-
.../hadoop/ozone/scm/TestAllocateContainer.java | 6 +-
.../ozone/scm/TestContainerSmallFile.java | 36 ++--
.../org/apache/hadoop/ozone/scm/TestSCMCli.java | 135 ++++++++-------
.../ozone/scm/TestXceiverClientManager.java | 62 ++++---
.../ozone/scm/TestXceiverClientMetrics.java | 14 +-
.../genesis/BenchMarkContainerStateMap.java | 16 +-
.../org/apache/hadoop/ozone/scm/cli/SQLCLI.java | 63 +++----
47 files changed, 1139 insertions(+), 504 deletions(-)
----------------------------------------------------------------------
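For orientation before the per-file diffs: the heart of this change is that ContainerInfo no longer embeds or exposes a Pipeline; APIs that need both now return the new ContainerWithPipeline pair. A minimal sketch of the resulting client-side call pattern (illustrative only; the class and variable names below are not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.hdds.scm.client.ScmClient;
    import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
    import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;

    class AllocateSketch {
      // Before: ContainerInfo info = scmClient.createContainer(owner);
      //         Pipeline pipeline = info.getPipeline();
      // After: the pipeline travels alongside the container metadata.
      static long allocate(ScmClient scmClient, String owner) throws IOException {
        ContainerWithPipeline cwp = scmClient.createContainer(owner);
        Pipeline pipeline = cwp.getPipeline();            // where the replicas live
        return cwp.getContainerInfo().getContainerID();   // metadata stays on ContainerInfo
      }
    }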
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index 07f6cec..b04f8c4 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.client;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB
@@ -87,16 +88,17 @@ public class ContainerOperationClient implements ScmClient {
* @inheritDoc
*/
@Override
- public ContainerInfo createContainer(String owner)
+ public ContainerWithPipeline createContainer(String owner)
throws IOException {
XceiverClientSpi client = null;
try {
- ContainerInfo container =
+ ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), owner);
- Pipeline pipeline = container.getPipeline();
- client = xceiverClientManager.acquireClient(pipeline, container.getContainerID());
+ Pipeline pipeline = containerWithPipeline.getPipeline();
+ client = xceiverClientManager.acquireClient(pipeline,
+ containerWithPipeline.getContainerInfo().getContainerID());
// Allocated State means that SCM has allocated this pipeline in its
// namespace. The client needs to create the pipeline on the machines
@@ -106,8 +108,9 @@ public class ContainerOperationClient implements ScmClient {
if (pipeline.getLifeCycleState() == ALLOCATED) {
createPipeline(client, pipeline);
}
- createContainer(client, container.getContainerID());
- return container;
+ createContainer(client,
+ containerWithPipeline.getContainerInfo().getContainerID());
+ return containerWithPipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
@@ -197,17 +200,17 @@ public class ContainerOperationClient implements ScmClient {
* @inheritDoc
*/
@Override
- public ContainerInfo createContainer(HddsProtos.ReplicationType type,
+ public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, String owner) throws IOException {
XceiverClientSpi client = null;
try {
// allocate container on SCM.
- ContainerInfo container =
+ ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.allocateContainer(type, factor,
owner);
- Pipeline pipeline = container.getPipeline();
+ Pipeline pipeline = containerWithPipeline.getPipeline();
client = xceiverClientManager.acquireClient(pipeline,
- container.getContainerID());
+ containerWithPipeline.getContainerInfo().getContainerID());
// Allocated State means that SCM has allocated this pipeline in its
// namespace. The client needs to create the pipeline on the machines
@@ -217,9 +220,10 @@ public class ContainerOperationClient implements ScmClient {
}
// connect to pipeline leader and allocate container on leader datanode.
client = xceiverClientManager.acquireClient(pipeline,
- container.getContainerID());
- createContainer(client, container.getContainerID());
- return container;
+ containerWithPipeline.getContainerInfo().getContainerID());
+ createContainer(client,
+ containerWithPipeline.getContainerInfo().getContainerID());
+ return containerWithPipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
@@ -256,24 +260,27 @@ public class ContainerOperationClient implements ScmClient {
}
/**
- * Delete the container, this will release any resource it uses.
- * @param pipeline - Pipeline that represents the container.
- * @param force - True to forcibly delete the container.
+ * Deletes an existing container.
+ *
+ * @param containerId - ID of the container.
+ * @param pipeline - Pipeline that represents the container.
+ * @param force - true to forcibly delete the container.
* @throws IOException
*/
@Override
- public void deleteContainer(long containerID, Pipeline pipeline, boolean force)
- throws IOException {
+ public void deleteContainer(long containerId, Pipeline pipeline,
+ boolean force) throws IOException {
XceiverClientSpi client = null;
try {
- client = xceiverClientManager.acquireClient(pipeline, containerID);
+ client = xceiverClientManager.acquireClient(pipeline, containerId);
String traceID = UUID.randomUUID().toString();
- ContainerProtocolCalls.deleteContainer(client, containerID, force, traceID);
+ ContainerProtocolCalls
+ .deleteContainer(client, containerId, force, traceID);
storageContainerLocationClient
- .deleteContainer(containerID);
+ .deleteContainer(containerId);
if (LOG.isDebugEnabled()) {
LOG.debug("Deleted container {}, leader: {}, machines: {} ",
- containerID,
+ containerId,
pipeline.getLeader(),
pipeline.getMachines());
}
@@ -285,6 +292,19 @@ public class ContainerOperationClient implements ScmClient {
}
/**
+ * Delete the container; this releases any resources it uses.
+ * @param containerID - containerID.
+ * @param force - True to forcibly delete the container.
+ * @throws IOException
+ */
+ @Override
+ public void deleteContainer(long containerID, boolean force)
+ throws IOException {
+ ContainerWithPipeline info = getContainerWithPipeline(containerID);
+ deleteContainer(containerID, info.getPipeline(), force);
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
@@ -297,9 +317,9 @@ public class ContainerOperationClient implements ScmClient {
/**
* Get meta data from an existing container.
*
- * @param pipeline - pipeline that represents the container.
- * @return ContainerInfo - a message of protobuf which has basic info
- * of a container.
+ * @param containerID - ID of the container.
+ * @param pipeline - Pipeline where the container is located.
+ * @return ContainerData
* @throws IOException
*/
@Override
@@ -326,6 +346,19 @@ public class ContainerOperationClient implements ScmClient {
}
/**
+ * Get metadata from an existing container.
+ * @param containerID - ID of the container.
+ * @return ContainerData - a protobuf message with the basic info
+ * of the container.
+ * @throws IOException
+ */
+ @Override
+ public ContainerData readContainer(long containerID) throws IOException {
+ ContainerWithPipeline info = getContainerWithPipeline(containerID);
+ return readContainer(containerID, info.getPipeline());
+ }
+
+ /**
* Given an id, return the pipeline associated with the container.
* @param containerId - String Container ID
* @return Pipeline of the existing container, corresponding to the given id.
@@ -338,6 +371,19 @@ public class ContainerOperationClient implements ScmClient {
}
/**
+ * Gets a container by ID -- throws if the container does not exist.
+ *
+ * @param containerId - Container ID
+ * @return ContainerWithPipeline
+ * @throws IOException
+ */
+ @Override
+ public ContainerWithPipeline getContainerWithPipeline(long containerId)
+ throws IOException {
+ return storageContainerLocationClient.getContainerWithPipeline(containerId);
+ }
+
+ /**
* Close a container.
*
* @param pipeline the container to be closed.
@@ -392,6 +438,19 @@ public class ContainerOperationClient implements ScmClient {
}
/**
+ * Close a container.
+ *
+ * @param containerId - ID of the container.
+ * @throws IOException
+ */
+ @Override
+ public void closeContainer(long containerId)
+ throws IOException {
+ ContainerWithPipeline info = getContainerWithPipeline(containerId);
+ Pipeline pipeline = info.getPipeline();
+ closeContainer(containerId, pipeline);
+ }
+
+ /**
* Get the current usage information.
* @param containerID - ID of the container.
* @return the size of the given container.
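The single-argument overloads added above (readContainer, closeContainer, deleteContainer) all resolve the pipeline internally through getContainerWithPipeline(id), so a caller holding only a container ID no longer needs to track a Pipeline. A rough usage sketch (hypothetical helper, not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerData;
    import org.apache.hadoop.hdds.scm.client.ScmClient;

    class ByIdSketch {
      static void inspectCloseDelete(ScmClient scmClient, long id) throws IOException {
        ContainerData data = scmClient.readContainer(id);  // pipeline looked up internally
        scmClient.closeContainer(id);                      // likewise
        scmClient.deleteContainer(id, true);               // force delete, no Pipeline in hand
      }
    }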
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index b52819a..ecb2173 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.client;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -45,7 +46,7 @@ public interface ScmClient {
* @return ContainerInfo
* @throws IOException
*/
- ContainerInfo createContainer(String owner) throws IOException;
+ ContainerWithPipeline createContainer(String owner) throws IOException;
/**
* Gets a container by ID -- throws if the container does not exist.
@@ -56,6 +57,14 @@ public interface ScmClient {
ContainerInfo getContainer(long containerId) throws IOException;
/**
+ * Gets a container by ID -- throws if the container does not exist.
+ * @param containerId - Container ID
+ * @return ContainerWithPipeline
+ * @throws IOException
+ */
+ ContainerWithPipeline getContainerWithPipeline(long containerId) throws IOException;
+
+ /**
* Close a container.
*
* @param containerId - ID of the container.
@@ -65,6 +74,14 @@ public interface ScmClient {
void closeContainer(long containerId, Pipeline pipeline) throws IOException;
/**
+ * Close a container.
+ *
+ * @param containerId - ID of the container.
+ * @throws IOException
+ */
+ void closeContainer(long containerId) throws IOException;
+
+ /**
* Deletes an existing container.
* @param containerId - ID of the container.
* @param pipeline - Pipeline that represents the container.
@@ -74,6 +91,14 @@ public interface ScmClient {
void deleteContainer(long containerId, Pipeline pipeline, boolean force) throws IOException;
/**
+ * Deletes an existing container.
+ * @param containerId - ID of the container.
+ * @param force - true to forcibly delete the container.
+ * @throws IOException
+ */
+ void deleteContainer(long containerId, boolean force) throws IOException;
+
+ /**
* Lists a range of containers and get their info.
*
* @param startContainerID start containerID.
@@ -96,6 +121,15 @@ public interface ScmClient {
throws IOException;
/**
+ * Read metadata from an existing container.
+ * @param containerID - ID of the container.
+ * @return ContainerData
+ * @throws IOException
+ */
+ ContainerData readContainer(long containerID)
+ throws IOException;
+
+ /**
* Gets the container size -- Computed by SCM from Container Reports.
* @param containerID - ID of the container.
* @return number of bytes used by this container.
@@ -110,7 +144,7 @@ public interface ScmClient {
* @return ContainerInfo
* @throws IOException - in case of error.
*/
- ContainerInfo createContainer(HddsProtos.ReplicationType type,
+ ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor replicationFactor,
String owner) throws IOException;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
index ee05c87..9593717 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
@@ -15,34 +15,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hdds.scm.container.common.helpers;
+import static java.lang.Math.max;
+
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.Comparator;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.util.Time;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
-
-import static java.lang.Math.max;
-
/**
* Class wraps ozone container info.
*/
-public class ContainerInfo
- implements Comparator<ContainerInfo>, Comparable<ContainerInfo> {
+public class ContainerInfo implements Comparator<ContainerInfo>,
+ Comparable<ContainerInfo>, Externalizable {
private static final ObjectWriter WRITER;
+ private static final String SERIALIZATION_ERROR_MSG = "Java serialization not"
+ + " supported. Use protobuf instead.";
static {
ObjectMapper mapper = new ObjectMapper();
@@ -53,7 +58,9 @@ public class ContainerInfo
}
private HddsProtos.LifeCycleState state;
- private Pipeline pipeline;
+ private String pipelineName;
+ private ReplicationFactor replicationFactor;
+ private ReplicationType replicationType;
// Bytes allocated by SCM for clients.
private long allocatedBytes;
// Actual container usage, updated through heartbeat.
@@ -75,15 +82,17 @@ public class ContainerInfo
ContainerInfo(
long containerID,
HddsProtos.LifeCycleState state,
- Pipeline pipeline,
+ String pipelineName,
long allocatedBytes,
long usedBytes,
long numberOfKeys,
long stateEnterTime,
String owner,
- long deleteTransactionId) {
+ long deleteTransactionId,
+ ReplicationFactor replicationFactor,
+ ReplicationType repType) {
this.containerID = containerID;
- this.pipeline = pipeline;
+ this.pipelineName = pipelineName;
this.allocatedBytes = allocatedBytes;
this.usedBytes = usedBytes;
this.numberOfKeys = numberOfKeys;
@@ -92,6 +101,8 @@ public class ContainerInfo
this.stateEnterTime = stateEnterTime;
this.owner = owner;
this.deleteTransactionId = deleteTransactionId;
+ this.replicationFactor = replicationFactor;
+ this.replicationType = repType;
}
/**
@@ -102,16 +113,18 @@ public class ContainerInfo
public static ContainerInfo fromProtobuf(HddsProtos.SCMContainerInfo info) {
ContainerInfo.Builder builder = new ContainerInfo.Builder();
- builder.setPipeline(Pipeline.getFromProtoBuf(info.getPipeline()));
- builder.setAllocatedBytes(info.getAllocatedBytes());
- builder.setUsedBytes(info.getUsedBytes());
- builder.setNumberOfKeys(info.getNumberOfKeys());
- builder.setState(info.getState());
- builder.setStateEnterTime(info.getStateEnterTime());
- builder.setOwner(info.getOwner());
- builder.setContainerID(info.getContainerID());
- builder.setDeleteTransactionId(info.getDeleteTransactionId());
- return builder.build();
+ return builder.setPipelineName(info.getPipelineName())
+ .setAllocatedBytes(info.getAllocatedBytes())
+ .setUsedBytes(info.getUsedBytes())
+ .setNumberOfKeys(info.getNumberOfKeys())
+ .setState(info.getState())
+ .setStateEnterTime(info.getStateEnterTime())
+ .setOwner(info.getOwner())
+ .setContainerID(info.getContainerID())
+ .setDeleteTransactionId(info.getDeleteTransactionId())
+ .setReplicationFactor(info.getReplicationFactor())
+ .setReplicationType(info.getReplicationType())
+ .build();
}
public long getContainerID() {
@@ -130,8 +143,12 @@ public class ContainerInfo
return stateEnterTime;
}
- public Pipeline getPipeline() {
- return pipeline;
+ public ReplicationFactor getReplicationFactor() {
+ return replicationFactor;
+ }
+
+ public String getPipelineName() {
+ return pipelineName;
}
public long getAllocatedBytes() {
@@ -177,6 +194,10 @@ public class ContainerInfo
return lastUsed;
}
+ public ReplicationType getReplicationType() {
+ return replicationType;
+ }
+
public void updateLastUsedTime() {
lastUsed = Time.monotonicNow();
}
@@ -190,19 +211,17 @@ public class ContainerInfo
public HddsProtos.SCMContainerInfo getProtobuf() {
HddsProtos.SCMContainerInfo.Builder builder =
HddsProtos.SCMContainerInfo.newBuilder();
- builder.setPipeline(getPipeline().getProtobufMessage());
- builder.setAllocatedBytes(getAllocatedBytes());
- builder.setUsedBytes(getUsedBytes());
- builder.setNumberOfKeys(getNumberOfKeys());
- builder.setState(state);
- builder.setStateEnterTime(stateEnterTime);
- builder.setContainerID(getContainerID());
- builder.setDeleteTransactionId(deleteTransactionId);
-
- if (getOwner() != null) {
- builder.setOwner(getOwner());
- }
- return builder.build();
+ return builder.setAllocatedBytes(getAllocatedBytes())
+ .setContainerID(getContainerID())
+ .setUsedBytes(getUsedBytes())
+ .setNumberOfKeys(getNumberOfKeys()).setState(getState())
+ .setStateEnterTime(getStateEnterTime())
+ .setDeleteTransactionId(getDeleteTransactionId())
+ .setPipelineName(getPipelineName())
+ .setReplicationFactor(getReplicationFactor())
+ .setReplicationType(getReplicationType())
+ .setOwner(getOwner())
+ .build();
}
public String getOwner() {
@@ -217,7 +236,7 @@ public class ContainerInfo
public String toString() {
return "ContainerInfo{"
+ "state=" + state
- + ", pipeline=" + pipeline
+ + ", pipelineName=" + pipelineName
+ ", stateEnterTime=" + stateEnterTime
+ ", owner=" + owner
+ '}';
@@ -252,9 +271,7 @@ public class ContainerInfo
public int hashCode() {
return new HashCodeBuilder(11, 811)
.append(getContainerID())
- .append(pipeline.getFactor())
- .append(pipeline.getType())
- .append(owner)
+ .append(getOwner())
.toHashCode();
}
@@ -327,12 +344,44 @@ public class ContainerInfo
this.data = Arrays.copyOf(data, data.length);
}
}
+
+ /**
+ * Throws IOException as default java serialization is not supported. Use
+ * serialization via protobuf instead.
+ *
+ * @param out the stream to write the object to
+ * @throws IOException Includes any I/O exceptions that may occur
+ * @serialData Overriding methods should use this tag to describe
+ * the data layout of this Externalizable object.
+ * List the sequence of element types and, if possible,
+ * relate the element to a public/protected field and/or
+ * method of this Externalizable class.
+ */
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ throw new IOException(SERIALIZATION_ERROR_MSG);
+ }
+
+ /**
+ * Throws IOException as default java serialization is not supported. Use
+ * serialization via protobuf instead.
+ *
+ * @param in the stream to read data from in order to restore the object
+ * @throws IOException if I/O errors occur
+ * @throws ClassNotFoundException If the class for an object being
+ * restored cannot be found.
+ */
+ @Override
+ public void readExternal(ObjectInput in)
+ throws IOException, ClassNotFoundException {
+ throw new IOException(SERIALIZATION_ERROR_MSG);
+ }
+
/**
* Builder class for ContainerInfo.
*/
public static class Builder {
private HddsProtos.LifeCycleState state;
- private Pipeline pipeline;
private long allocated;
private long used;
private long keys;
@@ -340,6 +389,25 @@ public class ContainerInfo
private String owner;
private long containerID;
private long deleteTransactionId;
+ private String pipelineName;
+ private ReplicationFactor replicationFactor;
+ private ReplicationType replicationType;
+
+ public Builder setReplicationType(
+ ReplicationType replicationType) {
+ this.replicationType = replicationType;
+ return this;
+ }
+
+ public Builder setPipelineName(String pipelineName) {
+ this.pipelineName = pipelineName;
+ return this;
+ }
+
+ public Builder setReplicationFactor(ReplicationFactor repFactor) {
+ this.replicationFactor = repFactor;
+ return this;
+ }
public Builder setContainerID(long id) {
Preconditions.checkState(id >= 0);
@@ -352,11 +420,6 @@ public class ContainerInfo
return this;
}
- public Builder setPipeline(Pipeline containerPipeline) {
- this.pipeline = containerPipeline;
- return this;
- }
-
public Builder setAllocatedBytes(long bytesAllocated) {
this.allocated = bytesAllocated;
return this;
@@ -388,9 +451,9 @@ public class ContainerInfo
}
public ContainerInfo build() {
- return new
- ContainerInfo(containerID, state, pipeline, allocated,
- used, keys, stateEnterTime, owner, deleteTransactionId);
+ return new ContainerInfo(containerID, state, pipelineName, allocated,
+ used, keys, stateEnterTime, owner, deleteTransactionId,
+ replicationFactor, replicationType);
}
}
}
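With the Pipeline removed, ContainerInfo now records a pipeline name plus the replication settings needed to find (or rebuild) the pipeline later; writeExternal/readExternal deliberately throw, steering persistence through getProtobuf(). A sketch of the updated Builder (all field values below are made-up placeholders):

    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
    import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;

    class BuilderSketch {
      static ContainerInfo sample() {
        return new ContainerInfo.Builder()
            .setContainerID(1L)
            .setState(HddsProtos.LifeCycleState.ALLOCATED)
            .setPipelineName("RAexample")                   // placeholder pipeline name
            .setReplicationType(ReplicationType.RATIS)
            .setReplicationFactor(ReplicationFactor.THREE)
            .setOwner("example-owner")                      // placeholder owner
            .build();
      }
    }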
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
new file mode 100644
index 0000000..e71d429
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.common.helpers;
+
+import java.util.Comparator;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+/**
+ * Class that wraps a ContainerInfo together with its Pipeline.
+ */
+public class ContainerWithPipeline
+ implements Comparator<ContainerWithPipeline>, Comparable<ContainerWithPipeline> {
+
+ private final ContainerInfo containerInfo;
+ private final Pipeline pipeline;
+
+ public ContainerWithPipeline(ContainerInfo containerInfo, Pipeline pipeline) {
+ this.containerInfo = containerInfo;
+ this.pipeline = pipeline;
+ }
+
+ public ContainerInfo getContainerInfo() {
+ return containerInfo;
+ }
+
+ public Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ public static ContainerWithPipeline fromProtobuf(HddsProtos.ContainerWithPipeline allocatedContainer) {
+ return new ContainerWithPipeline(
+ ContainerInfo.fromProtobuf(allocatedContainer.getContainerInfo()),
+ Pipeline.getFromProtoBuf(allocatedContainer.getPipeline()));
+ }
+
+ public HddsProtos.ContainerWithPipeline getProtobuf() {
+ HddsProtos.ContainerWithPipeline.Builder builder =
+ HddsProtos.ContainerWithPipeline.newBuilder();
+ builder.setContainerInfo(getContainerInfo().getProtobuf())
+ .setPipeline(getPipeline().getProtobufMessage());
+
+ return builder.build();
+ }
+
+
+ @Override
+ public String toString() {
+ return containerInfo.toString() + " | " + pipeline.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ContainerWithPipeline that = (ContainerWithPipeline) o;
+
+ return new EqualsBuilder()
+ .append(getContainerInfo(), that.getContainerInfo())
+ .append(getPipeline(), that.getPipeline())
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(11, 811)
+ .append(getContainerInfo())
+ .append(getPipeline())
+ .toHashCode();
+ }
+
+ /**
+ * Compares its two arguments for order. Returns a negative integer, zero, or
+ * a positive integer as the first argument is less than, equal to, or greater
+ * than the second.<p>
+ *
+ * @param o1 the first object to be compared.
+ * @param o2 the second object to be compared.
+ * @return a negative integer, zero, or a positive integer as the first
+ * argument is less than, equal to, or greater than the second.
+ * @throws NullPointerException if an argument is null and this comparator
+ * does not permit null arguments
+ * @throws ClassCastException if the arguments' types prevent them from
+ * being compared by this comparator.
+ */
+ @Override
+ public int compare(ContainerWithPipeline o1, ContainerWithPipeline o2) {
+ return o1.getContainerInfo().compareTo(o2.getContainerInfo());
+ }
+
+ /**
+ * Compares this object with the specified object for order. Returns a
+ * negative integer, zero, or a positive integer as this object is less than,
+ * equal to, or greater than the specified object.
+ *
+ * @param o the object to be compared.
+ * @return a negative integer, zero, or a positive integer as this object is
+ * less than, equal to, or greater than the specified object.
+ * @throws NullPointerException if the specified object is null
+ * @throws ClassCastException if the specified object's type prevents it
+ * from being compared to this object.
+ */
+ @Override
+ public int compareTo(ContainerWithPipeline o) {
+ return this.compare(this, o);
+ }
+
+}
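Like ContainerInfo, the new pair serializes only via protobuf; a round trip through the new HddsProtos.ContainerWithPipeline message looks like this (sketch):

    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
    import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;

    class RoundTripSketch {
      static ContainerWithPipeline roundTrip(ContainerWithPipeline cwp) {
        HddsProtos.ContainerWithPipeline proto = cwp.getProtobuf();
        // equals() compares both halves, so the restored pair matches the original.
        return ContainerWithPipeline.fromProtobuf(proto);
      }
    }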
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index e8d85e0..b787409 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.protocol;
import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -38,7 +39,7 @@ public interface StorageContainerLocationProtocol {
* set of datanodes that should be used creating this container.
*
*/
- ContainerInfo allocateContainer(HddsProtos.ReplicationType replicationType,
+ ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType replicationType,
HddsProtos.ReplicationFactor factor, String owner)
throws IOException;
@@ -54,6 +55,16 @@ public interface StorageContainerLocationProtocol {
ContainerInfo getContainer(long containerID) throws IOException;
/**
+ * Ask SCM the location of the container. SCM responds with a group of
+ * nodes where this container and its replicas are located.
+ *
+ * @param containerID - ID of the container.
+ * @return ContainerWithPipeline - the container info with the pipeline.
+ * @throws IOException
+ */
+ ContainerWithPipeline getContainerWithPipeline(long containerID) throws IOException;
+
+ /**
* Ask SCM a list of containers with a range of container names
* and the limit of count.
* Search container names between start name(exclusive), and
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index bba4e17..4b03d12 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -20,7 +20,10 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@@ -95,7 +98,7 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
* @throws IOException
*/
@Override
- public ContainerInfo allocateContainer(HddsProtos.ReplicationType type,
+ public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, String owner) throws IOException {
ContainerRequestProto request = ContainerRequestProto.newBuilder()
@@ -114,7 +117,7 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
throw new IOException(response.hasErrorMessage() ?
response.getErrorMessage() : "Allocate container failed.");
}
- return ContainerInfo.fromProtobuf(response.getContainerInfo());
+ return ContainerWithPipeline.fromProtobuf(response.getContainerWithPipeline());
}
public ContainerInfo getContainer(long containerID) throws IOException {
@@ -136,6 +139,25 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
/**
* {@inheritDoc}
*/
+ public ContainerWithPipeline getContainerWithPipeline(long containerID) throws IOException {
+ Preconditions.checkState(containerID >= 0,
+ "Container ID cannot be negative");
+ GetContainerWithPipelineRequestProto request = GetContainerWithPipelineRequestProto
+ .newBuilder()
+ .setContainerID(containerID)
+ .build();
+ try {
+ GetContainerWithPipelineResponseProto response =
+ rpcProxy.getContainerWithPipeline(NULL_RPC_CONTROLLER, request);
+ return ContainerWithPipeline.fromProtobuf(response.getContainerWithPipeline());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
@Override
public List<ContainerInfo> listContainer(long startContainerID, int count)
throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 70a0e8a..d66919f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -21,7 +21,10 @@ package org.apache.hadoop.ozone.protocolPB;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
@@ -82,10 +85,11 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
public ContainerResponseProto allocateContainer(RpcController unused,
ContainerRequestProto request) throws ServiceException {
try {
- ContainerInfo container = impl.allocateContainer(request.getReplicationType(),
- request.getReplicationFactor(), request.getOwner());
+ ContainerWithPipeline containerWithPipeline = impl
+ .allocateContainer(request.getReplicationType(),
+ request.getReplicationFactor(), request.getOwner());
return ContainerResponseProto.newBuilder()
- .setContainerInfo(container.getProtobuf())
+ .setContainerWithPipeline(containerWithPipeline.getProtobuf())
.setErrorCode(ContainerResponseProto.Error.success)
.build();
@@ -109,6 +113,21 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
}
@Override
+ public GetContainerWithPipelineResponseProto getContainerWithPipeline(
+ RpcController controller, GetContainerWithPipelineRequestProto request)
+ throws ServiceException {
+ try {
+ ContainerWithPipeline container = impl
+ .getContainerWithPipeline(request.getContainerID());
+ return GetContainerWithPipelineResponseProto.newBuilder()
+ .setContainerWithPipeline(container.getProtobuf())
+ .build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
public SCMListContainerResponseProto listContainer(RpcController controller,
SCMListContainerRequestProto request) throws ServiceException {
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
index 090e6eb..143c2ae 100644
--- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -52,7 +52,7 @@ message ContainerResponseProto {
errorContainerMissing = 3;
}
required Error errorCode = 1;
- required SCMContainerInfo containerInfo = 2;
+ required ContainerWithPipeline containerWithPipeline = 2;
optional string errorMessage = 3;
}
@@ -64,6 +64,14 @@ message GetContainerResponseProto {
required SCMContainerInfo containerInfo = 1;
}
+message GetContainerWithPipelineRequestProto {
+ required int64 containerID = 1;
+}
+
+message GetContainerWithPipelineResponseProto {
+ required ContainerWithPipeline containerWithPipeline = 1;
+}
+
message SCMListContainerRequestProto {
required uint32 count = 1;
optional uint64 startContainerID = 2;
@@ -171,6 +179,11 @@ service StorageContainerLocationProtocolService {
*/
rpc getContainer(GetContainerRequestProto) returns (GetContainerResponseProto);
+ /**
+ * Returns the pipeline for a given container.
+ */
+ rpc getContainerWithPipeline(GetContainerWithPipelineRequestProto) returns (GetContainerWithPipelineResponseProto);
+
rpc listContainer(SCMListContainerRequestProto) returns (SCMListContainerResponseProto);
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/common/src/main/proto/hdds.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index 816efa7..1c9ee19 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -132,7 +132,7 @@ enum LifeCycleEvent {
message SCMContainerInfo {
required int64 containerID = 1;
required LifeCycleState state = 2;
- required Pipeline pipeline = 3;
+ optional string pipelineName = 3;
// This is not total size of container, but space allocated by SCM for
// clients to write blocks
required uint64 allocatedBytes = 4;
@@ -141,6 +141,13 @@ message SCMContainerInfo {
optional int64 stateEnterTime = 7;
required string owner = 8;
optional int64 deleteTransactionId = 9;
+ required ReplicationFactor replicationFactor = 10;
+ required ReplicationType replicationType = 11;
+}
+
+message ContainerWithPipeline {
+ required SCMContainerInfo containerInfo = 1;
+ required Pipeline pipeline = 2;
}
message GetScmInfoRequestProto {
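On the wire, SCMContainerInfo now names its pipeline and carries the replication settings needed to re-resolve (or re-create) it, rather than embedding a full Pipeline message. A sketch of building the updated message from Java (field values are placeholders, and this assumes the unchanged usedBytes field remains required):

    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

    class ProtoSketch {
      static HddsProtos.SCMContainerInfo sample() {
        return HddsProtos.SCMContainerInfo.newBuilder()
            .setContainerID(1L)
            .setState(HddsProtos.LifeCycleState.OPEN)
            .setPipelineName("RAexample")        // replaces the old embedded Pipeline
            .setAllocatedBytes(0L)
            .setUsedBytes(0L)
            .setOwner("example-owner")           // placeholder
            .setReplicationFactor(HddsProtos.ReplicationFactor.THREE)
            .setReplicationType(HddsProtos.ReplicationType.RATIS)
            .build();
      }
    }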
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 7cfbdab..953f71e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -16,10 +16,12 @@
*/
package org.apache.hadoop.hdds.scm.block;
+import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -156,13 +158,13 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
lock.lock();
try {
for (int i = 0; i < count; i++) {
- ContainerInfo containerInfo = null;
+ ContainerWithPipeline containerWithPipeline = null;
try {
// TODO: Fix this later when Ratis is made the Default.
- containerInfo = containerManager.allocateContainer(type, factor,
+ containerWithPipeline = containerManager.allocateContainer(type, factor,
owner);
- if (containerInfo == null) {
+ if (containerWithPipeline == null) {
LOG.warn("Unable to allocate container.");
continue;
}
@@ -231,30 +233,27 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
can use different kind of policies.
*/
- ContainerInfo containerInfo;
+ ContainerWithPipeline containerWithPipeline;
// Look for ALLOCATED container that matches all other parameters.
- containerInfo =
- containerManager
- .getStateManager()
- .getMatchingContainer(
- size, owner, type, factor, HddsProtos.LifeCycleState
- .ALLOCATED);
- if (containerInfo != null) {
- containerManager.updateContainerState(containerInfo.getContainerID(),
+ containerWithPipeline = containerManager
+ .getMatchingContainerWithPipeline(size, owner, type, factor,
+ HddsProtos.LifeCycleState.ALLOCATED);
+ if (containerWithPipeline != null) {
+ containerManager.updateContainerState(
+ containerWithPipeline.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
- return newBlock(containerInfo, HddsProtos.LifeCycleState.ALLOCATED);
+ return newBlock(containerWithPipeline,
+ HddsProtos.LifeCycleState.ALLOCATED);
}
// Since we found no allocated containers that match our criteria, let us
// look for OPEN containers that match the criteria.
- containerInfo =
- containerManager
- .getStateManager()
- .getMatchingContainer(size, owner, type, factor, HddsProtos
- .LifeCycleState.OPEN);
- if (containerInfo != null) {
- return newBlock(containerInfo, HddsProtos.LifeCycleState.OPEN);
+ containerWithPipeline = containerManager
+ .getMatchingContainerWithPipeline(size, owner, type, factor,
+ HddsProtos.LifeCycleState.OPEN);
+ if (containerWithPipeline != null) {
+ return newBlock(containerWithPipeline, HddsProtos.LifeCycleState.OPEN);
}
// We found neither ALLOCATED nor OPEN Containers. This generally means
@@ -264,16 +263,15 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
preAllocateContainers(containerProvisionBatchSize, type, factor, owner);
// Since we just allocated a set of containers this should work
- containerInfo =
- containerManager
- .getStateManager()
- .getMatchingContainer(
- size, owner, type, factor, HddsProtos.LifeCycleState
- .ALLOCATED);
- if (containerInfo != null) {
- containerManager.updateContainerState(containerInfo.getContainerID(),
+ containerWithPipeline = containerManager
+ .getMatchingContainerWithPipeline(size, owner, type, factor,
+ HddsProtos.LifeCycleState.ALLOCATED);
+ if (containerWithPipeline != null) {
+ containerManager.updateContainerState(
+ containerWithPipeline.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
- return newBlock(containerInfo, HddsProtos.LifeCycleState.ALLOCATED);
+ return newBlock(containerWithPipeline,
+ HddsProtos.LifeCycleState.ALLOCATED);
}
// we have tried all the strategies we know, but somehow we are not able
@@ -290,18 +288,28 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
}
}
+ private String getChannelName(ReplicationType type) {
+ switch (type) {
+ case RATIS:
+ return "RA" + UUID.randomUUID().toString().substring(3);
+ case STAND_ALONE:
+ return "SA" + UUID.randomUUID().toString().substring(3);
+ default:
+ return "RA" + UUID.randomUUID().toString().substring(3);
+ }
+ }
+
/**
* newBlock - returns a new block assigned to a container.
*
- * @param containerInfo - Container Info.
+ * @param containerWithPipeline - Container with its pipeline.
* @param state - Current state of the container.
* @return AllocatedBlock
*/
- private AllocatedBlock newBlock(
- ContainerInfo containerInfo, HddsProtos.LifeCycleState state)
- throws IOException {
-
- if (containerInfo.getPipeline().getMachines().size() == 0) {
+ private AllocatedBlock newBlock(ContainerWithPipeline containerWithPipeline,
+ HddsProtos.LifeCycleState state) throws IOException {
+ ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
+ if (containerWithPipeline.getPipeline().getDatanodes().size() == 0) {
LOG.error("Pipeline Machine count is zero.");
return null;
}
@@ -317,7 +325,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
AllocatedBlock.Builder abb =
new AllocatedBlock.Builder()
.setBlockID(new BlockID(containerID, localID))
- .setPipeline(containerInfo.getPipeline())
+ .setPipeline(containerWithPipeline.getPipeline())
.setShouldCreateContainer(createContainer);
LOG.trace("New block allocated : {} Container ID: {}", localID,
containerID);
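The getChannelName() helper above seeds pipeline names from the replication type plus a UUID-derived tail; condensed, the naming scheme is (sketch only):

    import java.util.UUID;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;

    class ChannelNameSketch {
      static String channelName(ReplicationType type) {
        // "RA..." for Ratis (also the default), "SA..." for stand-alone.
        String tail = UUID.randomUUID().toString().substring(3);
        return (type == ReplicationType.STAND_ALONE ? "SA" : "RA") + tail;
      }
    }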
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index 32290cc..d71e7b0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.block;
import com.google.common.collect.ArrayListMultimap;
import org.apache.hadoop.hdds.scm.container.Mapping;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -29,6 +28,7 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
/**
* A wrapper class to hold info about datanode and all deleted block
@@ -54,21 +54,22 @@ public class DatanodeDeletedBlockTransactions {
}
public void addTransaction(DeletedBlocksTransaction tx) throws IOException {
- ContainerInfo info = null;
+ Pipeline pipeline = null;
try {
- info = mappingService.getContainer(tx.getContainerID());
+ pipeline = mappingService.getContainerWithPipeline(tx.getContainerID())
+ .getPipeline();
} catch (IOException e) {
SCMBlockDeletingService.LOG.warn("Got container info error.", e);
}
- if (info == null) {
+ if (pipeline == null) {
SCMBlockDeletingService.LOG.warn(
"Container {} not found, continue to process next",
tx.getContainerID());
return;
}
- for (DatanodeDetails dd : info.getPipeline().getMachines()) {
+ for (DatanodeDetails dd : pipeline.getMachines()) {
UUID dnID = dd.getUuid();
if (transactions.containsKey(dnID)) {
List<DeletedBlocksTransaction> txs = transactions.get(dnID);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 16e84a3..7b24538 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -16,9 +16,11 @@
*/
package org.apache.hadoop.hdds.scm.container;
+import java.io.IOException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -54,22 +56,32 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
LOG.info("Close container Event triggered for container : {}",
containerID.getId());
- ContainerStateManager stateManager = containerManager.getStateManager();
- ContainerInfo info = stateManager.getContainer(containerID);
- if (info == null) {
- LOG.info("Container with id : {} does not exist", containerID.getId());
+ ContainerWithPipeline containerWithPipeline = null;
+ ContainerInfo info;
+ try {
+ containerWithPipeline = containerManager.getContainerWithPipeline(containerID.getId());
+ info = containerWithPipeline.getContainerInfo();
+ if (info == null) {
+ LOG.info("Failed to update the container state. Container with id : {} "
+ + "does not exist", containerID.getId());
+ return;
+ }
+ } catch (IOException e) {
+ LOG.info("Failed to update the container state. Container with id : {} "
+ + "does not exist", containerID.getId());
return;
}
+
if (info.getState() == HddsProtos.LifeCycleState.OPEN) {
- for (DatanodeDetails datanode : info.getPipeline().getMachines()) {
+ for (DatanodeDetails datanode : containerWithPipeline.getPipeline().getMachines()) {
containerManager.getNodeManager().addDatanodeCommand(datanode.getUuid(),
new CloseContainerCommand(containerID.getId(),
- info.getPipeline().getType()));
+ info.getReplicationType()));
}
try {
// Finalize event will make sure the state of the container transitions
// from OPEN to CLOSING in containerStateManager.
- stateManager
+ containerManager.getStateManager()
.updateContainerState(info, HddsProtos.LifeCycleEvent.FINALIZE);
} catch (SCMException ex) {
LOG.error("Failed to update the container state for container : {}"
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 9fd30f2..e25c5b4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -21,6 +21,10 @@ import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
@@ -167,6 +171,44 @@ public class ContainerMapping implements Mapping {
}
/**
+ * Returns the ContainerWithPipeline for the given container ID.
+ *
+ * @param containerID - ID of container.
+ * @return - ContainerWithPipeline, carrying the container state and its pipeline.
+ * @throws IOException
+ */
+ @Override
+ public ContainerWithPipeline getContainerWithPipeline(long containerID)
+ throws IOException {
+ ContainerInfo contInfo;
+ lock.lock();
+ try {
+ byte[] containerBytes = containerStore.get(
+ Longs.toByteArray(containerID));
+ if (containerBytes == null) {
+ throw new SCMException(
+ "Specified key does not exist. key : " + containerID,
+ SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+ }
+ HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
+ .parseFrom(containerBytes);
+ contInfo = ContainerInfo.fromProtobuf(temp);
+ Pipeline pipeline = pipelineSelector
+ .getPipeline(contInfo.getPipelineName(),
+ contInfo.getReplicationType());
+
+ if(pipeline == null) {
+ pipeline = pipelineSelector
+ .getReplicationPipeline(contInfo.getReplicationType(),
+ contInfo.getReplicationFactor());
+ }
+ return new ContainerWithPipeline(contInfo, pipeline);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
@@ -208,13 +250,15 @@ public class ContainerMapping implements Mapping {
* @throws IOException - Exception
*/
@Override
- public ContainerInfo allocateContainer(
+ public ContainerWithPipeline allocateContainer(
ReplicationType type,
ReplicationFactor replicationFactor,
String owner)
throws IOException {
ContainerInfo containerInfo;
+ ContainerWithPipeline containerWithPipeline;
+
if (!nodeManager.isOutOfChillMode()) {
throw new SCMException(
"Unable to create container while in chill mode",
@@ -223,9 +267,9 @@ public class ContainerMapping implements Mapping {
lock.lock();
try {
- containerInfo =
- containerStateManager.allocateContainer(
+ containerWithPipeline = containerStateManager.allocateContainer(
pipelineSelector, type, replicationFactor, owner);
+ containerInfo = containerWithPipeline.getContainerInfo();
byte[] containerIDBytes = Longs.toByteArray(
containerInfo.getContainerID());
@@ -234,7 +278,7 @@ public class ContainerMapping implements Mapping {
} finally {
lock.unlock();
}
- return containerInfo;
+ return containerWithPipeline;
}
/**
@@ -381,6 +425,35 @@ public class ContainerMapping implements Mapping {
}
/**
+ * Return a container matching the attributes specified.
+ *
+ * @param size - Space needed in the Container.
+ * @param owner - Owner of the container - A specific nameservice.
+ * @param type - Replication Type {StandAlone, Ratis}
+ * @param factor - Replication Factor {ONE, THREE}
+ * @param state - State of the Container-- {Open, Allocated etc.}
+ * @return ContainerWithPipeline, or null if no match is found.
+ * @throws IOException
+ */
+ public ContainerWithPipeline getMatchingContainerWithPipeline(final long size,
+ String owner, ReplicationType type, ReplicationFactor factor,
+ LifeCycleState state) throws IOException {
+ ContainerInfo containerInfo = getStateManager()
+ .getMatchingContainer(size, owner, type, factor, state);
+ if (containerInfo == null) {
+ return null;
+ }
+ Pipeline pipeline = pipelineSelector
+ .getPipeline(containerInfo.getPipelineName(),
+ containerInfo.getReplicationType());
+ if (pipeline == null) {
+ pipeline = pipelineSelector
+ .getReplicationPipeline(containerInfo.getReplicationType(),
+ containerInfo.getReplicationFactor());
+ }
+ return new ContainerWithPipeline(containerInfo, pipeline);
+ }
+
+ /**
* Process container report from Datanode.
* <p>
* Processing follows a very simple logic for time being.
@@ -415,7 +488,7 @@ public class ContainerMapping implements Mapping {
HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
HddsProtos.SCMContainerInfo newState =
- reconcileState(datanodeState, knownState);
+ reconcileState(datanodeState, knownState, datanodeDetails);
// FIX ME: This can be optimized, we write twice to memory, where a
// single write would work well.
@@ -425,8 +498,14 @@ public class ContainerMapping implements Mapping {
containerStore.put(dbKey, newState.toByteArray());
// If the container is closed, then state is already written to SCM
+ Pipeline pipeline = pipelineSelector.getPipeline(
+ newState.getPipelineName(), newState.getReplicationType());
+ if (pipeline == null) {
+ pipeline = pipelineSelector
+ .getReplicationPipeline(newState.getReplicationType(),
+ newState.getReplicationFactor());
+ }
// DB.TODO: So we can write only once to DB.
- if (closeContainerIfNeeded(newState)) {
+ if (closeContainerIfNeeded(newState, pipeline)) {
LOG.info("Closing the Container: {}", newState.getContainerID());
}
} else {
@@ -447,15 +526,22 @@ public class ContainerMapping implements Mapping {
*
* @param datanodeState - State from the Datanode.
* @param knownState - State inside SCM.
+ * @param dnDetails - DatanodeDetails of the datanode that sent the report.
* @return new SCM State for this container.
*/
private HddsProtos.SCMContainerInfo reconcileState(
StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
- HddsProtos.SCMContainerInfo knownState) {
+ SCMContainerInfo knownState, DatanodeDetails dnDetails) {
HddsProtos.SCMContainerInfo.Builder builder =
HddsProtos.SCMContainerInfo.newBuilder();
- builder.setContainerID(knownState.getContainerID());
- builder.setPipeline(knownState.getPipeline());
+ builder.setContainerID(knownState.getContainerID())
+ .setPipelineName(knownState.getPipelineName())
+ .setReplicationType(knownState.getReplicationType())
+ .setReplicationFactor(knownState.getReplicationFactor());
+
+ // TODO: If the current state doesn't list this datanode as holding a
+ // replica, add it to the list of replicas.
+
// If used size is greater than allocated size, we will be updating
// allocated size with used size. This update is done as a fallback
// mechanism in case SCM crashes without properly updating allocated
@@ -464,13 +550,13 @@ public class ContainerMapping implements Mapping {
long usedSize = datanodeState.getUsed();
long allocated = knownState.getAllocatedBytes() > usedSize ?
knownState.getAllocatedBytes() : usedSize;
- builder.setAllocatedBytes(allocated);
- builder.setUsedBytes(usedSize);
- builder.setNumberOfKeys(datanodeState.getKeyCount());
- builder.setState(knownState.getState());
- builder.setStateEnterTime(knownState.getStateEnterTime());
- builder.setContainerID(knownState.getContainerID());
- builder.setDeleteTransactionId(knownState.getDeleteTransactionId());
+ builder.setAllocatedBytes(allocated)
+ .setUsedBytes(usedSize)
+ .setNumberOfKeys(datanodeState.getKeyCount())
+ .setState(knownState.getState())
+ .setStateEnterTime(knownState.getStateEnterTime())
+ .setContainerID(knownState.getContainerID())
+ .setDeleteTransactionId(knownState.getDeleteTransactionId());
if (knownState.getOwner() != null) {
builder.setOwner(knownState.getOwner());
}
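The allocated-size fallback a few lines up reduces to taking the maximum of SCM's record and the datanode report; stated as an equivalent one-liner:

    // Equivalent form of the reconciliation rule: never let the allocated
    // size fall below what the datanode reports as actually used.
    long allocated = Math.max(knownState.getAllocatedBytes(),
        datanodeState.getUsed());
    builder.setAllocatedBytes(allocated);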
@@ -485,9 +571,11 @@ public class ContainerMapping implements Mapping {
* one protobuf in one file and another definition in another file.
*
* @param newState - This is the state we maintain in SCM.
+ * @param pipeline - Pipeline the container belongs to.
* @throws IOException
*/
- private boolean closeContainerIfNeeded(HddsProtos.SCMContainerInfo newState)
+ private boolean closeContainerIfNeeded(SCMContainerInfo newState,
+ Pipeline pipeline)
throws IOException {
float containerUsedPercentage = 1.0f *
newState.getUsedBytes() / this.size;
@@ -498,7 +586,7 @@ public class ContainerMapping implements Mapping {
// We will call closer till get to the closed state.
// That is SCM will make this call repeatedly until we reach the closed
// state.
- closer.close(newState);
+ closer.close(newState, pipeline);
if (shouldClose(scmInfo)) {
// This event moves the Container from Open to Closing State, this is
@@ -598,10 +686,12 @@ public class ContainerMapping implements Mapping {
.setAllocatedBytes(info.getAllocatedBytes())
.setNumberOfKeys(oldInfo.getNumberOfKeys())
.setOwner(oldInfo.getOwner())
- .setPipeline(oldInfo.getPipeline())
+ .setPipelineName(oldInfo.getPipelineName())
.setState(oldInfo.getState())
.setUsedBytes(oldInfo.getUsedBytes())
.setDeleteTransactionId(oldInfo.getDeleteTransactionId())
+ .setReplicationFactor(oldInfo.getReplicationFactor())
+ .setReplicationType(oldInfo.getReplicationType())
.build();
containerStore.put(dbKey, newInfo.getProtobuf().toByteArray());
} else {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 08733bd..870ab1d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.container;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
@@ -279,10 +280,10 @@ public class ContainerStateManager implements Closeable {
* @param selector -- Pipeline selector class.
* @param type -- Replication type.
* @param replicationFactor - Replication replicationFactor.
- * @return Container Info.
+ * @return ContainerWithPipeline
* @throws IOException on Failure.
*/
- public ContainerInfo allocateContainer(PipelineSelector selector, HddsProtos
+ public ContainerWithPipeline allocateContainer(PipelineSelector selector, HddsProtos
.ReplicationType type, HddsProtos.ReplicationFactor replicationFactor,
String owner) throws IOException {
@@ -295,7 +296,7 @@ public class ContainerStateManager implements Closeable {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(HddsProtos.LifeCycleState.ALLOCATED)
- .setPipeline(pipeline)
+ .setPipelineName(pipeline.getPipelineName())
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
@@ -305,11 +306,13 @@ public class ContainerStateManager implements Closeable {
.setOwner(owner)
.setContainerID(containerCount.incrementAndGet())
.setDeleteTransactionId(0)
+ .setReplicationFactor(replicationFactor)
+ .setReplicationType(pipeline.getType())
.build();
Preconditions.checkNotNull(containerInfo);
containers.addContainer(containerInfo);
LOG.trace("New container allocated: {}", containerInfo);
- return containerInfo;
+ return new ContainerWithPipeline(containerInfo, pipeline);
}
/**
@@ -432,8 +435,8 @@ public class ContainerStateManager implements Closeable {
containerInfo.updateLastUsedTime();
ContainerState key = new ContainerState(owner,
- containerInfo.getPipeline().getType(),
- containerInfo.getPipeline().getFactor());
+ containerInfo.getReplicationType(),
+ containerInfo.getReplicationFactor());
lastUsedMap.put(key, containerInfo.containerID());
return containerInfo;
}
@@ -458,6 +461,20 @@ public class ContainerStateManager implements Closeable {
}
/**
+ * Returns the containerInfo with pipeline for the given container id.
+ * @param selector -- Pipeline selector class.
+ * @param containerID id of the container
+ * @return ContainerWithPipeline for the given container
+ * @throws IOException
+ */
+ public ContainerWithPipeline getContainer(PipelineSelector selector,
+ ContainerID containerID) throws IOException {
+ ContainerInfo info = containers.getContainerInfo(containerID.getId());
+ Pipeline pipeline = selector.getPipeline(info.getPipelineName(),
+ info.getReplicationType());
+ return new ContainerWithPipeline(info, pipeline);
+ }
+
+ /**
* Returns the containerInfo for the given container id.
* @param containerID id of the container
* @return ContainerInfo containerInfo
@@ -466,6 +483,7 @@ public class ContainerStateManager implements Closeable {
public ContainerInfo getContainer(ContainerID containerID) {
return containers.getContainerInfo(containerID.getId());
}
+
@Override
public void close() throws IOException {
}
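The state manager now exposes two lookups that differ only in whether a pipeline is resolved and attached. A brief sketch of the distinction ('stateManager' and 'selector' are assumed to be the SCM's ContainerStateManager and PipelineSelector instances):

    // Sketch: metadata-only lookup vs. lookup with pipeline resolution.
    ContainerInfo bare = stateManager.getContainer(id);
    ContainerWithPipeline full = stateManager.getContainer(selector, id);
    Pipeline pipeline = full.getPipeline(); // only the second carries this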
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
index e77a4b6..f52eb05 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
@@ -17,6 +17,10 @@
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
@@ -43,6 +47,16 @@ public interface Mapping extends Closeable {
ContainerInfo getContainer(long containerID) throws IOException;
/**
+ * Returns the ContainerWithPipeline for the given container ID.
+ *
+ * @param containerID - ID of the container.
+ * @return - ContainerWithPipeline carrying the container info and its
+ * resolved pipeline.
+ * @throws IOException
+ */
+ ContainerWithPipeline getContainerWithPipeline(long containerID)
+ throws IOException;
+
+ /**
* Returns containers under certain conditions.
* Search container IDs from the start ID (exclusive),
* The max size of the searching range cannot exceed the
@@ -65,10 +79,10 @@ public interface Mapping extends Closeable {
*
* @param replicationFactor - replication factor of the container.
* @param owner - Owner of the container.
- * @return - Container Info.
+ * @return - ContainerWithPipeline.
* @throws IOException
*/
- ContainerInfo allocateContainer(HddsProtos.ReplicationType type,
+ ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor replicationFactor, String owner)
throws IOException;
@@ -120,4 +134,12 @@ public interface Mapping extends Closeable {
* @return NodeManager
*/
NodeManager getNodeManager();
+
+ /**
+ * Returns a container matching the given attributes, together with its
+ * pipeline.
+ * @return ContainerWithPipeline, or null if no match is found.
+ * @throws IOException
+ */
+ ContainerWithPipeline getMatchingContainerWithPipeline(long size,
+ String owner, ReplicationType type, ReplicationFactor factor,
+ LifeCycleState state) throws IOException;
}
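Taken together, the widened Mapping interface supports a reuse-or-allocate flow: try to match an existing container first, and only allocate a new one on a miss. A plausible caller, sketched under the assumption that the variables are supplied by the caller and SCM is out of chill mode:

    // Sketch of a reuse-or-allocate flow over the new interface methods.
    ContainerWithPipeline target = mapping.getMatchingContainerWithPipeline(
        blockSize, owner, type, factor, HddsProtos.LifeCycleState.OPEN);
    if (target == null) {
      // No matching open container; fall back to allocating a fresh one.
      target = mapping.allocateContainer(type, factor, owner);
    }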
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
index cbb2ba7..3ca8ba9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
@@ -22,6 +22,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -90,8 +92,10 @@ public class ContainerCloser {
* lives.
*
* @param info - ContainerInfo.
+ * @param pipeline - Pipeline on which the container's replicas live.
*/
- public void close(HddsProtos.SCMContainerInfo info) {
+ public void close(SCMContainerInfo info, Pipeline pipeline) {
if (commandIssued.containsKey(info.getContainerID())) {
// We check if we issued a close command in last 3 * reportInterval secs.
@@ -126,13 +130,10 @@ public class ContainerCloser {
// this queue can be emptied by a datanode after a close report is sent
// to SCM. In that case too, the datanode will ignore this command.
- HddsProtos.Pipeline pipeline = info.getPipeline();
- for (HddsProtos.DatanodeDetailsProto datanodeDetails :
- pipeline.getMembersList()) {
- nodeManager.addDatanodeCommand(
- DatanodeDetails.getFromProtoBuf(datanodeDetails).getUuid(),
+ for (DatanodeDetails datanodeDetails : pipeline.getMachines()) {
+ nodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(info.getContainerID(),
- pipeline.getType()));
+ info.getReplicationType()));
}
if (!commandIssued.containsKey(info.getContainerID())) {
commandIssued.put(info.getContainerID(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 48c6423..3ada8fe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -116,7 +116,8 @@ public class ContainerStateMap {
public void addContainer(ContainerInfo info)
throws SCMException {
Preconditions.checkNotNull(info, "Container Info cannot be null");
- Preconditions.checkNotNull(info.getPipeline(), "Pipeline cannot be null");
+ Preconditions.checkArgument(info.getReplicationFactor().getNumber() > 0,
+ "ExpectedReplicaCount should be greater than 0");
try (AutoCloseableLock lock = autoLock.acquire()) {
ContainerID id = ContainerID.valueof(info.getContainerID());
@@ -129,8 +130,8 @@ public class ContainerStateMap {
lifeCycleStateMap.insert(info.getState(), id);
ownerMap.insert(info.getOwner(), id);
- factorMap.insert(info.getPipeline().getFactor(), id);
- typeMap.insert(info.getPipeline().getType(), id);
+ factorMap.insert(info.getReplicationFactor(), id);
+ typeMap.insert(info.getReplicationType(), id);
LOG.trace("Created container with {} successfully.", id);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index 48affa4..a1fbce6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -16,6 +16,9 @@
*/
package org.apache.hadoop.hdds.scm.pipelines;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.WeakHashMap;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@@ -25,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -36,11 +38,13 @@ public abstract class PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineManager.class);
private final List<Pipeline> activePipelines;
+ private final Map<String, Pipeline> activePipelineMap;
private final AtomicInteger pipelineIndex;
public PipelineManager() {
activePipelines = new LinkedList<>();
pipelineIndex = new AtomicInteger(0);
+ activePipelineMap = new WeakHashMap<>();
}
/**
@@ -76,6 +80,7 @@ public abstract class PipelineManager {
"replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
activePipelines.add(pipeline);
+ activePipelineMap.put(pipeline.getPipelineName(), pipeline);
} else {
pipeline =
findOpenPipeline(replicationType, replicationFactor);
@@ -94,6 +99,26 @@ public abstract class PipelineManager {
}
}
+ /**
+ * Returns the active pipeline with the given name, if one exists.
+ *
+ * @param pipelineName - name of the pipeline.
+ * @return the matching Pipeline, or null if none is active.
+ */
+ public synchronized final Pipeline getPipeline(String pipelineName) {
+ // Look the pipeline up in the name-indexed cache of active pipelines.
+ Pipeline pipeline = activePipelineMap.get(pipelineName);
+ if (pipeline != null) {
+ LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
+ } else {
+ LOG.debug("Unable to find pipeline for pipelineName:{}", pipelineName);
+ }
+ return pipeline;
+ }
+
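The new activePipelineMap adds an O(1) lookup by name alongside the round-robin list. Note the WeakHashMap choice: entries whose String keys become otherwise unreachable may be garbage-collected, so the map behaves as a cache over activePipelines rather than as the source of truth. A registration/lookup sketch (allocatePipeline is a hypothetical stand-in for the manager's pipeline creation):

    // Sketch: keeping the name index in step with the active list.
    Pipeline pipeline = allocatePipeline(replicationFactor); // hypothetical
    activePipelines.add(pipeline);
    activePipelineMap.put(pipeline.getPipelineName(), pipeline);
    Pipeline cached = getPipeline(pipeline.getPipelineName()); // O(1) hit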
protected int getReplicationCount(ReplicationFactor factor) {
switch (factor) {
case ONE:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 508ca9b..3846a84 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.pipelines;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
@@ -177,6 +178,21 @@ public class PipelineSelector {
}
/**
+ * Returns the pipeline for the given pipeline name and replication type.
+ */
+ public Pipeline getPipeline(String pipelineName,
+ ReplicationType replicationType) throws IOException {
+ if (pipelineName == null) {
+ return null;
+ }
+ PipelineManager manager = getPipelineManager(replicationType);
+ Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
+ LOG.debug("Getting replication pipeline forReplicationType {} :" +
+ " pipelineName:{}", replicationType, pipelineName);
+ return manager.getPipeline(pipelineName);
+ }
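Note the contract here: a null pipeline name short-circuits to null without consulting any manager, and a cache miss in the manager also yields null; either way the caller is expected to fall back to getReplicationPipeline, as in the ContainerMapping changes above. A sketch:

    // Sketch of the lookup contract ('selector' is a PipelineSelector).
    Pipeline p1 = selector.getPipeline(null,
        HddsProtos.ReplicationType.RATIS);           // always null
    Pipeline p2 = selector.getPipeline("pipeline-1", // hypothetical name
        HddsProtos.ReplicationType.RATIS);           // null on cache miss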
+ /**
* Creates a pipeline from a specified set of Nodes.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index ace8758..189060e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.pipelines.ratis;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index e76027f..579a3a2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm.pipelines.standalone;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ca4f0ce/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index d73cccd..e1d478f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@@ -145,11 +146,12 @@ public class SCMClientProtocolServer implements
}
@Override
- public ContainerInfo allocateContainer(HddsProtos.ReplicationType
+ public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType
replicationType, HddsProtos.ReplicationFactor factor,
String owner) throws IOException {
String remoteUser = getRpcRemoteUsername();
getScm().checkAdminAccess(remoteUser);
+
return scm.getScmContainerManager()
.allocateContainer(replicationType, factor, owner);
}
@@ -163,6 +165,14 @@ public class SCMClientProtocolServer implements
}
@Override
+ public ContainerWithPipeline getContainerWithPipeline(long containerID)
+ throws IOException {
+ String remoteUser = getRpcRemoteUsername();
+ getScm().checkAdminAccess(remoteUser);
+ return scm.getScmContainerManager()
+ .getContainerWithPipeline(containerID);
+ }
+
+ @Override
public List<ContainerInfo> listContainer(long startContainerID,
int count) throws IOException {
return scm.getScmContainerManager().
@@ -248,7 +258,7 @@ public class SCMClientProtocolServer implements
throws IOException {
// TODO: will be addressed in future patch.
// This is needed only for debugging purposes to make sure cluster is
- // working correctly.
+ // working correctly.
return null;
}