You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2018/07/25 01:32:13 UTC

[30/50] hadoop git commit: HDDS-181. CloseContainer should commit all pending open Keys on a datanode. Contributed by Shashikant Banerjee.

HDDS-181. CloseContainer should commit all pending open Keys on a datanode. Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bbe2f622
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bbe2f622
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bbe2f622

Branch: refs/heads/HADOOP-15461
Commit: bbe2f6225ea500651de04c064f7b847be18e5b66
Parents: 9fa9e30
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Mon Jul 23 09:12:47 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Mon Jul 23 09:13:03 2018 +0530

----------------------------------------------------------------------
 .../ozone/container/common/helpers/KeyData.java |  20 +-
 .../common/impl/OpenContainerBlockMap.java      | 167 ++++++++++++
 .../container/keyvalue/KeyValueHandler.java     |  69 ++++-
 .../common/impl/TestCloseContainerHandler.java  | 260 +++++++++++++++++++
 4 files changed, 504 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbe2f622/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java
index 129e4a8..b63332f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.ArrayList;
 
 /**
  * Helper class to convert Protobuf to Java classes.
@@ -131,7 +132,25 @@ public class KeyData {
   }
 
   /**
+   * Adds chinkInfo to the list
+   */
+  public void addChunk(ContainerProtos.ChunkInfo chunkInfo) {
+    if (chunks == null) {
+      chunks = new ArrayList<>();
+    }
+    chunks.add(chunkInfo);
+  }
+
+  /**
+   * removes the chunk.
+   */
+  public void removeChunk(ContainerProtos.ChunkInfo chunkInfo) {
+    chunks.remove(chunkInfo);
+  }
+
+  /**
    * Returns container ID.
+   *
    * @return long.
    */
   public long getContainerID() {
@@ -170,5 +189,4 @@ public class KeyData {
   public long getSize() {
     return chunks.parallelStream().mapToLong(e->e.getLen()).sum();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbe2f622/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java
new file mode 100644
index 0000000..ab5f861
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class will maintain list of open keys per container when closeContainer
+ * command comes, it should autocommit all open keys of a open container before
+ * marking the container as closed.
+ */
+public class OpenContainerBlockMap {
+
+  /**
+   * TODO : We may construct the openBlockMap by reading the Block Layout
+   * for each block inside a container listing all chunk files and reading the
+   * sizes. This will help to recreate the openKeys Map once the DataNode
+   * restarts.
+   *
+   * For now, we will track all open blocks of a container in the blockMap.
+   */
+  private final ConcurrentHashMap<Long, HashMap<Long, KeyData>>
+      openContainerBlockMap;
+
+  /**
+   * Constructs OpenContainerBlockMap.
+   */
+  public OpenContainerBlockMap() {
+     openContainerBlockMap = new ConcurrentHashMap<>();
+  }
+  /**
+   * Removes the Container matching with specified containerId.
+   * @param containerId containerId
+   */
+  public void removeContainer(long containerId) {
+    Preconditions
+        .checkState(containerId >= 0, "Container Id cannot be negative.");
+    openContainerBlockMap.computeIfPresent(containerId, (k, v) -> null);
+  }
+
+  /**
+   * updates the chunkInfoList in case chunk is added or deleted
+   * @param blockID id of the block.
+   * @param info - Chunk Info
+   * @param remove if true, deletes the chunkInfo list otherwise appends to the
+   *               chunkInfo List
+   * @throws IOException
+   */
+  public synchronized void updateOpenKeyMap(BlockID blockID,
+      ContainerProtos.ChunkInfo info, boolean remove) throws IOException {
+    if (remove) {
+      deleteChunkFromMap(blockID, info);
+    } else {
+      addChunkToMap(blockID, info);
+    }
+  }
+
+  private KeyData getKeyData(ContainerProtos.ChunkInfo info, BlockID blockID)
+      throws IOException {
+    KeyData keyData = new KeyData(blockID);
+    keyData.addMetadata("TYPE", "KEY");
+    keyData.addChunk(info);
+    return keyData;
+  }
+
+  private void addChunkToMap(BlockID blockID, ContainerProtos.ChunkInfo info)
+      throws IOException {
+    Preconditions.checkNotNull(info);
+    long containerId = blockID.getContainerID();
+    long localID = blockID.getLocalID();
+
+    KeyData keyData = openContainerBlockMap.computeIfAbsent(containerId,
+        emptyMap -> new LinkedHashMap<Long, KeyData>())
+        .putIfAbsent(localID, getKeyData(info, blockID));
+    // KeyData != null means the block already exist
+    if (keyData != null) {
+      HashMap<Long, KeyData> keyDataSet =
+          openContainerBlockMap.get(containerId);
+      keyDataSet.putIfAbsent(blockID.getLocalID(), getKeyData(info, blockID));
+      keyDataSet.computeIfPresent(blockID.getLocalID(), (key, value) -> {
+        value.addChunk(info);
+        return value;
+      });
+    }
+  }
+
+  /**
+   * removes the chunks from the chunkInfo list for the given block.
+   * @param blockID id of the block
+   * @param chunkInfo chunk info.
+   */
+  private synchronized void deleteChunkFromMap(BlockID blockID,
+      ContainerProtos.ChunkInfo chunkInfo) {
+    Preconditions.checkNotNull(chunkInfo);
+    Preconditions.checkNotNull(blockID);
+    HashMap<Long, KeyData> keyDataMap =
+        openContainerBlockMap.get(blockID.getContainerID());
+    if (keyDataMap != null) {
+      long localId = blockID.getLocalID();
+      KeyData keyData = keyDataMap.get(localId);
+      if (keyData != null) {
+        keyData.removeChunk(chunkInfo);
+      }
+    }
+  }
+
+  /**
+   * returns the list of open to the openContainerBlockMap
+   * @param containerId container id
+   * @return List of open Keys(blocks)
+   */
+  public List<KeyData> getOpenKeys(long containerId) {
+    HashMap<Long, KeyData> keyDataHashMap =
+        openContainerBlockMap.get(containerId);
+    return keyDataHashMap == null ? null :
+        keyDataHashMap.values().stream().collect(Collectors.toList());
+  }
+
+  /**
+   * removes the block from the block map.
+   * @param blockID
+   */
+  public synchronized void removeFromKeyMap(BlockID blockID) {
+    Preconditions.checkNotNull(blockID);
+    HashMap<Long, KeyData> keyDataMap =
+        openContainerBlockMap.get(blockID.getContainerID());
+    if (keyDataMap != null) {
+      keyDataMap.remove(blockID.getLocalID());
+      if (keyDataMap.size() == 0) {
+        removeContainer(blockID.getContainerID());
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public ConcurrentHashMap<Long,
+      HashMap<Long, KeyData>> getContainerOpenKeyMap() {
+    return openContainerBlockMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbe2f622/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 84b3644..9aa3df7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
@@ -117,7 +118,7 @@ public class KeyValueHandler extends Handler {
   private VolumeChoosingPolicy volumeChoosingPolicy;
   private final int maxContainerSizeGB;
   private final AutoCloseableLock handlerLock;
-
+  private final OpenContainerBlockMap openContainerBlockMap;
 
   public KeyValueHandler(Configuration config, ContainerSet contSet,
       VolumeSet volSet, ContainerMetrics metrics) {
@@ -145,6 +146,15 @@ public class KeyValueHandler extends Handler {
     // this handler lock is used for synchronizing createContainer Requests,
     // so using a fair lock here.
     handlerLock = new AutoCloseableLock(new ReentrantLock(true));
+    openContainerBlockMap = new OpenContainerBlockMap();
+  }
+
+  /**
+   * Returns OpenContainerBlockMap instance
+   * @return OpenContainerBlockMap
+   */
+  public OpenContainerBlockMap getOpenContainerBlockMap() {
+    return openContainerBlockMap;
   }
 
   @Override
@@ -333,8 +343,9 @@ public class KeyValueHandler extends Handler {
             "Container cannot be deleted because it is not empty.",
             ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
       } else {
-        containerSet.removeContainer(
-            kvContainer.getContainerData().getContainerID());
+        long containerId = kvContainer.getContainerData().getContainerID();
+        containerSet.removeContainer(containerId);
+        openContainerBlockMap.removeContainer(containerId);
         // Release the lock first.
         // Avoid holding write locks for disk operations
         kvContainer.writeUnlock();
@@ -366,9 +377,21 @@ public class KeyValueHandler extends Handler {
     try {
       checkContainerOpen(kvContainer);
 
+      // remove the container from open block map once, all the blocks
+      // have been committed and the container is closed
+      kvContainer.getContainerData()
+          .setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
+      commitPendingKeys(kvContainer);
       kvContainer.close();
+      // make sure the the container open keys from BlockMap gets removed
+      openContainerBlockMap.removeContainer(
+          request.getCloseContainer().getContainerID());
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Close Container failed", ex,
+              IO_EXCEPTION), request);
     }
 
     return ContainerUtils.getSuccessResponse(request);
@@ -391,10 +414,8 @@ public class KeyValueHandler extends Handler {
 
       KeyData keyData = KeyData.getFromProtoBuf(
           request.getPutKey().getKeyData());
-      Preconditions.checkNotNull(keyData);
-
-      keyManager.putKey(kvContainer, keyData);
       long numBytes = keyData.getProtoBufMessage().toByteArray().length;
+      commitKey(keyData, kvContainer);
       metrics.incContainerBytesStats(Type.PutKey, numBytes);
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -407,6 +428,25 @@ public class KeyValueHandler extends Handler {
     return KeyUtils.getKeyResponseSuccess(request);
   }
 
+  private void commitPendingKeys(KeyValueContainer kvContainer)
+      throws IOException {
+    long containerId = kvContainer.getContainerData().getContainerID();
+    List<KeyData> pendingKeys =
+        this.openContainerBlockMap.getOpenKeys(containerId);
+    if (pendingKeys != null) {
+      for (KeyData keyData : pendingKeys) {
+        commitKey(keyData, kvContainer);
+      }
+    }
+  }
+
+  private void commitKey(KeyData keyData, KeyValueContainer kvContainer)
+      throws IOException {
+    Preconditions.checkNotNull(keyData);
+    keyManager.putKey(kvContainer, keyData);
+    //update the open key Map in containerManager
+    this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID());
+  }
   /**
    * Handle Get Key operation. Calls KeyManager to process the request.
    */
@@ -519,11 +559,13 @@ public class KeyValueHandler extends Handler {
 
       BlockID blockID = BlockID.getFromProtobuf(
           request.getDeleteChunk().getBlockID());
-      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getDeleteChunk()
-          .getChunkData());
+      ContainerProtos.ChunkInfo chunkInfoProto = request.getDeleteChunk()
+          .getChunkData();
+      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
       Preconditions.checkNotNull(chunkInfo);
 
       chunkManager.deleteChunk(kvContainer, blockID, chunkInfo);
+      openContainerBlockMap.updateOpenKeyMap(blockID, chunkInfoProto, true);
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
     } catch (IOException ex) {
@@ -552,8 +594,9 @@ public class KeyValueHandler extends Handler {
 
       BlockID blockID = BlockID.getFromProtobuf(
           request.getWriteChunk().getBlockID());
-      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getWriteChunk()
-          .getChunkData());
+      ContainerProtos.ChunkInfo chunkInfoProto =
+          request.getWriteChunk().getChunkData();
+      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
       Preconditions.checkNotNull(chunkInfo);
 
       byte[] data = null;
@@ -570,6 +613,9 @@ public class KeyValueHandler extends Handler {
           request.getWriteChunk().getStage() == Stage.COMBINED) {
         metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk()
             .getChunkData().getLen());
+        // the openContainerBlockMap should be updated only while writing data
+        // not during COMMIT_STAGE of handling write chunk request.
+        openContainerBlockMap.updateOpenKeyMap(blockID, chunkInfoProto, false);
       }
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -610,8 +656,9 @@ public class KeyValueHandler extends Handler {
       ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
           putSmallFileReq.getChunkInfo());
       Preconditions.checkNotNull(chunkInfo);
-
       byte[] data = putSmallFileReq.getData().toByteArray();
+      // chunks will be committed as a part of handling putSmallFile
+      // here. There is no need to maintain this info in openContainerBlockMap.
       chunkManager.writeChunk(
           kvContainer, blockID, chunkInfo, data, Stage.COMBINED);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbe2f622/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java
new file mode 100644
index 0000000..3ab593e
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.Assert;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.LinkedList;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper
+    .createSingleNodePipeline;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper
+    .setDataChecksum;
+
+/**
+ * Simple tests to verify that closeContainer handler on Datanode.
+ */
+public class TestCloseContainerHandler {
+
+  @Rule
+  public TestRule timeout = new Timeout(300000);
+
+  private static Configuration conf;
+  private static HddsDispatcher dispatcher;
+  private static ContainerSet containerSet;
+  private static VolumeSet volumeSet;
+  private static KeyValueHandler handler;
+  private static OpenContainerBlockMap openContainerBlockMap;
+
+  private final static String DATANODE_UUID = UUID.randomUUID().toString();
+
+  private static final String baseDir = MiniDFSCluster.getBaseDirectory();
+  private static final String volume1 = baseDir + "disk1";
+  private static final String volume2 = baseDir + "disk2";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    conf = new Configuration();
+    String dataDirKey = volume1 + "," + volume2;
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDirKey);
+    containerSet = new ContainerSet();
+    DatanodeDetails datanodeDetails =
+        DatanodeDetails.newBuilder().setUuid(DATANODE_UUID)
+            .setHostName("localhost").setIpAddress("127.0.0.1").build();
+    volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
+
+    dispatcher = new HddsDispatcher(conf, containerSet, volumeSet);
+    handler = (KeyValueHandler) dispatcher
+        .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+    openContainerBlockMap = handler.getOpenContainerBlockMap();
+    dispatcher.setScmId(UUID.randomUUID().toString());
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    // Delete the hdds volume root dir
+    List<HddsVolume> volumes = new ArrayList<>();
+    volumes.addAll(volumeSet.getVolumesList());
+    volumes.addAll(volumeSet.getFailedVolumesList());
+
+    for (HddsVolume volume : volumes) {
+      FileUtils.deleteDirectory(volume.getHddsRootDir());
+    }
+    volumeSet.shutdown();
+  }
+
+  private long createContainer() {
+    long testContainerId = ContainerTestHelper.getTestContainerID();
+    ContainerProtos.CreateContainerRequestProto createReq =
+        ContainerProtos.CreateContainerRequestProto.newBuilder()
+            .setContainerID(testContainerId)
+            .build();
+
+    ContainerProtos.ContainerCommandRequestProto request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder()
+            .setCmdType(ContainerProtos.Type.CreateContainer)
+            .setDatanodeUuid(DATANODE_UUID)
+            .setCreateContainer(createReq)
+            .build();
+
+    dispatcher.dispatch(request);
+    return testContainerId;
+  }
+
+  private List<ChunkInfo> writeChunkBuilder(BlockID blockID, Pipeline pipeline,
+      int chunkCount)
+      throws IOException, NoSuchAlgorithmException {
+    final int datalen = 1024;
+    long testContainerID = blockID.getContainerID();
+    List<ChunkInfo> chunkList = new LinkedList<>();
+    for (int x = 0; x < chunkCount; x++) {
+      ChunkInfo info = getChunk(blockID.getLocalID(), x, datalen * x, datalen);
+      byte[] data = getData(datalen);
+      setDataChecksum(info, data);
+      ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
+          ContainerProtos.WriteChunkRequestProto.newBuilder();
+      writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
+      writeRequest.setChunkData(info.getProtoBufMessage());
+      writeRequest.setData(ByteString.copyFrom(data));
+      writeRequest.setStage(ContainerProtos.Stage.COMBINED);
+      ContainerProtos.ContainerCommandRequestProto.Builder request =
+          ContainerProtos.ContainerCommandRequestProto.newBuilder();
+      request.setCmdType(ContainerProtos.Type.WriteChunk);
+      request.setWriteChunk(writeRequest);
+      request.setTraceID(UUID.randomUUID().toString());
+      request.setDatanodeUuid(pipeline.getLeader().getUuidString());
+      dispatcher.dispatch(request.build());
+      chunkList.add(info);
+    }
+    return chunkList;
+  }
+
+  @Test
+  public void testPutKeyWithMultipleChunks()
+      throws IOException, NoSuchAlgorithmException {
+    long testContainerID = createContainer();
+    Assert.assertNotNull(containerSet.getContainer(testContainerID));
+    BlockID blockID = ContainerTestHelper.
+        getTestBlockID(testContainerID);
+    Pipeline pipeline = createSingleNodePipeline();
+    List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
+    // the key should exist in the map
+    Assert.assertTrue(
+        openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
+            .containsKey(blockID.getLocalID()));
+    KeyData keyData = new KeyData(blockID);
+    List<ContainerProtos.ChunkInfo> chunkProtoList = new LinkedList<>();
+    for (ChunkInfo i : chunkList) {
+      chunkProtoList.add(i.getProtoBufMessage());
+    }
+    keyData.setChunks(chunkProtoList);
+    ContainerProtos.PutKeyRequestProto.Builder putKeyRequestProto =
+        ContainerProtos.PutKeyRequestProto.newBuilder();
+    putKeyRequestProto.setKeyData(keyData.getProtoBufMessage());
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.PutKey);
+    request.setPutKey(putKeyRequestProto);
+    request.setTraceID(UUID.randomUUID().toString());
+    request.setDatanodeUuid(pipeline.getLeader().getUuidString());
+    dispatcher.dispatch(request.build());
+
+    //the open key should be removed from Map
+    Assert.assertNull(
+        openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID));
+  }
+
+  @Test
+  public void testDeleteChunk() throws Exception {
+    long testContainerID = createContainer();
+    Assert.assertNotNull(containerSet.getContainer(testContainerID));
+    BlockID blockID = ContainerTestHelper.
+        getTestBlockID(testContainerID);
+    Pipeline pipeline = createSingleNodePipeline();
+    List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
+    // the key should exist in the map
+    Assert.assertTrue(
+        openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
+            .containsKey(blockID.getLocalID()));
+    Assert.assertTrue(
+        openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
+            .get(blockID.getLocalID()).getChunks().size() == 3);
+    ContainerProtos.DeleteChunkRequestProto.Builder deleteChunkProto =
+        ContainerProtos.DeleteChunkRequestProto.newBuilder();
+    deleteChunkProto.setBlockID(blockID.getDatanodeBlockIDProtobuf());
+    deleteChunkProto.setChunkData(chunkList.get(0).getProtoBufMessage());
+    ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
+        ContainerProtos.WriteChunkRequestProto.newBuilder();
+    writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
+    writeRequest.setChunkData(chunkList.get(0).getProtoBufMessage());
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.DeleteChunk);
+    request.setDeleteChunk(deleteChunkProto);
+    request.setWriteChunk(writeRequest);
+    request.setTraceID(UUID.randomUUID().toString());
+    request.setDatanodeUuid(pipeline.getLeader().getUuidString());
+    dispatcher.dispatch(request.build());
+    Assert.assertTrue(
+        openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
+            .get(blockID.getLocalID()).getChunks().size() == 2);
+
+  }
+
+  @Test
+  public void testCloseContainer() throws Exception {
+    long testContainerID = createContainer();
+    Assert.assertNotNull(containerSet.getContainer(testContainerID));
+    BlockID blockID = ContainerTestHelper.
+        getTestBlockID(testContainerID);
+    Pipeline pipeline = createSingleNodePipeline();
+    List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
+
+    Container container = containerSet.getContainer(testContainerID);
+    KeyData keyData = openContainerBlockMap.getContainerOpenKeyMap().
+        get(testContainerID).get(blockID.getLocalID());
+    // the key should exist in the map
+    Assert.assertTrue(
+        openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
+            .containsKey(blockID.getLocalID()));
+    Assert.assertTrue(
+        keyData.getChunks().size() == chunkList.size());
+    ContainerProtos.CloseContainerRequestProto.Builder closeContainerProto =
+        ContainerProtos.CloseContainerRequestProto.newBuilder();
+    closeContainerProto.setContainerID(blockID.getContainerID());
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.CloseContainer);
+    request.setCloseContainer(closeContainerProto);
+    request.setTraceID(UUID.randomUUID().toString());
+    request.setDatanodeUuid(pipeline.getLeader().getUuidString());
+    dispatcher.dispatch(request.build());
+    Assert.assertNull(
+        openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID));
+    // Make sure the key got committed
+    Assert.assertNotNull(handler.getKeyManager().getKey(container, blockID));
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org