You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2018/07/25 01:32:13 UTC
[30/50] hadoop git commit: HDDS-181. CloseContainer should commit all
pending open Keys on a datanode. Contributed by Shashikant Banerjee.
HDDS-181. CloseContainer should commit all pending open Keys on a datanode. Contributed by Shashikant Banerjee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bbe2f622
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bbe2f622
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bbe2f622
Branch: refs/heads/HADOOP-15461
Commit: bbe2f6225ea500651de04c064f7b847be18e5b66
Parents: 9fa9e30
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Mon Jul 23 09:12:47 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Mon Jul 23 09:13:03 2018 +0530
----------------------------------------------------------------------
.../ozone/container/common/helpers/KeyData.java | 20 +-
.../common/impl/OpenContainerBlockMap.java | 167 ++++++++++++
.../container/keyvalue/KeyValueHandler.java | 69 ++++-
.../common/impl/TestCloseContainerHandler.java | 260 +++++++++++++++++++
4 files changed, 504 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbe2f622/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java
index 129e4a8..b63332f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.ArrayList;
/**
* Helper class to convert Protobuf to Java classes.
@@ -131,7 +132,25 @@ public class KeyData {
}
/**
+ * Adds chunkInfo to the list.
+ */
+  public void addChunk(ContainerProtos.ChunkInfo chunkInfo) {
+    // The chunk list is created on demand, the first time a chunk is recorded.
+    final boolean noListYet = (chunks == null);
+    if (noListYet) {
+      chunks = new ArrayList<>();
+    }
+    chunks.add(chunkInfo);
+  }
+
+  /**
+   * Removes the given chunk from this key's chunk list.
+   *
+   * Does nothing when the chunk list has not been created yet; the list can
+   * still be null at this point because addChunk only creates it lazily.
+   *
+   * @param chunkInfo protobuf description of the chunk to remove.
+   */
+  public void removeChunk(ContainerProtos.ChunkInfo chunkInfo) {
+    if (chunks != null) {
+      chunks.remove(chunkInfo);
+    }
+  }
+
+ /**
* Returns container ID.
+ *
* @return long.
*/
public long getContainerID() {
@@ -170,5 +189,4 @@ public class KeyData {
public long getSize() {
return chunks.parallelStream().mapToLong(e->e.getLen()).sum();
}
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbe2f622/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java
new file mode 100644
index 0000000..ab5f861
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * Maintains, per open container, the blocks (keys) that have received chunk
+ * writes but have not been committed yet. When a closeContainer command
+ * comes, all open keys of the container should be auto-committed before the
+ * container is marked as closed; this map is where those keys are looked up.
+ *
+ * The per-container maps are plain HashMaps, so every method that reads or
+ * modifies them is synchronized on this instance.
+ */
+public class OpenContainerBlockMap {
+
+  /**
+   * TODO : We may construct the openBlockMap by reading the Block Layout
+   * for each block inside a container listing all chunk files and reading the
+   * sizes. This will help to recreate the openKeys Map once the DataNode
+   * restarts.
+   *
+   * For now, we will track all open blocks of a container in the blockMap.
+   * Outer key: container ID. Inner key: local ID of the block.
+   */
+  private final ConcurrentHashMap<Long, HashMap<Long, KeyData>>
+      openContainerBlockMap;
+
+  /**
+   * Constructs an empty OpenContainerBlockMap.
+   */
+  public OpenContainerBlockMap() {
+    openContainerBlockMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Removes the container matching the specified containerId together with
+   * all open blocks tracked for it. Does nothing if the container is not
+   * tracked.
+   *
+   * @param containerId containerId, must not be negative.
+   * @throws IllegalStateException if containerId is negative.
+   */
+  public void removeContainer(long containerId) {
+    Preconditions
+        .checkState(containerId >= 0, "Container Id cannot be negative.");
+    openContainerBlockMap.remove(containerId);
+  }
+
+  /**
+   * Updates the chunkInfo list of a block when a chunk is added or deleted.
+   *
+   * @param blockID id of the block.
+   * @param info - Chunk Info
+   * @param remove if true, removes the chunk from the block's chunkInfo list,
+   *               otherwise appends it to the list.
+   * @throws IOException if the KeyData for a new block cannot be built.
+   */
+  public synchronized void updateOpenKeyMap(BlockID blockID,
+      ContainerProtos.ChunkInfo info, boolean remove) throws IOException {
+    if (remove) {
+      deleteChunkFromMap(blockID, info);
+    } else {
+      addChunkToMap(blockID, info);
+    }
+  }
+
+  /**
+   * Builds the KeyData for a block that is seen for the first time, holding
+   * the given chunk as its only chunk.
+   */
+  private KeyData getKeyData(ContainerProtos.ChunkInfo info, BlockID blockID)
+      throws IOException {
+    KeyData keyData = new KeyData(blockID);
+    keyData.addMetadata("TYPE", "KEY");
+    keyData.addChunk(info);
+    return keyData;
+  }
+
+  /**
+   * Records a chunk for the given block: the first chunk of a block creates
+   * its KeyData entry, later chunks are appended to the existing entry.
+   * Only called from updateOpenKeyMap, which holds the monitor.
+   *
+   * @param blockID id of the block
+   * @param info chunk info.
+   * @throws IOException if the KeyData for a new block cannot be built.
+   */
+  private void addChunkToMap(BlockID blockID, ContainerProtos.ChunkInfo info)
+      throws IOException {
+    Preconditions.checkNotNull(info);
+    Preconditions.checkNotNull(blockID);
+    long localID = blockID.getLocalID();
+
+    HashMap<Long, KeyData> blocks = openContainerBlockMap.computeIfAbsent(
+        blockID.getContainerID(), id -> new LinkedHashMap<Long, KeyData>());
+    KeyData existing = blocks.get(localID);
+    if (existing == null) {
+      // first chunk of this block: the new KeyData already carries the chunk
+      blocks.put(localID, getKeyData(info, blockID));
+    } else {
+      // the block already exists, only the chunk has to be appended
+      existing.addChunk(info);
+    }
+  }
+
+  /**
+   * Removes the chunk from the chunkInfo list of the given block. Does
+   * nothing if the container or the block is not tracked.
+   * Only called from updateOpenKeyMap, which holds the monitor.
+   *
+   * @param blockID id of the block
+   * @param chunkInfo chunk info.
+   */
+  private void deleteChunkFromMap(BlockID blockID,
+      ContainerProtos.ChunkInfo chunkInfo) {
+    Preconditions.checkNotNull(chunkInfo);
+    Preconditions.checkNotNull(blockID);
+    HashMap<Long, KeyData> keyDataMap =
+        openContainerBlockMap.get(blockID.getContainerID());
+    if (keyDataMap != null) {
+      KeyData keyData = keyDataMap.get(blockID.getLocalID());
+      if (keyData != null) {
+        keyData.removeChunk(chunkInfo);
+      }
+    }
+  }
+
+  /**
+   * Returns a snapshot of the open keys tracked for the given container.
+   * The returned list is a copy, so blocks may be removed from this map
+   * while the caller iterates over it.
+   *
+   * @param containerId container id
+   * @return List of open Keys(blocks), or null if the container is not
+   *         tracked.
+   */
+  public synchronized List<KeyData> getOpenKeys(long containerId) {
+    HashMap<Long, KeyData> keyDataHashMap =
+        openContainerBlockMap.get(containerId);
+    return keyDataHashMap == null ? null :
+        keyDataHashMap.values().stream().collect(Collectors.toList());
+  }
+
+  /**
+   * Removes the block from the block map. When it was the last open block
+   * of its container, the container entry is removed as well.
+   *
+   * @param blockID id of the block to remove, must not be null.
+   */
+  public synchronized void removeFromKeyMap(BlockID blockID) {
+    Preconditions.checkNotNull(blockID);
+    HashMap<Long, KeyData> keyDataMap =
+        openContainerBlockMap.get(blockID.getContainerID());
+    if (keyDataMap != null) {
+      keyDataMap.remove(blockID.getLocalID());
+      if (keyDataMap.isEmpty()) {
+        removeContainer(blockID.getContainerID());
+      }
+    }
+  }
+
+  /**
+   * Exposes the backing map itself (not a copy) so that tests can inspect it.
+   *
+   * @return the container ID to open blocks map.
+   */
+  @VisibleForTesting
+  public ConcurrentHashMap<Long,
+      HashMap<Long, KeyData>> getContainerOpenKeyMap() {
+    return openContainerBlockMap;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbe2f622/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 84b3644..9aa3df7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
@@ -117,7 +118,7 @@ public class KeyValueHandler extends Handler {
private VolumeChoosingPolicy volumeChoosingPolicy;
private final int maxContainerSizeGB;
private final AutoCloseableLock handlerLock;
-
+ private final OpenContainerBlockMap openContainerBlockMap;
public KeyValueHandler(Configuration config, ContainerSet contSet,
VolumeSet volSet, ContainerMetrics metrics) {
@@ -145,6 +146,15 @@ public class KeyValueHandler extends Handler {
// this handler lock is used for synchronizing createContainer Requests,
// so using a fair lock here.
handlerLock = new AutoCloseableLock(new ReentrantLock(true));
+ openContainerBlockMap = new OpenContainerBlockMap();
+ }
+
+ /**
+ * Returns the map that tracks the open (uncommitted) blocks per container.
+ * @return the OpenContainerBlockMap created by this handler.
+ */
+ public OpenContainerBlockMap getOpenContainerBlockMap() {
+ return openContainerBlockMap;
 }
@Override
@@ -333,8 +343,9 @@ public class KeyValueHandler extends Handler {
"Container cannot be deleted because it is not empty.",
ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
} else {
- containerSet.removeContainer(
- kvContainer.getContainerData().getContainerID());
+ long containerId = kvContainer.getContainerData().getContainerID();
+ containerSet.removeContainer(containerId);
+ openContainerBlockMap.removeContainer(containerId);
// Release the lock first.
// Avoid holding write locks for disk operations
kvContainer.writeUnlock();
@@ -366,9 +377,21 @@ public class KeyValueHandler extends Handler {
try {
checkContainerOpen(kvContainer);
+ // mark the container as CLOSING and commit all pending open keys; the
+ // container is removed from the open block map once it has been closed
+ kvContainer.getContainerData()
+ .setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
+ commitPendingKeys(kvContainer);
kvContainer.close();
+ // make sure the container's open keys get removed from the BlockMap
+ openContainerBlockMap.removeContainer(
+ request.getCloseContainer().getContainerID());
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
+ } catch (IOException ex) {
+ return ContainerUtils.logAndReturnError(LOG,
+ new StorageContainerException("Close Container failed", ex,
+ IO_EXCEPTION), request);
}
return ContainerUtils.getSuccessResponse(request);
@@ -391,10 +414,8 @@ public class KeyValueHandler extends Handler {
KeyData keyData = KeyData.getFromProtoBuf(
request.getPutKey().getKeyData());
- Preconditions.checkNotNull(keyData);
-
- keyManager.putKey(kvContainer, keyData);
long numBytes = keyData.getProtoBufMessage().toByteArray().length;
+ commitKey(keyData, kvContainer);
metrics.incContainerBytesStats(Type.PutKey, numBytes);
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -407,6 +428,25 @@ public class KeyValueHandler extends Handler {
return KeyUtils.getKeyResponseSuccess(request);
}
+ private void commitPendingKeys(KeyValueContainer kvContainer)
+ throws IOException {
+ long containerId = kvContainer.getContainerData().getContainerID();
+ List<KeyData> pendingKeys =
+ this.openContainerBlockMap.getOpenKeys(containerId);
+ if (pendingKeys != null) {
+ for (KeyData keyData : pendingKeys) {
+ commitKey(keyData, kvContainer);
+ }
+ }
+ }
+
+ private void commitKey(KeyData keyData, KeyValueContainer kvContainer)
+ throws IOException {
+ Preconditions.checkNotNull(keyData);
+ keyManager.putKey(kvContainer, keyData);
+ //update the open key Map in containerManager
+ this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID());
+ }
/**
* Handle Get Key operation. Calls KeyManager to process the request.
*/
@@ -519,11 +559,13 @@ public class KeyValueHandler extends Handler {
BlockID blockID = BlockID.getFromProtobuf(
request.getDeleteChunk().getBlockID());
- ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getDeleteChunk()
- .getChunkData());
+ ContainerProtos.ChunkInfo chunkInfoProto = request.getDeleteChunk()
+ .getChunkData();
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
Preconditions.checkNotNull(chunkInfo);
chunkManager.deleteChunk(kvContainer, blockID, chunkInfo);
+ openContainerBlockMap.updateOpenKeyMap(blockID, chunkInfoProto, true);
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ex) {
@@ -552,8 +594,9 @@ public class KeyValueHandler extends Handler {
BlockID blockID = BlockID.getFromProtobuf(
request.getWriteChunk().getBlockID());
- ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getWriteChunk()
- .getChunkData());
+ ContainerProtos.ChunkInfo chunkInfoProto =
+ request.getWriteChunk().getChunkData();
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
Preconditions.checkNotNull(chunkInfo);
byte[] data = null;
@@ -570,6 +613,9 @@ public class KeyValueHandler extends Handler {
request.getWriteChunk().getStage() == Stage.COMBINED) {
metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk()
.getChunkData().getLen());
+ // the openContainerBlockMap should be updated only while writing data
+ // not during COMMIT_STAGE of handling write chunk request.
+ openContainerBlockMap.updateOpenKeyMap(blockID, chunkInfoProto, false);
}
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -610,8 +656,9 @@ public class KeyValueHandler extends Handler {
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
putSmallFileReq.getChunkInfo());
Preconditions.checkNotNull(chunkInfo);
-
byte[] data = putSmallFileReq.getData().toByteArray();
+ // chunks will be committed as a part of handling putSmallFile
+ // here. There is no need to maintain this info in openContainerBlockMap.
chunkManager.writeChunk(
kvContainer, blockID, chunkInfo, data, Stage.COMBINED);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbe2f622/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java
new file mode 100644
index 0000000..3ab593e
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.Assert;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.LinkedList;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper
+ .createSingleNodePipeline;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper
+ .setDataChecksum;
+
+/**
+ * Tests that the open block map on a Datanode is kept up to date by
+ * writeChunk, putKey, deleteChunk and closeContainer requests.
+ */
+public class TestCloseContainerHandler {
+
+  @Rule
+  public TestRule timeout = new Timeout(300000);
+
+  private static Configuration conf;
+  private static HddsDispatcher dispatcher;
+  private static ContainerSet containerSet;
+  private static VolumeSet volumeSet;
+  private static KeyValueHandler handler;
+  private static OpenContainerBlockMap openContainerBlockMap;
+
+  private final static String DATANODE_UUID = UUID.randomUUID().toString();
+
+  private static final String baseDir = MiniDFSCluster.getBaseDirectory();
+  private static final String volume1 = baseDir + "disk1";
+  private static final String volume2 = baseDir + "disk2";
+
+  /**
+   * Creates the dispatcher on top of two data volumes and picks up the
+   * KeyValueHandler together with its open block map.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    conf = new Configuration();
+    String dataDirKey = volume1 + "," + volume2;
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDirKey);
+    containerSet = new ContainerSet();
+    DatanodeDetails datanodeDetails =
+        DatanodeDetails.newBuilder().setUuid(DATANODE_UUID)
+            .setHostName("localhost").setIpAddress("127.0.0.1").build();
+    volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
+
+    dispatcher = new HddsDispatcher(conf, containerSet, volumeSet);
+    handler = (KeyValueHandler) dispatcher
+        .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+    openContainerBlockMap = handler.getOpenContainerBlockMap();
+    dispatcher.setScmId(UUID.randomUUID().toString());
+  }
+
+  /**
+   * Deletes the root directories of all (healthy and failed) volumes and
+   * shuts the volume set down.
+   */
+  @AfterClass
+  public static void shutdown() throws IOException {
+    // Delete the hdds volume root dir
+    List<HddsVolume> volumes = new ArrayList<>();
+    volumes.addAll(volumeSet.getVolumesList());
+    volumes.addAll(volumeSet.getFailedVolumesList());
+
+    for (HddsVolume volume : volumes) {
+      FileUtils.deleteDirectory(volume.getHddsRootDir());
+    }
+    volumeSet.shutdown();
+  }
+
+  /**
+   * Dispatches a CreateContainer request for a fresh test container ID.
+   *
+   * @return the ID of the container that was requested.
+   */
+  private long createContainer() {
+    long testContainerId = ContainerTestHelper.getTestContainerID();
+    ContainerProtos.CreateContainerRequestProto createReq =
+        ContainerProtos.CreateContainerRequestProto.newBuilder()
+            .setContainerID(testContainerId)
+            .build();
+
+    ContainerProtos.ContainerCommandRequestProto request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder()
+            .setCmdType(ContainerProtos.Type.CreateContainer)
+            .setDatanodeUuid(DATANODE_UUID)
+            .setCreateContainer(createReq)
+            .build();
+
+    dispatcher.dispatch(request);
+    return testContainerId;
+  }
+
+  /**
+   * Dispatches chunkCount COMBINED-stage WriteChunk requests of 1024 bytes
+   * each for the given block.
+   *
+   * @return the ChunkInfo of every chunk written, in write order.
+   */
+  private List<ChunkInfo> writeChunkBuilder(BlockID blockID, Pipeline pipeline,
+      int chunkCount)
+      throws IOException, NoSuchAlgorithmException {
+    final int datalen = 1024;
+    List<ChunkInfo> chunkList = new LinkedList<>();
+    for (int x = 0; x < chunkCount; x++) {
+      ChunkInfo info = getChunk(blockID.getLocalID(), x, datalen * x, datalen);
+      byte[] data = getData(datalen);
+      setDataChecksum(info, data);
+      ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
+          ContainerProtos.WriteChunkRequestProto.newBuilder();
+      writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
+      writeRequest.setChunkData(info.getProtoBufMessage());
+      writeRequest.setData(ByteString.copyFrom(data));
+      writeRequest.setStage(ContainerProtos.Stage.COMBINED);
+      ContainerProtos.ContainerCommandRequestProto.Builder request =
+          ContainerProtos.ContainerCommandRequestProto.newBuilder();
+      request.setCmdType(ContainerProtos.Type.WriteChunk);
+      request.setWriteChunk(writeRequest);
+      request.setTraceID(UUID.randomUUID().toString());
+      request.setDatanodeUuid(pipeline.getLeader().getUuidString());
+      dispatcher.dispatch(request.build());
+      chunkList.add(info);
+    }
+    return chunkList;
+  }
+
+  /**
+   * A putKey for a block with written chunks must remove the block from the
+   * open block map.
+   */
+  @Test
+  public void testPutKeyWithMultipleChunks()
+      throws IOException, NoSuchAlgorithmException {
+    long testContainerID = createContainer();
+    Assert.assertNotNull(containerSet.getContainer(testContainerID));
+    BlockID blockID = ContainerTestHelper.
+        getTestBlockID(testContainerID);
+    Pipeline pipeline = createSingleNodePipeline();
+    List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
+    // the key should exist in the map
+    Assert.assertTrue(
+        openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
+            .containsKey(blockID.getLocalID()));
+    KeyData keyData = new KeyData(blockID);
+    List<ContainerProtos.ChunkInfo> chunkProtoList = new LinkedList<>();
+    for (ChunkInfo i : chunkList) {
+      chunkProtoList.add(i.getProtoBufMessage());
+    }
+    keyData.setChunks(chunkProtoList);
+    ContainerProtos.PutKeyRequestProto.Builder putKeyRequestProto =
+        ContainerProtos.PutKeyRequestProto.newBuilder();
+    putKeyRequestProto.setKeyData(keyData.getProtoBufMessage());
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.PutKey);
+    request.setPutKey(putKeyRequestProto);
+    request.setTraceID(UUID.randomUUID().toString());
+    request.setDatanodeUuid(pipeline.getLeader().getUuidString());
+    dispatcher.dispatch(request.build());
+
+    //the open key should be removed from Map
+    Assert.assertNull(
+        openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID));
+  }
+
+  /**
+   * A deleteChunk must remove exactly one chunk from the open block's chunk
+   * list.
+   */
+  @Test
+  public void testDeleteChunk() throws Exception {
+    long testContainerID = createContainer();
+    Assert.assertNotNull(containerSet.getContainer(testContainerID));
+    BlockID blockID = ContainerTestHelper.
+        getTestBlockID(testContainerID);
+    Pipeline pipeline = createSingleNodePipeline();
+    List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
+    // the key should exist in the map
+    Assert.assertTrue(
+        openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
+            .containsKey(blockID.getLocalID()));
+    Assert.assertEquals(3,
+        openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
+            .get(blockID.getLocalID()).getChunks().size());
+    ContainerProtos.DeleteChunkRequestProto.Builder deleteChunkProto =
+        ContainerProtos.DeleteChunkRequestProto.newBuilder();
+    deleteChunkProto.setBlockID(blockID.getDatanodeBlockIDProtobuf());
+    deleteChunkProto.setChunkData(chunkList.get(0).getProtoBufMessage());
+    // NOTE(review): a WriteChunk payload is attached to a DeleteChunk command
+    // here; it looks like a leftover - confirm whether it is needed.
+    ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
+        ContainerProtos.WriteChunkRequestProto.newBuilder();
+    writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
+    writeRequest.setChunkData(chunkList.get(0).getProtoBufMessage());
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.DeleteChunk);
+    request.setDeleteChunk(deleteChunkProto);
+    request.setWriteChunk(writeRequest);
+    request.setTraceID(UUID.randomUUID().toString());
+    request.setDatanodeUuid(pipeline.getLeader().getUuidString());
+    dispatcher.dispatch(request.build());
+    Assert.assertEquals(2,
+        openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
+            .get(blockID.getLocalID()).getChunks().size());
+
+  }
+
+  /**
+   * A closeContainer must commit the pending open key and drop the container
+   * from the open block map.
+   */
+  @Test
+  public void testCloseContainer() throws Exception {
+    long testContainerID = createContainer();
+    Assert.assertNotNull(containerSet.getContainer(testContainerID));
+    BlockID blockID = ContainerTestHelper.
+        getTestBlockID(testContainerID);
+    Pipeline pipeline = createSingleNodePipeline();
+    List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
+
+    Container container = containerSet.getContainer(testContainerID);
+    // the key should exist in the map; check this before dereferencing it
+    Assert.assertTrue(
+        openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
+            .containsKey(blockID.getLocalID()));
+    KeyData keyData = openContainerBlockMap.getContainerOpenKeyMap().
+        get(testContainerID).get(blockID.getLocalID());
+    Assert.assertEquals(chunkList.size(), keyData.getChunks().size());
+    ContainerProtos.CloseContainerRequestProto.Builder closeContainerProto =
+        ContainerProtos.CloseContainerRequestProto.newBuilder();
+    closeContainerProto.setContainerID(blockID.getContainerID());
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.CloseContainer);
+    request.setCloseContainer(closeContainerProto);
+    request.setTraceID(UUID.randomUUID().toString());
+    request.setDatanodeUuid(pipeline.getLeader().getUuidString());
+    dispatcher.dispatch(request.build());
+    Assert.assertNull(
+        openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID));
+    // Make sure the key got committed
+    Assert.assertNotNull(handler.getKeyManager().getKey(container, blockID));
+  }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org