You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2018/08/11 05:37:34 UTC
[30/50] [abbrv] hadoop git commit: HDDS-339. Add block length and
blockId in PutKeyResponse. Contributed by Shashikant Banerjee.
HDDS-339. Add block length and blockId in PutKeyResponse. Contributed by Shashikant Banerjee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/398d8955
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/398d8955
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/398d8955
Branch: refs/heads/HADOOP-15407
Commit: 398d89554398a38ffa1347524286cd437f94f3ae
Parents: 15241c6
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Fri Aug 10 23:45:56 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Fri Aug 10 23:45:56 2018 +0530
----------------------------------------------------------------------
.../main/proto/DatanodeContainerProtocol.proto | 1 +
.../container/keyvalue/KeyValueHandler.java | 18 +-
.../container/keyvalue/helpers/KeyUtils.java | 50 +++-
.../container/keyvalue/impl/KeyManagerImpl.java | 4 +-
.../keyvalue/interfaces/KeyManager.java | 3 +-
.../ozone/scm/TestCommittedBlockLengthAPI.java | 216 ----------------
.../TestGetCommittedBlockLengthAndPutKey.java | 254 +++++++++++++++++++
7 files changed, 313 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index af06346..930f314 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -308,6 +308,7 @@ message PutKeyRequestProto {
}
message PutKeyResponseProto {
+ required GetCommittedBlockLengthResponseProto committedBlockLength = 1;
}
message GetKeyRequestProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index f4699dd..8364a77 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -421,6 +421,7 @@ public class KeyValueHandler extends Handler {
ContainerCommandResponseProto handlePutKey(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+ long blockLength;
if (!request.hasPutKey()) {
LOG.debug("Malformed Put Key request. trace ID: {}",
request.getTraceID());
@@ -433,7 +434,7 @@ public class KeyValueHandler extends Handler {
KeyData keyData = KeyData.getFromProtoBuf(
request.getPutKey().getKeyData());
long numBytes = keyData.getProtoBufMessage().toByteArray().length;
- commitKey(keyData, kvContainer);
+ blockLength = commitKey(keyData, kvContainer);
metrics.incContainerBytesStats(Type.PutKey, numBytes);
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -443,7 +444,7 @@ public class KeyValueHandler extends Handler {
request);
}
- return KeyUtils.getKeyResponseSuccess(request);
+ return KeyUtils.putKeyResponseSuccess(request, blockLength);
}
private void commitPendingKeys(KeyValueContainer kvContainer)
@@ -456,12 +457,13 @@ public class KeyValueHandler extends Handler {
}
}
- private void commitKey(KeyData keyData, KeyValueContainer kvContainer)
+ private long commitKey(KeyData keyData, KeyValueContainer kvContainer)
throws IOException {
Preconditions.checkNotNull(keyData);
- keyManager.putKey(kvContainer, keyData);
+ long length = keyManager.putKey(kvContainer, keyData);
//update the open key Map in containerManager
this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID());
+ return length;
}
/**
* Handle Get Key operation. Calls KeyManager to process the request.
@@ -662,8 +664,12 @@ public class KeyValueHandler extends Handler {
request.getWriteChunk().getStage() == Stage.COMBINED) {
metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk()
.getChunkData().getLen());
- // the openContainerBlockMap should be updated only while writing data
- // not during COMMIT_STAGE of handling write chunk request.
+ }
+
+ if (request.getWriteChunk().getStage() == Stage.COMMIT_DATA
+ || request.getWriteChunk().getStage() == Stage.COMBINED) {
+ // the openContainerBlockMap should be updated only during
+ // COMMIT_STAGE of handling write chunk request.
openContainerBlockMap.addChunk(blockID, chunkInfoProto);
}
} catch (StorageContainerException ex) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
index 2be966d..a83d298 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
@@ -27,6 +27,10 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetKeyResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
+ GetCommittedBlockLengthResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
+ PutKeyResponseProto;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
@@ -123,6 +127,26 @@ public final class KeyUtils {
}
/**
+ * Returns putKey response success.
+ * @param msg - Request.
+ * @return Response.
+ */
+ public static ContainerCommandResponseProto putKeyResponseSuccess(
+ ContainerCommandRequestProto msg, long blockLength) {
+ GetCommittedBlockLengthResponseProto.Builder
+ committedBlockLengthResponseBuilder =
+ getCommittedBlockLengthResponseBuilder(blockLength,
+ msg.getPutKey().getKeyData().getBlockID());
+ PutKeyResponseProto.Builder putKeyResponse =
+ PutKeyResponseProto.newBuilder();
+ putKeyResponse
+ .setCommittedBlockLength(committedBlockLengthResponseBuilder);
+ ContainerProtos.ContainerCommandResponseProto.Builder builder =
+ ContainerUtils.getSuccessResponseBuilder(msg);
+ builder.setPutKey(putKeyResponse);
+ return builder.build();
+ }
+ /**
* Returns successful keyResponse.
* @param msg - Request.
* @return Response.
@@ -150,18 +174,26 @@ public final class KeyUtils {
* @param msg - Request.
* @return Response.
*/
- public static ContainerProtos.ContainerCommandResponseProto
- getBlockLengthResponse(ContainerProtos.
- ContainerCommandRequestProto msg, long blockLength) {
+ public static ContainerCommandResponseProto getBlockLengthResponse(
+ ContainerCommandRequestProto msg, long blockLength) {
+ GetCommittedBlockLengthResponseProto.Builder
+ committedBlockLengthResponseBuilder =
+ getCommittedBlockLengthResponseBuilder(blockLength,
+ msg.getGetCommittedBlockLength().getBlockID());
+ ContainerProtos.ContainerCommandResponseProto.Builder builder =
+ ContainerUtils.getSuccessResponseBuilder(msg);
+ builder.setGetCommittedBlockLength(committedBlockLengthResponseBuilder);
+ return builder.build();
+ }
+
+ private static GetCommittedBlockLengthResponseProto.Builder
+ getCommittedBlockLengthResponseBuilder(
+ long blockLength, ContainerProtos.DatanodeBlockID blockID) {
ContainerProtos.GetCommittedBlockLengthResponseProto.Builder
getCommittedBlockLengthResponseBuilder = ContainerProtos.
GetCommittedBlockLengthResponseProto.newBuilder();
getCommittedBlockLengthResponseBuilder.setBlockLength(blockLength);
- getCommittedBlockLengthResponseBuilder
- .setBlockID(msg.getGetCommittedBlockLength().getBlockID());
- ContainerProtos.ContainerCommandResponseProto.Builder builder =
- ContainerUtils.getSuccessResponseBuilder(msg);
- builder.setGetCommittedBlockLength(getCommittedBlockLengthResponseBuilder);
- return builder.build();
+ getCommittedBlockLengthResponseBuilder.setBlockID(blockID);
+ return getCommittedBlockLengthResponseBuilder;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java
index 58bf1f8..6370f8e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java
@@ -67,9 +67,10 @@ public class KeyManagerImpl implements KeyManager {
*
* @param container - Container for which key need to be added.
* @param data - Key Data.
+ * @return length of the key.
* @throws IOException
*/
- public void putKey(Container container, KeyData data) throws IOException {
+ public long putKey(Container container, KeyData data) throws IOException {
Preconditions.checkNotNull(data, "KeyData cannot be null for put " +
"operation.");
Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
@@ -87,6 +88,7 @@ public class KeyManagerImpl implements KeyManager {
// Increment keycount here
container.getContainerData().incrKeyCount();
+ return data.getSize();
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
index dad688e..37871be 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
@@ -35,9 +35,10 @@ public interface KeyManager {
*
* @param container - Container for which key need to be added.
* @param data - Key Data.
+ * @return length of the Key.
* @throws IOException
*/
- void putKey(Container container, KeyData data) throws IOException;
+ long putKey(Container container, KeyData data) throws IOException;
/**
* Gets an existing key.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java
deleted file mode 100644
index 3c6479f..0000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.scm;
-
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.container.common.helpers.
- ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.
- StorageContainerException;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.
- ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.
- SCMContainerPlacementCapacity;
-import org.apache.hadoop.hdds.scm.protocolPB.
- StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-
-import java.util.UUID;
-
-/**
- * Test Container calls.
- */
-public class TestCommittedBlockLengthAPI {
-
- private static MiniOzoneCluster cluster;
- private static OzoneConfiguration ozoneConfig;
- private static StorageContainerLocationProtocolClientSideTranslatorPB
- storageContainerLocationClient;
- private static XceiverClientManager xceiverClientManager;
- private static String containerOwner = "OZONE";
-
- @BeforeClass
- public static void init() throws Exception {
- ozoneConfig = new OzoneConfiguration();
- ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
- SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
- cluster =
- MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
- cluster.waitForClusterToBeReady();
- storageContainerLocationClient =
- cluster.getStorageContainerLocationClient();
- xceiverClientManager = new XceiverClientManager(ozoneConfig);
- }
-
- @AfterClass
- public static void shutdown() throws InterruptedException {
- if (cluster != null) {
- cluster.shutdown();
- }
- IOUtils.cleanupWithLogger(null, storageContainerLocationClient);
- }
-
- @Test
- public void tesGetCommittedBlockLength() throws Exception {
- ContainerProtos.GetCommittedBlockLengthResponseProto response;
- String traceID = UUID.randomUUID().toString();
- ContainerWithPipeline container = storageContainerLocationClient
- .allocateContainer(xceiverClientManager.getType(),
- HddsProtos.ReplicationFactor.ONE, containerOwner);
- long containerID = container.getContainerInfo().getContainerID();
- Pipeline pipeline = container.getPipeline();
- XceiverClientSpi client =
- xceiverClientManager.acquireClient(pipeline, containerID);
- //create the container
- ContainerProtocolCalls.createContainer(client, containerID, traceID);
-
- BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
- byte[] data =
- RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
- ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
- ContainerTestHelper
- .getWriteChunkRequest(container.getPipeline(), blockID,
- data.length);
- client.sendCommand(writeChunkRequest);
- // Now, explicitly make a putKey request for the block.
- ContainerProtos.ContainerCommandRequestProto putKeyRequest =
- ContainerTestHelper
- .getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk());
- client.sendCommand(putKeyRequest);
- response = ContainerProtocolCalls
- .getCommittedBlockLength(client, blockID, traceID);
- // make sure the block ids in the request and response are same.
- Assert.assertTrue(
- BlockID.getFromProtobuf(response.getBlockID()).equals(blockID));
- Assert.assertTrue(response.getBlockLength() == data.length);
- xceiverClientManager.releaseClient(client);
- }
-
- @Test
- public void tesGetCommittedBlockLengthWithClosedContainer()
- throws Exception {
- String traceID = UUID.randomUUID().toString();
- ContainerWithPipeline container = storageContainerLocationClient
- .allocateContainer(xceiverClientManager.getType(),
- HddsProtos.ReplicationFactor.ONE, containerOwner);
- long containerID = container.getContainerInfo().getContainerID();
- Pipeline pipeline = container.getPipeline();
- XceiverClientSpi client =
- xceiverClientManager.acquireClient(pipeline, containerID);
- // create the container
- ContainerProtocolCalls.createContainer(client, containerID, traceID);
-
- byte[] data =
- RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
- BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
- ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
- ContainerTestHelper
- .getWriteChunkRequest(container.getPipeline(), blockID,
- data.length);
- client.sendCommand(writeChunkRequest);
- // close the container
- ContainerProtocolCalls.closeContainer(client, containerID, traceID);
- ContainerProtos.GetCommittedBlockLengthResponseProto response =
- ContainerProtocolCalls
- .getCommittedBlockLength(client, blockID, traceID);
- // make sure the block ids in the request and response are same.
- // This will also ensure that closing the container committed the block
- // on the Datanodes.
- Assert.assertTrue(
- BlockID.getFromProtobuf(response.getBlockID()).equals(blockID));
- Assert.assertTrue(response.getBlockLength() == data.length);
- xceiverClientManager.releaseClient(client);
- }
-
- @Test
- public void tesGetCommittedBlockLengthForInvalidBlock() throws Exception {
- String traceID = UUID.randomUUID().toString();
- ContainerWithPipeline container = storageContainerLocationClient
- .allocateContainer(xceiverClientManager.getType(),
- HddsProtos.ReplicationFactor.ONE, containerOwner);
- long containerID = container.getContainerInfo().getContainerID();
- XceiverClientSpi client = xceiverClientManager
- .acquireClient(container.getPipeline(), containerID);
- ContainerProtocolCalls.createContainer(client, containerID, traceID);
-
- BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
- // move the container to closed state
- ContainerProtocolCalls.closeContainer(client, containerID, traceID);
- try {
- // There is no block written inside the container. The request should
- // fail.
- ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID);
- Assert.fail("Expected exception not thrown");
- } catch (StorageContainerException sce) {
- Assert.assertTrue(sce.getMessage().contains("Unable to find the key"));
- }
- xceiverClientManager.releaseClient(client);
- }
-
- @Test
- public void testGetCommittedBlockLengthForOpenBlock() throws Exception {
- String traceID = UUID.randomUUID().toString();
- ContainerWithPipeline container = storageContainerLocationClient
- .allocateContainer(xceiverClientManager.getType(),
- HddsProtos.ReplicationFactor.ONE, containerOwner);
- long containerID = container.getContainerInfo().getContainerID();
- XceiverClientSpi client = xceiverClientManager
- .acquireClient(container.getPipeline(), containerID);
- ContainerProtocolCalls
- .createContainer(client, containerID, traceID);
-
- BlockID blockID =
- ContainerTestHelper.getTestBlockID(containerID);
- ContainerProtos.ContainerCommandRequestProto requestProto =
- ContainerTestHelper
- .getWriteChunkRequest(container.getPipeline(), blockID, 1024);
- client.sendCommand(requestProto);
- try {
- ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID);
- Assert.fail("Expected Exception not thrown");
- } catch (StorageContainerException sce) {
- Assert.assertEquals(ContainerProtos.Result.BLOCK_NOT_COMMITTED,
- sce.getResult());
- }
- // now close the container, it should auto commit pending open blocks
- ContainerProtocolCalls
- .closeContainer(client, containerID, traceID);
- ContainerProtos.GetCommittedBlockLengthResponseProto response =
- ContainerProtocolCalls
- .getCommittedBlockLength(client, blockID, traceID);
- Assert.assertTrue(response.getBlockLength() == 1024);
- xceiverClientManager.releaseClient(client);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
new file mode 100644
index 0000000..f82b0d3
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.
+ ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.
+ StorageContainerException;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.
+ ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.
+ SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.scm.protocolPB.
+ StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+import java.util.UUID;
+
+/**
+ * Test Container calls.
+ */
+public class TestGetCommittedBlockLengthAndPutKey {
+
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration ozoneConfig;
+ private static StorageContainerLocationProtocolClientSideTranslatorPB
+ storageContainerLocationClient;
+ private static XceiverClientManager xceiverClientManager;
+ private static String containerOwner = "OZONE";
+
+ @BeforeClass
+ public static void init() throws Exception {
+ ozoneConfig = new OzoneConfiguration();
+ ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+ SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+ cluster =
+ MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
+ cluster.waitForClusterToBeReady();
+ storageContainerLocationClient =
+ cluster.getStorageContainerLocationClient();
+ xceiverClientManager = new XceiverClientManager(ozoneConfig);
+ }
+
+ @AfterClass
+ public static void shutdown() throws InterruptedException {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ IOUtils.cleanupWithLogger(null, storageContainerLocationClient);
+ }
+
+ @Test
+ public void tesGetCommittedBlockLength() throws Exception {
+ ContainerProtos.GetCommittedBlockLengthResponseProto response;
+ String traceID = UUID.randomUUID().toString();
+ ContainerWithPipeline container = storageContainerLocationClient
+ .allocateContainer(xceiverClientManager.getType(),
+ HddsProtos.ReplicationFactor.ONE, containerOwner);
+ long containerID = container.getContainerInfo().getContainerID();
+ Pipeline pipeline = container.getPipeline();
+ XceiverClientSpi client =
+ xceiverClientManager.acquireClient(pipeline, containerID);
+ //create the container
+ ContainerProtocolCalls.createContainer(client, containerID, traceID);
+
+ BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+ byte[] data =
+ RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
+ ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+ ContainerTestHelper
+ .getWriteChunkRequest(container.getPipeline(), blockID,
+ data.length);
+ client.sendCommand(writeChunkRequest);
+ // Now, explicitly make a putKey request for the block.
+ ContainerProtos.ContainerCommandRequestProto putKeyRequest =
+ ContainerTestHelper
+ .getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk());
+ client.sendCommand(putKeyRequest);
+ response = ContainerProtocolCalls
+ .getCommittedBlockLength(client, blockID, traceID);
+ // make sure the block ids in the request and response are same.
+ Assert.assertTrue(
+ BlockID.getFromProtobuf(response.getBlockID()).equals(blockID));
+ Assert.assertTrue(response.getBlockLength() == data.length);
+ xceiverClientManager.releaseClient(client);
+ }
+
+ @Test
+ public void tesGetCommittedBlockLengthWithClosedContainer()
+ throws Exception {
+ String traceID = UUID.randomUUID().toString();
+ ContainerWithPipeline container = storageContainerLocationClient
+ .allocateContainer(xceiverClientManager.getType(),
+ HddsProtos.ReplicationFactor.ONE, containerOwner);
+ long containerID = container.getContainerInfo().getContainerID();
+ Pipeline pipeline = container.getPipeline();
+ XceiverClientSpi client =
+ xceiverClientManager.acquireClient(pipeline, containerID);
+ // create the container
+ ContainerProtocolCalls.createContainer(client, containerID, traceID);
+
+ byte[] data =
+ RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
+ BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+ ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+ ContainerTestHelper
+ .getWriteChunkRequest(container.getPipeline(), blockID,
+ data.length);
+ client.sendCommand(writeChunkRequest);
+ // close the container
+ ContainerProtocolCalls.closeContainer(client, containerID, traceID);
+ ContainerProtos.GetCommittedBlockLengthResponseProto response =
+ ContainerProtocolCalls
+ .getCommittedBlockLength(client, blockID, traceID);
+ // make sure the block ids in the request and response are same.
+ // This will also ensure that closing the container committed the block
+ // on the Datanodes.
+ Assert.assertTrue(
+ BlockID.getFromProtobuf(response.getBlockID()).equals(blockID));
+ Assert.assertTrue(response.getBlockLength() == data.length);
+ xceiverClientManager.releaseClient(client);
+ }
+
+ @Test
+ public void tesGetCommittedBlockLengthForInvalidBlock() throws Exception {
+ String traceID = UUID.randomUUID().toString();
+ ContainerWithPipeline container = storageContainerLocationClient
+ .allocateContainer(xceiverClientManager.getType(),
+ HddsProtos.ReplicationFactor.ONE, containerOwner);
+ long containerID = container.getContainerInfo().getContainerID();
+ XceiverClientSpi client = xceiverClientManager
+ .acquireClient(container.getPipeline(), containerID);
+ ContainerProtocolCalls.createContainer(client, containerID, traceID);
+
+ BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+ // move the container to closed state
+ ContainerProtocolCalls.closeContainer(client, containerID, traceID);
+ try {
+ // There is no block written inside the container. The request should
+ // fail.
+ ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID);
+ Assert.fail("Expected exception not thrown");
+ } catch (StorageContainerException sce) {
+ Assert.assertTrue(sce.getMessage().contains("Unable to find the key"));
+ }
+ xceiverClientManager.releaseClient(client);
+ }
+
+ @Test
+ public void testGetCommittedBlockLengthForOpenBlock() throws Exception {
+ String traceID = UUID.randomUUID().toString();
+ ContainerWithPipeline container = storageContainerLocationClient
+ .allocateContainer(xceiverClientManager.getType(),
+ HddsProtos.ReplicationFactor.ONE, containerOwner);
+ long containerID = container.getContainerInfo().getContainerID();
+ XceiverClientSpi client = xceiverClientManager
+ .acquireClient(container.getPipeline(), containerID);
+ ContainerProtocolCalls
+ .createContainer(client, containerID, traceID);
+
+ BlockID blockID =
+ ContainerTestHelper.getTestBlockID(containerID);
+ ContainerProtos.ContainerCommandRequestProto requestProto =
+ ContainerTestHelper
+ .getWriteChunkRequest(container.getPipeline(), blockID, 1024);
+ client.sendCommand(requestProto);
+ try {
+ ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID);
+ Assert.fail("Expected Exception not thrown");
+ } catch (StorageContainerException sce) {
+ Assert.assertEquals(ContainerProtos.Result.BLOCK_NOT_COMMITTED,
+ sce.getResult());
+ }
+ // now close the container, it should auto commit pending open blocks
+ ContainerProtocolCalls
+ .closeContainer(client, containerID, traceID);
+ ContainerProtos.GetCommittedBlockLengthResponseProto response =
+ ContainerProtocolCalls
+ .getCommittedBlockLength(client, blockID, traceID);
+ Assert.assertTrue(response.getBlockLength() == 1024);
+ xceiverClientManager.releaseClient(client);
+ }
+
+ @Test
+ public void tesPutKeyResposne() throws Exception {
+ ContainerProtos.PutKeyResponseProto response;
+ String traceID = UUID.randomUUID().toString();
+ ContainerWithPipeline container = storageContainerLocationClient
+ .allocateContainer(xceiverClientManager.getType(),
+ HddsProtos.ReplicationFactor.ONE, containerOwner);
+ long containerID = container.getContainerInfo().getContainerID();
+ Pipeline pipeline = container.getPipeline();
+ XceiverClientSpi client =
+ xceiverClientManager.acquireClient(pipeline, containerID);
+ //create the container
+ ContainerProtocolCalls.createContainer(client, containerID, traceID);
+
+ BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+ byte[] data =
+ RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
+ ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+ ContainerTestHelper
+ .getWriteChunkRequest(container.getPipeline(), blockID,
+ data.length);
+ client.sendCommand(writeChunkRequest);
+ // Now, explicitly make a putKey request for the block.
+ ContainerProtos.ContainerCommandRequestProto putKeyRequest =
+ ContainerTestHelper
+ .getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk());
+ response = client.sendCommand(putKeyRequest).getPutKey();
+ // make sure the block ids in the request and response are same.
+ // This will also ensure that closing the container committed the block
+ // on the Datanodes.
+ Assert.assertEquals(BlockID
+ .getFromProtobuf(response.getCommittedBlockLength().getBlockID()),
+ blockID);
+ Assert.assertEquals(
+ response.getCommittedBlockLength().getBlockLength(), data.length);
+ xceiverClientManager.releaseClient(client);
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org