You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/05/23 18:12:11 UTC
hadoop git commit: HDFS-11859. Ozone: SCM: Separate
BlockLocationProtocol from ContainerLocationProtocol. Contributed by Xiaoyu
Yao.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 5bba3ce76 -> 9f7b8a166
HDFS-11859. Ozone: SCM: Separate BlockLocationProtocol from ContainerLocationProtocol. Contributed by Xiaoyu Yao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9f7b8a16
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9f7b8a16
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9f7b8a16
Branch: refs/heads/HDFS-7240
Commit: 9f7b8a166b85167f5755c0c258d3550099ba4704
Parents: 5bba3ce
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Tue May 23 10:56:23 2017 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Tue May 23 10:56:45 2017 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs-client/pom.xml | 1 +
.../org/apache/hadoop/scm/ScmConfigKeys.java | 15 ++
.../common/helpers/DeleteBlockResult.java | 2 +-
.../scm/protocol/ScmBlockLocationProtocol.java | 2 +-
...kLocationProtocolClientSideTranslatorPB.java | 186 +++++++++++++++++++
.../protocolPB/ScmBlockLocationProtocolPB.java | 35 ++++
.../main/proto/ScmBlockLocationProtocol.proto | 133 +++++++++++++
.../StorageContainerLocationProtocol.proto | 92 ---------
.../server/datanode/ObjectStoreHandler.java | 19 ++
.../apache/hadoop/ozone/OzoneClientUtils.java | 47 +++++
...kLocationProtocolServerSideTranslatorPB.java | 148 +++++++++++++++
...rLocationProtocolServerSideTranslatorPB.java | 99 +---------
.../ozone/scm/StorageContainerManager.java | 40 +++-
.../apache/hadoop/ozone/MiniOzoneCluster.java | 1 +
14 files changed, 625 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f7b8a16/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
index 067cd7b..d3764fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
@@ -192,6 +192,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<include>DatanodeContainerProtocol.proto</include>
<include>Ozone.proto</include>
<include>KeySpaceManagerProtocol.proto</include>
+ <include>ScmBlockLocationProtocol.proto</include>
</includes>
</source>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f7b8a16/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index 03f9c82..d76565b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -53,10 +53,17 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_CLIENT_PORT_KEY =
"ozone.scm.client.port";
public static final int OZONE_SCM_CLIENT_PORT_DEFAULT = 9860;
+
public static final String OZONE_SCM_DATANODE_PORT_KEY =
"ozone.scm.datanode.port";
public static final int OZONE_SCM_DATANODE_PORT_DEFAULT = 9861;
+ // OZONE_KSM_PORT_DEFAULT = 9862
+ public static final String OZONE_SCM_BLOCK_CLIENT_PORT_KEY =
+ "ozone.scm.block.client.port";
+ public static final int OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT = 9863;
+
+ // Container service client
public static final String OZONE_SCM_CLIENT_ADDRESS_KEY =
"ozone.scm.client.address";
public static final String OZONE_SCM_CLIENT_BIND_HOST_KEY =
@@ -64,6 +71,14 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_CLIENT_BIND_HOST_DEFAULT =
"0.0.0.0";
+ // Block service client
+ public static final String OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY =
+ "ozone.scm.block.client.address";
+ public static final String OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY =
+ "ozone.scm.block.client.bind.host";
+ public static final String OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT =
+ "0.0.0.0";
+
public static final String OZONE_SCM_DATANODE_ADDRESS_KEY =
"ozone.scm.datanode.address";
public static final String OZONE_SCM_DATANODE_BIND_HOST_KEY =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f7b8a16/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/DeleteBlockResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/DeleteBlockResult.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/DeleteBlockResult.java
index cdb4d05..5dbb853 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/DeleteBlockResult.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/DeleteBlockResult.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.scm.container.common.helpers;
import static org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.DeleteScmBlockResult;
+ .ScmBlockLocationProtocolProtos.DeleteScmBlockResult;
/**
* Class wraps storage container manager block deletion results.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f7b8a16/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java
index 40b475d..e944375 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java
@@ -60,6 +60,6 @@ public interface ScmBlockLocationProtocol {
* @throws IOException if there is any failure.
*
*/
- List<DeleteBlockResult> deleteBlocks(Set<String> keys);
+ List<DeleteBlockResult> deleteBlocks(Set<String> keys) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f7b8a16/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000..fa65299
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.scm.protocolPB;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
+ .AllocateScmBlockRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
+ .AllocateScmBlockResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
+ .DeleteScmBlocksRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
+ .DeleteScmBlocksResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
+ .DeleteScmBlockResult;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
+ .GetScmBlockLocationsRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
+ .GetScmBlockLocationsResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
+ .ScmLocatedBlockProto;
+import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class is the client-side translator to translate the requests made on
+ * the {@link ScmBlockLocationProtocol} interface to the RPC server
+ * implementing {@link ScmBlockLocationProtocolPB}.
+ */
+@InterfaceAudience.Private
+public final class ScmBlockLocationProtocolClientSideTranslatorPB
+ implements ScmBlockLocationProtocol, ProtocolTranslator, Closeable {
+
+ /**
+ * RpcController is not used and hence is set to null.
+ */
+ private static final RpcController NULL_RPC_CONTROLLER = null;
+
+ private final ScmBlockLocationProtocolPB rpcProxy;
+
+ /**
+ * Creates a new ScmBlockLocationProtocolClientSideTranslatorPB.
+ *
+ * @param rpcProxy {@link ScmBlockLocationProtocolPB} RPC proxy
+ */
+ public ScmBlockLocationProtocolClientSideTranslatorPB(
+ ScmBlockLocationProtocolPB rpcProxy) {
+ this.rpcProxy = rpcProxy;
+ }
+
+ /**
+ * Find the set of nodes to read/write a block, as
+ * identified by the block key. This method supports batch lookup by
+ * passing multiple keys.
+ *
+ * @param keys batch of block keys to find
+ * @return allocated blocks for each block key
+ * @throws IOException if the RPC to SCM fails
+ */
+ @Override
+ public Set<AllocatedBlock> getBlockLocations(Set<String> keys)
+ throws IOException {
+ GetScmBlockLocationsRequestProto.Builder req =
+ GetScmBlockLocationsRequestProto.newBuilder();
+ for (String key : keys) {
+ req.addKeys(key);
+ }
+ final GetScmBlockLocationsResponseProto resp;
+ try {
+ resp = rpcProxy.getScmBlockLocations(NULL_RPC_CONTROLLER,
+ req.build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ Set<AllocatedBlock> locatedBlocks =
+ Sets.newLinkedHashSetWithExpectedSize(resp.getLocatedBlocksCount());
+ for (ScmLocatedBlockProto locatedBlock : resp.getLocatedBlocksList()) {
+ locatedBlocks.add(new AllocatedBlock.Builder()
+ .setKey(locatedBlock.getKey())
+ .setPipeline(Pipeline.getFromProtoBuf(locatedBlock.getPipeline()))
+ .build());
+ }
+ return locatedBlocks;
+ }
+
+ /**
+ * Asks SCM where a block should be allocated. SCM responds with the
+ * set of datanodes that should be used for creating this block.
+ * @param size - size of the block, must be greater than 0.
+ * @return allocated block accessing info (key, pipeline).
+ * @throws IOException if the RPC fails or SCM reports an allocation error
+ */
+ @Override
+ public AllocatedBlock allocateBlock(long size) throws IOException {
+ Preconditions.checkArgument(size > 0,
+ "block size must be greater than 0");
+
+ AllocateScmBlockRequestProto request = AllocateScmBlockRequestProto
+ .newBuilder().setSize(size).build();
+ final AllocateScmBlockResponseProto response;
+ try {
+ response = rpcProxy.allocateScmBlock(NULL_RPC_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ if (response.getErrorCode() !=
+ AllocateScmBlockResponseProto.Error.success) {
+ throw new IOException(response.hasErrorMessage() ?
+ response.getErrorMessage() : "Allocate block failed.");
+ }
+ AllocatedBlock.Builder builder = new AllocatedBlock.Builder()
+ .setKey(response.getKey())
+ .setPipeline(Pipeline.getFromProtoBuf(response.getPipeline()))
+ .setShouldCreateContainer(response.getCreateContainer());
+ return builder.build();
+ }
+
+ /**
+ * Delete the set of keys specified.
+ *
+ * @param keys batch of block keys to delete.
+ * @return list of block deletion results.
+ * @throws IOException if there is any failure.
+ * @throws IllegalArgumentException if keys is null or empty.
+ */
+ @Override
+ public List<DeleteBlockResult> deleteBlocks(Set<String> keys)
+ throws IOException {
+ Preconditions.checkArgument(keys != null && !keys.isEmpty(),
+ "keys to be deleted cannot be null or empty");
+ DeleteScmBlocksRequestProto request = DeleteScmBlocksRequestProto
+ .newBuilder()
+ .addAllKeys(keys)
+ .build();
+ final DeleteScmBlocksResponseProto resp;
+ try {
+ resp = rpcProxy.deleteScmBlocks(NULL_RPC_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ List<DeleteBlockResult> results = new ArrayList<>(resp.getResultsCount());
+ for (DeleteScmBlockResult result : resp.getResultsList()) {
+ results.add(new DeleteBlockResult(result.getKey(), result.getResult()));
+ }
+ return results;
+ }
+
+ @Override
+ public Object getUnderlyingProxyObject() {
+ return rpcProxy;
+ }
+
+ @Override
+ public void close() {
+ RPC.stopProxy(rpcProxy);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f7b8a16/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolPB.java
new file mode 100644
index 0000000..256e735
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolPB.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.scm.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
+ .ScmBlockLocationProtocolService;
+
+/**
+ * Protocol used to reach the StorageContainerManager block service. This
+ * extends the Protocol Buffers service interface with Hadoop annotations.
+ */
+@ProtocolInfo(protocolName =
+ "org.apache.hadoop.ozone.protocol.ScmBlockLocationProtocol",
+ protocolVersion = 1)
+@InterfaceAudience.Private
+public interface ScmBlockLocationProtocolPB
+ extends ScmBlockLocationProtocolService.BlockingInterface {
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f7b8a16/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ScmBlockLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ScmBlockLocationProtocol.proto
new file mode 100644
index 0000000..e9c3abf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ScmBlockLocationProtocol.proto
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *unstable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.ozone.protocol.proto";
+option java_outer_classname = "ScmBlockLocationProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdfs;
+
+import "hdfs.proto";
+import "Ozone.proto";
+
+
+// SCM Block protocol
+/**
+ * keys - batch of block keys to find
+ */
+message GetScmBlockLocationsRequestProto {
+ repeated string keys = 1;
+}
+
+/**
+ * locatedBlocks - for each requested hash, nodes that currently host the
+ * container for that object key hash
+ */
+message GetScmBlockLocationsResponseProto {
+ repeated ScmLocatedBlockProto locatedBlocks = 1;
+}
+
+/**
+ * Holds the nodes that currently host the blocks for a key.
+ */
+message ScmLocatedBlockProto {
+ required string key = 1;
+ required hadoop.hdfs.ozone.Pipeline pipeline = 2;
+}
+
+/**
+* Request sent to SCM asking it to allocate a block of the specified size.
+*/
+message AllocateScmBlockRequestProto {
+ required uint64 size = 1;
+}
+
+/**
+ * keys - batch of block keys to delete
+ */
+message DeleteScmBlocksRequestProto {
+ repeated string keys = 1;
+}
+
+/**
+ * results - per-key results of the block deletion request
+ */
+message DeleteScmBlocksResponseProto {
+ repeated DeleteScmBlockResult results = 1;
+}
+
+message DeleteScmBlockResult {
+ enum Result {
+ success = 1;
+ chillMode = 2;
+ errorNotFound = 3;
+ unknownFailure = 4;
+ }
+ required Result result = 1;
+ required string key = 2;
+}
+
+/**
+ * Reply from SCM to a block allocation request; errorCode holds the outcome.
+ */
+message AllocateScmBlockResponseProto {
+ enum Error {
+ success = 1;
+ errorNotEnoughSpace = 2;
+ errorSizeTooBig = 3;
+ unknownFailure = 4;
+ }
+ required Error errorCode = 1;
+ required string key = 2;
+ required hadoop.hdfs.ozone.Pipeline pipeline = 3;
+ required bool createContainer = 4;
+ optional string errorMessage = 5;
+}
+
+/**
+ * Protocol used from KeySpaceManager to StorageContainerManager.
+ * See request and response messages for details of the RPC calls.
+ */
+service ScmBlockLocationProtocolService {
+
+ /**
+ * Find the set of nodes that currently host the block, as
+ * identified by the key. This method supports batch lookup by
+ * passing multiple keys.
+ */
+ rpc getScmBlockLocations(GetScmBlockLocationsRequestProto)
+ returns (GetScmBlockLocationsResponseProto);
+
+ /**
+ * Creates a block entry in SCM.
+ */
+ rpc allocateScmBlock(AllocateScmBlockRequestProto)
+ returns (AllocateScmBlockResponseProto);
+
+ /**
+ * Deletes one or multiple block keys from SCM.
+ */
+ rpc deleteScmBlocks(DeleteScmBlocksRequestProto)
+ returns (DeleteScmBlocksResponseProto);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f7b8a16/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
index 147c432..a6f64a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
@@ -100,78 +100,6 @@ message DeleteContainerResponseProto {
// Empty response
}
-// SCM Block protocol
-/**
- * keys - batch of block keys to find
- */
-message GetScmBlockLocationsRequestProto {
- repeated string keys = 1;
-}
-
-/**
- * locatedBlocks - for each requested hash, nodes that currently host the
- * container for that object key hash
- */
-message GetScmBlockLocationsResponseProto {
- repeated ScmLocatedBlockProto locatedBlocks = 1;
-}
-
-/**
- * Holds the nodes that currently host the blocks for a key.
- */
-message ScmLocatedBlockProto {
- required string key = 1;
- required hadoop.hdfs.ozone.Pipeline pipeline = 2;
-}
-
-/**
-* Request send to SCM asking allocate block of specified size.
-*/
-message AllocateScmBlockRequestProto {
- required uint64 size = 1;
-}
-
-/**
- * keys - batch of block keys to deleted
- */
-message DeleteScmBlocksRequestProto {
- repeated string keys = 1;
-}
-
-/**
- * deletedKeys - keys that are deleted successfully
- */
-message DeleteScmBlocksResponseProto {
- repeated DeleteScmBlockResult results = 1;
-}
-
-message DeleteScmBlockResult {
- enum Result {
- success = 1;
- chillMode = 2;
- errorNotFound = 3;
- unknownFailure = 4;
- }
- required Result result = 1;
- required string key = 2;
-}
-
-/**
- * Reply from SCM indicating that the container.
- */
-message AllocateScmBlockResponseProto {
- enum Error {
- success = 1;
- errorNotEnoughSpace = 2;
- errorSizeTooBig = 3;
- unknownFailure = 4;
- }
- required Error errorCode = 1;
- required string key = 2;
- required hadoop.hdfs.ozone.Pipeline pipeline = 3;
- required bool createContainer = 4;
- optional string errorMessage = 5;
-}
/**
* Protocol used from an HDFS node to StorageContainerManager. See the request
@@ -200,24 +128,4 @@ service StorageContainerLocationProtocolService {
* Deletes a container in SCM.
*/
rpc deleteContainer(DeleteContainerRequestProto) returns (DeleteContainerResponseProto);
-
- /**
- * Find the set of nodes that currently host the block, as
- * identified by the key. This method supports batch lookup by
- * passing multiple keys.
- */
- rpc getScmBlockLocations(GetScmBlockLocationsRequestProto)
- returns (GetScmBlockLocationsResponseProto);
-
- /**
- * Creates a block entry in SCM.
- */
- rpc allocateScmBlock(AllocateScmBlockRequestProto)
- returns (AllocateScmBlockResponseProto);
-
- /**
- * Deletes one or multiple block keys from SCM.
- */
- rpc deleteScmBlocks(DeleteScmBlocksRequestProto)
- returns (DeleteScmBlocksResponseProto);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f7b8a16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
index 0f1264b..a5d11c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.ksm.protocolPB
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +71,8 @@ public final class ObjectStoreHandler implements Closeable {
keySpaceManagerClient;
private final StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
+ private final ScmBlockLocationProtocolClientSideTranslatorPB
+ scmBlockLocationClient;
private final StorageHandler storageHandler;
/**
@@ -91,6 +95,7 @@ public final class ObjectStoreHandler implements Closeable {
ProtobufRpcEngine.class);
long scmVersion =
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
+
InetSocketAddress scmAddress =
OzoneClientUtils.getScmAddressForClients(conf);
this.storageContainerLocationClient =
@@ -99,6 +104,16 @@ public final class ObjectStoreHandler implements Closeable {
scmAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
+
+ InetSocketAddress scmBlockAddress =
+ OzoneClientUtils.getScmAddressForBlockClients(conf);
+ this.scmBlockLocationClient =
+ new ScmBlockLocationProtocolClientSideTranslatorPB(
+ RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
+ scmBlockAddress, UserGroupInformation.getCurrentUser(), conf,
+ NetUtils.getDefaultSocketFactory(conf),
+ Client.getRpcTimeout(conf)));
+
long ksmVersion =
RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class);
InetSocketAddress ksmAddress = OzoneClientUtils.getKsmAddress(conf);
@@ -116,6 +131,7 @@ public final class ObjectStoreHandler implements Closeable {
if (OzoneConsts.OZONE_HANDLER_LOCAL.equalsIgnoreCase(shType)) {
storageHandler = new LocalStorageHandler(conf);
this.storageContainerLocationClient = null;
+ this.scmBlockLocationClient = null;
this.keySpaceManagerClient = null;
} else {
throw new IllegalArgumentException(
@@ -162,5 +178,8 @@ public final class ObjectStoreHandler implements Closeable {
if (this.storageContainerLocationClient != null) {
this.storageContainerLocationClient.close();
}
+ if (this.scmBlockLocationClient != null) {
+ this.scmBlockLocationClient.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f7b8a16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
index 9476ef5..d20fcb8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
@@ -152,6 +152,33 @@ public final class OzoneClientUtils {
}
/**
+ * Retrieve the socket address that should be used by clients to connect
+ * to the SCM for block service.
+ *
+ * @param conf configuration; must define ozone.scm.block.client.address
+ * @return Target InetSocketAddress for the SCM block client endpoint.
+ */
+ public static InetSocketAddress getScmAddressForBlockClients(
+ Configuration conf) {
+ final Optional<String> host = getHostNameFromConfigKeys(conf,
+ ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
+
+ if (!host.isPresent()) {
+ throw new IllegalArgumentException(
+ ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY +
+ " must be defined. See" +
+ " https://wiki.apache.org/hadoop/Ozone#Configuration for details" +
+ " on configuring Ozone.");
+ }
+
+ final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
+ ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
+
+ return NetUtils.createSocketAddr(host.get() + ":" +
+ port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT));
+ }
+
+ /**
* Retrieve the socket address that should be used by DataNodes to connect
* to the SCM.
*
@@ -208,6 +235,26 @@ public final class OzoneClientUtils {
}
/**
+ * Retrieve the bind address of the SCM block client endpoint: host from the
+ * bind-host key, port from the address key, each falling back to its default.
+ *
+ * @param conf configuration to read the bind host and port from
+ * @return InetSocketAddress built from the resolved bind host and port.
+ */
+ public static InetSocketAddress getScmBlockClientBindAddress(
+ Configuration conf) {
+ final Optional<String> host = getHostNameFromConfigKeys(conf,
+ ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY);
+
+ final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
+ ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
+
+ return NetUtils.createSocketAddr(
+ host.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT) +
+ ":" + port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT));
+ }
+
+ /**
* Retrieve the socket address that should be used by DataNodes to connect
* to the SCM.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f7b8a16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..e8843fc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import com.google.common.collect.Sets;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
+import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.ozone.protocol.proto
+ .ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .ScmBlockLocationProtocolProtos.DeleteScmBlocksRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .ScmBlockLocationProtocolProtos.DeleteScmBlocksResponseProto;
+import static org.apache.hadoop.ozone.protocol.proto
+ .ScmBlockLocationProtocolProtos.DeleteScmBlockResult;
+import org.apache.hadoop.ozone.protocol.proto
+ .ScmBlockLocationProtocolProtos.GetScmBlockLocationsRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .ScmBlockLocationProtocolProtos.GetScmBlockLocationsResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .ScmBlockLocationProtocolProtos.ScmLocatedBlockProto;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class is the server-side translator that forwards requests received on
+ * {@link ScmBlockLocationProtocolPB} to the
+ * {@link ScmBlockLocationProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public final class ScmBlockLocationProtocolServerSideTranslatorPB
+ implements ScmBlockLocationProtocolPB {
+
+ private final ScmBlockLocationProtocol impl;
+
+ /**
+ * Creates a new ScmBlockLocationProtocolServerSideTranslatorPB.
+ *
+ * @param impl {@link ScmBlockLocationProtocol} server implementation
+ */
+ public ScmBlockLocationProtocolServerSideTranslatorPB(
+ ScmBlockLocationProtocol impl) throws IOException {
+ this.impl = impl;
+ }
+
+
+ @Override
+ public GetScmBlockLocationsResponseProto getScmBlockLocations(
+ RpcController controller, GetScmBlockLocationsRequestProto req)
+ throws ServiceException {
+ Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
+ req.getKeysCount());
+ for (String key : req.getKeysList()) {
+ keys.add(key);
+ }
+ final Set<AllocatedBlock> blocks;
+ try {
+ blocks = impl.getBlockLocations(keys);
+ } catch (IOException ex) {
+ throw new ServiceException(ex);
+ }
+ GetScmBlockLocationsResponseProto.Builder resp =
+ GetScmBlockLocationsResponseProto.newBuilder();
+ for (AllocatedBlock block: blocks) {
+ ScmLocatedBlockProto.Builder locatedBlock =
+ ScmLocatedBlockProto.newBuilder()
+ .setKey(block.getKey())
+ .setPipeline(block.getPipeline().getProtobufMessage());
+ resp.addLocatedBlocks(locatedBlock.build());
+ }
+ return resp.build();
+ }
+
+ @Override
+ public AllocateScmBlockResponseProto allocateScmBlock(
+ RpcController controller, AllocateScmBlockRequestProto request)
+ throws ServiceException {
+ try {
+ AllocatedBlock allocatedBlock =
+ impl.allocateBlock(request.getSize());
+ if (allocatedBlock != null) {
+ return
+ AllocateScmBlockResponseProto.newBuilder()
+ .setKey(allocatedBlock.getKey())
+ .setPipeline(allocatedBlock.getPipeline().getProtobufMessage())
+ .setCreateContainer(allocatedBlock.getCreateContainer())
+ .setErrorCode(AllocateScmBlockResponseProto.Error.success)
+ .build();
+ } else {
+ return AllocateScmBlockResponseProto.newBuilder()
+ .setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure)
+ .build();
+ }
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public DeleteScmBlocksResponseProto deleteScmBlocks(
+ RpcController controller, DeleteScmBlocksRequestProto req)
+ throws ServiceException {
+ Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
+ req.getKeysCount());
+ for (String key : req.getKeysList()) {
+ keys.add(key);
+ }
+ DeleteScmBlocksResponseProto.Builder resp =
+ DeleteScmBlocksResponseProto.newBuilder();
+ try {
+ final List<DeleteBlockResult> results = impl.deleteBlocks(keys);
+ for (DeleteBlockResult result: results) {
+ DeleteScmBlockResult.Builder deleteResult = DeleteScmBlockResult
+ .newBuilder().setKey(result.getKey()).setResult(result.getResult());
+ resp.addResults(deleteResult.build());
+ }
+ } catch (IOException ex) {
+ throw new ServiceException(ex);
+ }
+ return resp.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f7b8a16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 5ea37d4..d65802c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.protocolPB;
import java.io.IOException;
-import java.util.List;
import java.util.Set;
import com.google.common.collect.Sets;
@@ -28,10 +27,7 @@ import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.scm.protocol.LocatedContainer;
-import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos
@@ -40,29 +36,12 @@ import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos
.GetStorageContainerLocationsResponseProto;
import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos
- .ScmLocatedBlockProto;
-import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.LocatedContainerProto;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.AllocateScmBlockRequestProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.AllocateScmBlockResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.DeleteScmBlocksRequestProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.DeleteScmBlocksResponseProto;
-import static org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.DeleteScmBlockResult;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.GetScmBlockLocationsRequestProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.GetScmBlockLocationsResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
@@ -83,7 +62,6 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
implements StorageContainerLocationProtocolPB {
private final StorageContainerLocationProtocol impl;
- private final ScmBlockLocationProtocol blockImpl;
/**
* Creates a new StorageContainerLocationProtocolServerSideTranslatorPB.
@@ -91,10 +69,8 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
* @param impl {@link StorageContainerLocationProtocol} server implementation
*/
public StorageContainerLocationProtocolServerSideTranslatorPB(
- StorageContainerLocationProtocol impl,
- ScmBlockLocationProtocol blockImpl) throws IOException {
+ StorageContainerLocationProtocol impl) throws IOException {
this.impl = impl;
- this.blockImpl = blockImpl;
}
@Override
@@ -170,77 +146,4 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
throw new ServiceException(e);
}
}
-
- @Override
- public GetScmBlockLocationsResponseProto getScmBlockLocations(
- RpcController controller, GetScmBlockLocationsRequestProto req)
- throws ServiceException {
- Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
- req.getKeysCount());
- for (String key : req.getKeysList()) {
- keys.add(key);
- }
- final Set<AllocatedBlock> blocks;
- try {
- blocks = blockImpl.getBlockLocations(keys);
- } catch (IOException ex) {
- throw new ServiceException(ex);
- }
- GetScmBlockLocationsResponseProto.Builder resp =
- GetScmBlockLocationsResponseProto.newBuilder();
- for (AllocatedBlock block: blocks) {
- ScmLocatedBlockProto.Builder locatedBlock =
- ScmLocatedBlockProto.newBuilder()
- .setKey(block.getKey())
- .setPipeline(block.getPipeline().getProtobufMessage());
- resp.addLocatedBlocks(locatedBlock.build());
- }
- return resp.build();
- }
-
- @Override
- public AllocateScmBlockResponseProto allocateScmBlock(
- RpcController controller, AllocateScmBlockRequestProto request)
- throws ServiceException {
- try {
- AllocatedBlock allocatedBlock =
- blockImpl.allocateBlock(request.getSize());
- if (allocatedBlock != null) {
- return
- AllocateScmBlockResponseProto.newBuilder()
- .setKey(allocatedBlock.getKey())
- .setPipeline(allocatedBlock.getPipeline().getProtobufMessage())
- .setCreateContainer(allocatedBlock.getCreateContainer())
- .setErrorCode(AllocateScmBlockResponseProto.Error.success)
- .build();
- } else {
- return
- AllocateScmBlockResponseProto.newBuilder()
- .setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure)
- .build();
- }
- } catch (IOException e) {
- throw new ServiceException(e);
- }
- }
-
- @Override
- public DeleteScmBlocksResponseProto deleteScmBlocks(
- RpcController controller, DeleteScmBlocksRequestProto req)
- throws ServiceException {
- Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
- req.getKeysCount());
- for (String key : req.getKeysList()) {
- keys.add(key);
- }
- final List<DeleteBlockResult> results = blockImpl.deleteBlocks(keys);
- DeleteScmBlocksResponseProto.Builder resp =
- DeleteScmBlocksResponseProto.newBuilder();
- for (DeleteBlockResult result: results) {
- DeleteScmBlockResult.Builder deleteResult = DeleteScmBlockResult
- .newBuilder().setKey(result.getKey()).setResult(result.getResult());
- resp.addResults(deleteResult.build());
- }
- return resp.build();
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f7b8a16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index c12b556..38d8e55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos;
+import org.apache.hadoop.ozone.protocolPB
+ .ScmBlockLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.scm.block.BlockManager;
import org.apache.hadoop.ozone.scm.block.BlockManagerImpl;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
@@ -73,6 +76,7 @@ import org.apache.hadoop.ozone.protocol.proto
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
import org.apache.hadoop.ozone.protocolPB
.StorageContainerDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.protocolPB
.StorageContainerLocationProtocolServerSideTranslatorPB;
@@ -97,9 +101,11 @@ import java.util.Set;
import java.util.UUID;
import static org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.DeleteScmBlockResult.Result;
+ .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
import static org.apache.hadoop.scm.ScmConfigKeys
+ .OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY;
+import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_CLIENT_ADDRESS_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_DATANODE_ADDRESS_KEY;
@@ -146,6 +152,10 @@ public class StorageContainerManager
private final RPC.Server clientRpcServer;
private final InetSocketAddress clientRpcAddress;
+ /** The RPC server that listens to requests from block service clients. */
+ private final RPC.Server blockRpcServer;
+ private final InetSocketAddress blockRpcAddress;
+
/** SCM mxbean. */
private ObjectName scmInfoBeanName;
@@ -173,6 +183,8 @@ public class StorageContainerManager
ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
ProtobufRpcEngine.class);
+ RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
+ ProtobufRpcEngine.class);
BlockingService dnProtoPbService = StorageContainerDatanodeProtocolProtos.
StorageContainerDatanodeProtocolService.newReflectiveBlockingService(
@@ -186,12 +198,12 @@ public class StorageContainerManager
datanodeRpcAddress = updateListenAddress(conf,
OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
+ // SCM Container Service RPC
BlockingService storageProtoPbService =
StorageContainerLocationProtocolProtos
.StorageContainerLocationProtocolService
.newReflectiveBlockingService(
- new StorageContainerLocationProtocolServerSideTranslatorPB(
- this, this));
+ new StorageContainerLocationProtocolServerSideTranslatorPB(this));
final InetSocketAddress scmAddress =
OzoneClientUtils.getScmClientBindAddress(conf);
@@ -201,6 +213,22 @@ public class StorageContainerManager
clientRpcAddress = updateListenAddress(conf,
OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer);
+
+ // SCM Block Service RPC
+ BlockingService blockProtoPbService =
+ ScmBlockLocationProtocolProtos
+ .ScmBlockLocationProtocolService
+ .newReflectiveBlockingService(
+ new ScmBlockLocationProtocolServerSideTranslatorPB(this));
+
+ final InetSocketAddress scmBlockAddress =
+ OzoneClientUtils.getScmBlockClientBindAddress(conf);
+ blockRpcServer = startRpcServer(conf, scmBlockAddress,
+ ScmBlockLocationProtocolPB.class, blockProtoPbService,
+ handlerCount);
+ blockRpcAddress = updateListenAddress(conf,
+ OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, blockRpcServer);
+
registerMXBean();
}
@@ -447,6 +475,9 @@ public class StorageContainerManager
LOG.info(buildRpcServerStartMessage(
"StorageContainerLocationProtocol RPC server", clientRpcAddress));
clientRpcServer.start();
+ LOG.info(buildRpcServerStartMessage(
+ "ScmBlockLocationProtocol RPC server", blockRpcAddress));
+ blockRpcServer.start();
LOG.info(buildRpcServerStartMessage("RPC server for DataNodes",
datanodeRpcAddress));
datanodeRpcServer.start();
@@ -456,6 +487,8 @@ public class StorageContainerManager
* Stop service.
*/
public void stop() {
+ LOG.info("Stopping block service RPC server");
+ blockRpcServer.stop();
LOG.info("Stopping the StorageContainerLocationProtocol RPC server");
clientRpcServer.stop();
LOG.info("Stopping the RPC server for DataNodes");
@@ -470,6 +503,7 @@ public class StorageContainerManager
*/
public void join() {
try {
+ blockRpcServer.join();
clientRpcServer.join();
datanodeRpcServer.join();
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f7b8a16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 9caec2a..173b911 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -351,6 +351,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster
configScmMetadata();
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
+ conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
conf.set(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY, "127.0.0.1:0");
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org