You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2018/08/11 05:37:34 UTC

[30/50] [abbrv] hadoop git commit: HDDS-339. Add block length and blockId in PutKeyResponse. Contributed by Shashikant Banerjee.

HDDS-339. Add block length and blockId in PutKeyResponse. Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/398d8955
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/398d8955
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/398d8955

Branch: refs/heads/HADOOP-15407
Commit: 398d89554398a38ffa1347524286cd437f94f3ae
Parents: 15241c6
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Fri Aug 10 23:45:56 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Fri Aug 10 23:45:56 2018 +0530

----------------------------------------------------------------------
 .../main/proto/DatanodeContainerProtocol.proto  |   1 +
 .../container/keyvalue/KeyValueHandler.java     |  18 +-
 .../container/keyvalue/helpers/KeyUtils.java    |  50 +++-
 .../container/keyvalue/impl/KeyManagerImpl.java |   4 +-
 .../keyvalue/interfaces/KeyManager.java         |   3 +-
 .../ozone/scm/TestCommittedBlockLengthAPI.java  | 216 ----------------
 .../TestGetCommittedBlockLengthAndPutKey.java   | 254 +++++++++++++++++++
 7 files changed, 313 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index af06346..930f314 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -308,6 +308,7 @@ message  PutKeyRequestProto {
 }
 
 message  PutKeyResponseProto {
+  required GetCommittedBlockLengthResponseProto committedBlockLength = 1;
 }
 
 message  GetKeyRequestProto  {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index f4699dd..8364a77 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -421,6 +421,7 @@ public class KeyValueHandler extends Handler {
   ContainerCommandResponseProto handlePutKey(
       ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
 
+    long blockLength;
     if (!request.hasPutKey()) {
       LOG.debug("Malformed Put Key request. trace ID: {}",
           request.getTraceID());
@@ -433,7 +434,7 @@ public class KeyValueHandler extends Handler {
       KeyData keyData = KeyData.getFromProtoBuf(
           request.getPutKey().getKeyData());
       long numBytes = keyData.getProtoBufMessage().toByteArray().length;
-      commitKey(keyData, kvContainer);
+      blockLength = commitKey(keyData, kvContainer);
       metrics.incContainerBytesStats(Type.PutKey, numBytes);
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -443,7 +444,7 @@ public class KeyValueHandler extends Handler {
           request);
     }
 
-    return KeyUtils.getKeyResponseSuccess(request);
+    return KeyUtils.putKeyResponseSuccess(request, blockLength);
   }
 
   private void commitPendingKeys(KeyValueContainer kvContainer)
@@ -456,12 +457,13 @@ public class KeyValueHandler extends Handler {
     }
   }
 
-  private void commitKey(KeyData keyData, KeyValueContainer kvContainer)
+  private long commitKey(KeyData keyData, KeyValueContainer kvContainer)
       throws IOException {
     Preconditions.checkNotNull(keyData);
-    keyManager.putKey(kvContainer, keyData);
+    long length = keyManager.putKey(kvContainer, keyData);
     //update the open key Map in containerManager
     this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID());
+    return length;
   }
   /**
    * Handle Get Key operation. Calls KeyManager to process the request.
@@ -662,8 +664,12 @@ public class KeyValueHandler extends Handler {
           request.getWriteChunk().getStage() == Stage.COMBINED) {
         metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk()
             .getChunkData().getLen());
-        // the openContainerBlockMap should be updated only while writing data
-        // not during COMMIT_STAGE of handling write chunk request.
+      }
+
+      if (request.getWriteChunk().getStage() == Stage.COMMIT_DATA
+          || request.getWriteChunk().getStage() == Stage.COMBINED) {
+        // the openContainerBlockMap should be updated only during the
+        // COMMIT_DATA or COMBINED stage of handling a write chunk request.
         openContainerBlockMap.addChunk(blockID, chunkInfoProto);
       }
     } catch (StorageContainerException ex) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
index 2be966d..a83d298 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
@@ -27,6 +27,10 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .GetKeyResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
+    GetCommittedBlockLengthResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
+    PutKeyResponseProto;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
@@ -123,6 +127,26 @@ public final class KeyUtils {
   }
 
   /**
+   * Returns putKey response success.
+   * @param msg - Request.
+   * @return Response.
+   */
+  public static ContainerCommandResponseProto putKeyResponseSuccess(
+      ContainerCommandRequestProto msg, long blockLength) {
+    GetCommittedBlockLengthResponseProto.Builder
+        committedBlockLengthResponseBuilder =
+        getCommittedBlockLengthResponseBuilder(blockLength,
+            msg.getPutKey().getKeyData().getBlockID());
+    PutKeyResponseProto.Builder putKeyResponse =
+        PutKeyResponseProto.newBuilder();
+    putKeyResponse
+        .setCommittedBlockLength(committedBlockLengthResponseBuilder);
+    ContainerProtos.ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getSuccessResponseBuilder(msg);
+    builder.setPutKey(putKeyResponse);
+    return builder.build();
+  }
+  /**
    * Returns successful keyResponse.
    * @param msg - Request.
    * @return Response.
@@ -150,18 +174,26 @@ public final class KeyUtils {
    * @param msg - Request.
    * @return Response.
    */
-  public static ContainerProtos.ContainerCommandResponseProto
-  getBlockLengthResponse(ContainerProtos.
-      ContainerCommandRequestProto msg, long blockLength) {
+  public static ContainerCommandResponseProto getBlockLengthResponse(
+          ContainerCommandRequestProto msg, long blockLength) {
+    GetCommittedBlockLengthResponseProto.Builder
+        committedBlockLengthResponseBuilder =
+        getCommittedBlockLengthResponseBuilder(blockLength,
+            msg.getGetCommittedBlockLength().getBlockID());
+    ContainerProtos.ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getSuccessResponseBuilder(msg);
+    builder.setGetCommittedBlockLength(committedBlockLengthResponseBuilder);
+    return builder.build();
+  }
+
+  private static GetCommittedBlockLengthResponseProto.Builder
+  getCommittedBlockLengthResponseBuilder(
+      long blockLength, ContainerProtos.DatanodeBlockID blockID) {
     ContainerProtos.GetCommittedBlockLengthResponseProto.Builder
         getCommittedBlockLengthResponseBuilder = ContainerProtos.
         GetCommittedBlockLengthResponseProto.newBuilder();
     getCommittedBlockLengthResponseBuilder.setBlockLength(blockLength);
-    getCommittedBlockLengthResponseBuilder
-        .setBlockID(msg.getGetCommittedBlockLength().getBlockID());
-    ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getSuccessResponseBuilder(msg);
-    builder.setGetCommittedBlockLength(getCommittedBlockLengthResponseBuilder);
-    return  builder.build();
+    getCommittedBlockLengthResponseBuilder.setBlockID(blockID);
+    return getCommittedBlockLengthResponseBuilder;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java
index 58bf1f8..6370f8e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java
@@ -67,9 +67,10 @@ public class KeyManagerImpl implements KeyManager {
    *
    * @param container - Container for which key need to be added.
    * @param data     - Key Data.
+   * @return length of the key.
    * @throws IOException
    */
-  public void putKey(Container container, KeyData data) throws IOException {
+  public long putKey(Container container, KeyData data) throws IOException {
     Preconditions.checkNotNull(data, "KeyData cannot be null for put " +
         "operation.");
     Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
@@ -87,6 +88,7 @@ public class KeyManagerImpl implements KeyManager {
 
     // Increment keycount here
     container.getContainerData().incrKeyCount();
+    return data.getSize();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
index dad688e..37871be 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
@@ -35,9 +35,10 @@ public interface KeyManager {
    *
    * @param container - Container for which key need to be added.
    * @param data     - Key Data.
+   * @return length of the Key.
    * @throws IOException
    */
-  void putKey(Container container, KeyData data) throws IOException;
+  long putKey(Container container, KeyData data) throws IOException;
 
   /**
    * Gets an existing key.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java
deleted file mode 100644
index 3c6479f..0000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.scm;
-
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.container.common.helpers.
-    ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.
-    StorageContainerException;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.
-    ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.
-    SCMContainerPlacementCapacity;
-import org.apache.hadoop.hdds.scm.protocolPB.
-    StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-
-import java.util.UUID;
-
-/**
- * Test Container calls.
- */
-public class TestCommittedBlockLengthAPI {
-
-  private static MiniOzoneCluster cluster;
-  private static OzoneConfiguration ozoneConfig;
-  private static StorageContainerLocationProtocolClientSideTranslatorPB
-      storageContainerLocationClient;
-  private static XceiverClientManager xceiverClientManager;
-  private static String containerOwner = "OZONE";
-
-  @BeforeClass
-  public static void init() throws Exception {
-    ozoneConfig = new OzoneConfiguration();
-    ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
-        SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
-    cluster =
-        MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
-    cluster.waitForClusterToBeReady();
-    storageContainerLocationClient =
-        cluster.getStorageContainerLocationClient();
-    xceiverClientManager = new XceiverClientManager(ozoneConfig);
-  }
-
-  @AfterClass
-  public static void shutdown() throws InterruptedException {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-    IOUtils.cleanupWithLogger(null, storageContainerLocationClient);
-  }
-
-  @Test
-  public void tesGetCommittedBlockLength() throws Exception {
-    ContainerProtos.GetCommittedBlockLengthResponseProto response;
-    String traceID = UUID.randomUUID().toString();
-    ContainerWithPipeline container = storageContainerLocationClient
-        .allocateContainer(xceiverClientManager.getType(),
-            HddsProtos.ReplicationFactor.ONE, containerOwner);
-    long containerID = container.getContainerInfo().getContainerID();
-    Pipeline pipeline = container.getPipeline();
-    XceiverClientSpi client =
-        xceiverClientManager.acquireClient(pipeline, containerID);
-    //create the container
-    ContainerProtocolCalls.createContainer(client, containerID, traceID);
-
-    BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
-    byte[] data =
-        RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
-    ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
-        ContainerTestHelper
-            .getWriteChunkRequest(container.getPipeline(), blockID,
-                data.length);
-    client.sendCommand(writeChunkRequest);
-    // Now, explicitly make a putKey request for the block.
-    ContainerProtos.ContainerCommandRequestProto putKeyRequest =
-        ContainerTestHelper
-            .getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk());
-    client.sendCommand(putKeyRequest);
-    response = ContainerProtocolCalls
-        .getCommittedBlockLength(client, blockID, traceID);
-    // make sure the block ids in the request and response are same.
-    Assert.assertTrue(
-        BlockID.getFromProtobuf(response.getBlockID()).equals(blockID));
-    Assert.assertTrue(response.getBlockLength() == data.length);
-    xceiverClientManager.releaseClient(client);
-  }
-
-  @Test
-  public void tesGetCommittedBlockLengthWithClosedContainer()
-      throws Exception {
-    String traceID = UUID.randomUUID().toString();
-    ContainerWithPipeline container = storageContainerLocationClient
-        .allocateContainer(xceiverClientManager.getType(),
-            HddsProtos.ReplicationFactor.ONE, containerOwner);
-    long containerID = container.getContainerInfo().getContainerID();
-    Pipeline pipeline = container.getPipeline();
-    XceiverClientSpi client =
-        xceiverClientManager.acquireClient(pipeline, containerID);
-    // create the container
-    ContainerProtocolCalls.createContainer(client, containerID, traceID);
-
-    byte[] data =
-        RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
-    BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
-    ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
-        ContainerTestHelper
-            .getWriteChunkRequest(container.getPipeline(), blockID,
-                data.length);
-    client.sendCommand(writeChunkRequest);
-    // close the container
-    ContainerProtocolCalls.closeContainer(client, containerID, traceID);
-    ContainerProtos.GetCommittedBlockLengthResponseProto response =
-        ContainerProtocolCalls
-            .getCommittedBlockLength(client, blockID, traceID);
-    // make sure the block ids in the request and response are same.
-    // This will also ensure that closing the container committed the block
-    // on the Datanodes.
-    Assert.assertTrue(
-        BlockID.getFromProtobuf(response.getBlockID()).equals(blockID));
-    Assert.assertTrue(response.getBlockLength() == data.length);
-    xceiverClientManager.releaseClient(client);
-  }
-
-  @Test
-  public void tesGetCommittedBlockLengthForInvalidBlock() throws Exception {
-    String traceID = UUID.randomUUID().toString();
-    ContainerWithPipeline container = storageContainerLocationClient
-        .allocateContainer(xceiverClientManager.getType(),
-            HddsProtos.ReplicationFactor.ONE, containerOwner);
-    long containerID = container.getContainerInfo().getContainerID();
-    XceiverClientSpi client = xceiverClientManager
-        .acquireClient(container.getPipeline(), containerID);
-    ContainerProtocolCalls.createContainer(client, containerID, traceID);
-
-    BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
-    // move the container to closed state
-    ContainerProtocolCalls.closeContainer(client, containerID, traceID);
-    try {
-      // There is no block written inside the container. The request should
-      // fail.
-      ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID);
-      Assert.fail("Expected exception not thrown");
-    } catch (StorageContainerException sce) {
-      Assert.assertTrue(sce.getMessage().contains("Unable to find the key"));
-    }
-    xceiverClientManager.releaseClient(client);
-  }
-
-  @Test
-  public void testGetCommittedBlockLengthForOpenBlock() throws Exception {
-    String traceID = UUID.randomUUID().toString();
-    ContainerWithPipeline container = storageContainerLocationClient
-        .allocateContainer(xceiverClientManager.getType(),
-            HddsProtos.ReplicationFactor.ONE, containerOwner);
-    long containerID = container.getContainerInfo().getContainerID();
-    XceiverClientSpi client = xceiverClientManager
-        .acquireClient(container.getPipeline(), containerID);
-    ContainerProtocolCalls
-        .createContainer(client, containerID, traceID);
-
-    BlockID blockID =
-        ContainerTestHelper.getTestBlockID(containerID);
-    ContainerProtos.ContainerCommandRequestProto requestProto =
-        ContainerTestHelper
-            .getWriteChunkRequest(container.getPipeline(), blockID, 1024);
-    client.sendCommand(requestProto);
-    try {
-      ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID);
-      Assert.fail("Expected Exception not thrown");
-    } catch (StorageContainerException sce) {
-      Assert.assertEquals(ContainerProtos.Result.BLOCK_NOT_COMMITTED,
-          sce.getResult());
-    }
-    // now close the container, it should auto commit pending open blocks
-    ContainerProtocolCalls
-        .closeContainer(client, containerID, traceID);
-    ContainerProtos.GetCommittedBlockLengthResponseProto response =
-        ContainerProtocolCalls
-            .getCommittedBlockLength(client, blockID, traceID);
-    Assert.assertTrue(response.getBlockLength() == 1024);
-    xceiverClientManager.releaseClient(client);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
new file mode 100644
index 0000000..f82b0d3
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.
+    ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.
+    StorageContainerException;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.
+    ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.
+    SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.scm.protocolPB.
+    StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+import java.util.UUID;
+
+/**
+ * Tests the getCommittedBlockLength and putKey container calls.
+ */
+public class TestGetCommittedBlockLengthAndPutKey {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration ozoneConfig;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private static XceiverClientManager xceiverClientManager;
+  private static String containerOwner = "OZONE";
+
+  @BeforeClass
+  public static void init() throws Exception {
+    ozoneConfig = new OzoneConfiguration();
+    ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+    cluster =
+        MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
+    cluster.waitForClusterToBeReady();
+    storageContainerLocationClient =
+        cluster.getStorageContainerLocationClient();
+    xceiverClientManager = new XceiverClientManager(ozoneConfig);
+  }
+
+  @AfterClass
+  public static void shutdown() throws InterruptedException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    IOUtils.cleanupWithLogger(null, storageContainerLocationClient);
+  }
+
+  @Test
+  public void tesGetCommittedBlockLength() throws Exception {
+    ContainerProtos.GetCommittedBlockLengthResponseProto response;
+    String traceID = UUID.randomUUID().toString();
+    ContainerWithPipeline container = storageContainerLocationClient
+        .allocateContainer(xceiverClientManager.getType(),
+            HddsProtos.ReplicationFactor.ONE, containerOwner);
+    long containerID = container.getContainerInfo().getContainerID();
+    Pipeline pipeline = container.getPipeline();
+    XceiverClientSpi client =
+        xceiverClientManager.acquireClient(pipeline, containerID);
+    //create the container
+    ContainerProtocolCalls.createContainer(client, containerID, traceID);
+
+    BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+    byte[] data =
+        RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
+    ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+        ContainerTestHelper
+            .getWriteChunkRequest(container.getPipeline(), blockID,
+                data.length);
+    client.sendCommand(writeChunkRequest);
+    // Now, explicitly make a putKey request for the block.
+    ContainerProtos.ContainerCommandRequestProto putKeyRequest =
+        ContainerTestHelper
+            .getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk());
+    client.sendCommand(putKeyRequest);
+    response = ContainerProtocolCalls
+        .getCommittedBlockLength(client, blockID, traceID);
+    // make sure the block ids in the request and response are same.
+    Assert.assertTrue(
+        BlockID.getFromProtobuf(response.getBlockID()).equals(blockID));
+    Assert.assertTrue(response.getBlockLength() == data.length);
+    xceiverClientManager.releaseClient(client);
+  }
+
+  @Test
+  public void tesGetCommittedBlockLengthWithClosedContainer()
+      throws Exception {
+    String traceID = UUID.randomUUID().toString();
+    ContainerWithPipeline container = storageContainerLocationClient
+        .allocateContainer(xceiverClientManager.getType(),
+            HddsProtos.ReplicationFactor.ONE, containerOwner);
+    long containerID = container.getContainerInfo().getContainerID();
+    Pipeline pipeline = container.getPipeline();
+    XceiverClientSpi client =
+        xceiverClientManager.acquireClient(pipeline, containerID);
+    // create the container
+    ContainerProtocolCalls.createContainer(client, containerID, traceID);
+
+    byte[] data =
+        RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
+    BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+    ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+        ContainerTestHelper
+            .getWriteChunkRequest(container.getPipeline(), blockID,
+                data.length);
+    client.sendCommand(writeChunkRequest);
+    // close the container
+    ContainerProtocolCalls.closeContainer(client, containerID, traceID);
+    ContainerProtos.GetCommittedBlockLengthResponseProto response =
+        ContainerProtocolCalls
+            .getCommittedBlockLength(client, blockID, traceID);
+    // make sure the block ids in the request and response are same.
+    // This will also ensure that closing the container committed the block
+    // on the Datanodes.
+    Assert.assertTrue(
+        BlockID.getFromProtobuf(response.getBlockID()).equals(blockID));
+    Assert.assertTrue(response.getBlockLength() == data.length);
+    xceiverClientManager.releaseClient(client);
+  }
+
+  @Test
+  public void tesGetCommittedBlockLengthForInvalidBlock() throws Exception {
+    String traceID = UUID.randomUUID().toString();
+    ContainerWithPipeline container = storageContainerLocationClient
+        .allocateContainer(xceiverClientManager.getType(),
+            HddsProtos.ReplicationFactor.ONE, containerOwner);
+    long containerID = container.getContainerInfo().getContainerID();
+    XceiverClientSpi client = xceiverClientManager
+        .acquireClient(container.getPipeline(), containerID);
+    ContainerProtocolCalls.createContainer(client, containerID, traceID);
+
+    BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+    // move the container to closed state
+    ContainerProtocolCalls.closeContainer(client, containerID, traceID);
+    try {
+      // There is no block written inside the container. The request should
+      // fail.
+      ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID);
+      Assert.fail("Expected exception not thrown");
+    } catch (StorageContainerException sce) {
+      Assert.assertTrue(sce.getMessage().contains("Unable to find the key"));
+    }
+    xceiverClientManager.releaseClient(client);
+  }
+
+  @Test
+  public void testGetCommittedBlockLengthForOpenBlock() throws Exception {
+    String traceID = UUID.randomUUID().toString();
+    ContainerWithPipeline container = storageContainerLocationClient
+        .allocateContainer(xceiverClientManager.getType(),
+            HddsProtos.ReplicationFactor.ONE, containerOwner);
+    long containerID = container.getContainerInfo().getContainerID();
+    XceiverClientSpi client = xceiverClientManager
+        .acquireClient(container.getPipeline(), containerID);
+    ContainerProtocolCalls
+        .createContainer(client, containerID, traceID);
+
+    BlockID blockID =
+        ContainerTestHelper.getTestBlockID(containerID);
+    ContainerProtos.ContainerCommandRequestProto requestProto =
+        ContainerTestHelper
+            .getWriteChunkRequest(container.getPipeline(), blockID, 1024);
+    client.sendCommand(requestProto);
+    try {
+      ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID);
+      Assert.fail("Expected Exception not thrown");
+    } catch (StorageContainerException sce) {
+      Assert.assertEquals(ContainerProtos.Result.BLOCK_NOT_COMMITTED,
+          sce.getResult());
+    }
+    // now close the container, it should auto commit pending open blocks
+    ContainerProtocolCalls
+        .closeContainer(client, containerID, traceID);
+    ContainerProtos.GetCommittedBlockLengthResponseProto response =
+        ContainerProtocolCalls
+            .getCommittedBlockLength(client, blockID, traceID);
+    Assert.assertTrue(response.getBlockLength() == 1024);
+    xceiverClientManager.releaseClient(client);
+  }
+
+  @Test
+  public void tesPutKeyResposne() throws Exception {
+    ContainerProtos.PutKeyResponseProto response;
+    String traceID = UUID.randomUUID().toString();
+    ContainerWithPipeline container = storageContainerLocationClient
+        .allocateContainer(xceiverClientManager.getType(),
+            HddsProtos.ReplicationFactor.ONE, containerOwner);
+    long containerID = container.getContainerInfo().getContainerID();
+    Pipeline pipeline = container.getPipeline();
+    XceiverClientSpi client =
+        xceiverClientManager.acquireClient(pipeline, containerID);
+    //create the container
+    ContainerProtocolCalls.createContainer(client, containerID, traceID);
+
+    BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+    byte[] data =
+        RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
+    ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+        ContainerTestHelper
+            .getWriteChunkRequest(container.getPipeline(), blockID,
+                data.length);
+    client.sendCommand(writeChunkRequest);
+    // Now, explicitly make a putKey request for the block.
+    ContainerProtos.ContainerCommandRequestProto putKeyRequest =
+        ContainerTestHelper
+            .getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk());
+    response = client.sendCommand(putKeyRequest).getPutKey();
+    // make sure the block ids in the request and response are the same.
+    // This also verifies that the putKey response carries the committed
+    // block length of the block that was just written.
+    Assert.assertEquals(BlockID
+        .getFromProtobuf(response.getCommittedBlockLength().getBlockID()),
+        blockID);
+    Assert.assertEquals(
+        response.getCommittedBlockLength().getBlockLength(), data.length);
+    xceiverClientManager.releaseClient(client);
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org