You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2018/09/05 00:11:01 UTC
hadoop git commit: HDDS-383. Ozone Client should discard preallocated
blocks from closed containers. Contributed by Shashikant Banerjee
Repository: hadoop
Updated Branches:
refs/heads/trunk 6e4c73147 -> 6883fe860
HDDS-383. Ozone Client should discard preallocated blocks from closed containers. Contributed by Shashikant Banerjee
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6883fe86
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6883fe86
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6883fe86
Branch: refs/heads/trunk
Commit: 6883fe860f484da2b835f9f57307b84165ed7f6f
Parents: 6e4c731
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Tue Sep 4 17:10:10 2018 -0700
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Tue Sep 4 17:10:44 2018 -0700
----------------------------------------------------------------------
.../ozone/client/io/ChunkGroupOutputStream.java | 86 +++++++++++++-------
.../hadoop/ozone/om/helpers/OmKeyInfo.java | 4 +
.../rpc/TestCloseContainerHandlingByClient.java | 64 ++++++++++-----
3 files changed, 102 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6883fe86/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 21406b5..3742a9a 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -53,6 +53,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.ListIterator;
/**
* Maintaining a list of ChunkInputStream. Write based on offset.
@@ -81,7 +82,6 @@ public class ChunkGroupOutputStream extends OutputStream {
private final int chunkSize;
private final String requestID;
private boolean closed;
- private List<OmKeyLocationInfo> locationInfoList;
private final RetryPolicy retryPolicy;
/**
* A constructor for testing purpose only.
@@ -97,7 +97,6 @@ public class ChunkGroupOutputStream extends OutputStream {
chunkSize = 0;
requestID = null;
closed = false;
- locationInfoList = null;
retryPolicy = null;
}
@@ -118,9 +117,16 @@ public class ChunkGroupOutputStream extends OutputStream {
return streamEntries;
}
- @VisibleForTesting
- public long getOpenID() {
- return openID;
+ public List<OmKeyLocationInfo> getLocationInfoList() {
+ List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+ for (ChunkOutputStreamEntry streamEntry : streamEntries) {
+ OmKeyLocationInfo info =
+ new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID)
+ .setShouldCreateContainer(false)
+ .setLength(streamEntry.currentPosition).setOffset(0).build();
+ locationInfoList.add(info);
+ }
+ return locationInfoList;
}
public ChunkGroupOutputStream(
@@ -146,7 +152,6 @@ public class ChunkGroupOutputStream extends OutputStream {
this.xceiverClientManager = xceiverClientManager;
this.chunkSize = chunkSize;
this.requestID = requestId;
- this.locationInfoList = new ArrayList<>();
this.retryPolicy = retryPolicy;
LOG.debug("Expecting open key with one block, but got" +
info.getKeyLocationVersions().size());
@@ -211,18 +216,6 @@ public class ChunkGroupOutputStream extends OutputStream {
streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(),
keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
chunkSize, subKeyInfo.getLength()));
- // reset the original length to zero here. It will be updated as and when
- // the data gets written.
- subKeyInfo.setLength(0);
- locationInfoList.add(subKeyInfo);
- }
-
- private void incrementBlockLength(int index, long length) {
- if (locationInfoList != null) {
- OmKeyLocationInfo locationInfo = locationInfoList.get(index);
- long originalLength = locationInfo.getLength();
- locationInfo.setLength(originalLength + length);
- }
}
@VisibleForTesting
@@ -298,7 +291,6 @@ public class ChunkGroupOutputStream extends OutputStream {
throw ioe;
}
}
- incrementBlockLength(currentStreamIndex, writeLen);
if (current.getRemaining() <= 0) {
// since the current block is already written close the stream.
handleFlushOrClose(true);
@@ -316,12 +308,6 @@ public class ChunkGroupOutputStream extends OutputStream {
ContainerProtos.GetCommittedBlockLengthResponseProto responseProto;
RetryPolicy.RetryAction action;
int numRetries = 0;
-
- // TODO : At this point of time, we also need to allocate new blocks
- // from a different container and may need to nullify
- // all the remaining pre-allocated blocks in case they were
- // pre-allocated on the same container which got closed now.This needs
- // caching the closed container list on the client itself.
while (true) {
try {
responseProto = ContainerProtocolCalls
@@ -367,6 +353,43 @@ public class ChunkGroupOutputStream extends OutputStream {
}
/**
+ * Discards the subsequent pre allocated blocks and removes the streamEntries
+ * from the streamEntries list for the container which is closed.
+ * @param containerID id of the closed container
+ */
+ private void discardPreallocatedBlocks(long containerID) {
+ // currentStreamIndex < streamEntries.size() signifies that there are still
+ // pre allocated blocks available.
+ if (currentStreamIndex < streamEntries.size()) {
+ ListIterator<ChunkOutputStreamEntry> streamEntryIterator =
+ streamEntries.listIterator(currentStreamIndex);
+ while (streamEntryIterator.hasNext()) {
+ if (streamEntryIterator.next().blockID.getContainerID()
+ == containerID) {
+ streamEntryIterator.remove();
+ }
+ }
+ }
+ }
+
+ /**
+ * It is possible that the pre allocated blocks never get written
+ * while the stream gets closed normally. In such cases, it would be a good
+ * idea to trim down the locationInfoList by removing the unused blocks, if any,
+ * so that only the used block info gets updated on OzoneManager during close.
+ */
+ private void removeEmptyBlocks() {
+ if (currentStreamIndex < streamEntries.size()) {
+ ListIterator<ChunkOutputStreamEntry> streamEntryIterator =
+ streamEntries.listIterator(currentStreamIndex);
+ while (streamEntryIterator.hasNext()) {
+ if (streamEntryIterator.next().currentPosition == 0) {
+ streamEntryIterator.remove();
+ }
+ }
+ }
+ }
+ /**
* It performs following actions :
* a. Updates the committed length at datanode for the current stream in
* datanode.
@@ -396,7 +419,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|| streamEntry.currentPosition != buffer.position()) {
committedLength = getCommittedBlockLength(streamEntry);
// update the length of the current stream
- locationInfoList.get(streamIndex).setLength(committedLength);
+ streamEntry.currentPosition = committedLength;
}
if (buffer.position() > 0) {
@@ -418,10 +441,12 @@ public class ChunkGroupOutputStream extends OutputStream {
// written. Remove it from the current stream list.
if (committedLength == 0) {
streamEntries.remove(streamIndex);
- locationInfoList.remove(streamIndex);
Preconditions.checkArgument(currentStreamIndex != 0);
currentStreamIndex -= 1;
}
+ // discard subsequent pre allocated blocks from the streamEntries list
+ // from the closed container
+ discardPreallocatedBlocks(streamEntry.blockID.getContainerID());
}
private boolean checkIfContainerIsClosed(IOException ioe) {
@@ -433,7 +458,7 @@ public class ChunkGroupOutputStream extends OutputStream {
}
private long getKeyLength() {
- return locationInfoList.parallelStream().mapToLong(e -> e.getLength())
+ return streamEntries.parallelStream().mapToLong(e -> e.currentPosition)
.sum();
}
@@ -506,12 +531,11 @@ public class ChunkGroupOutputStream extends OutputStream {
handleFlushOrClose(true);
if (keyArgs != null) {
// in test, this could be null
- Preconditions.checkState(streamEntries.size() == locationInfoList.size());
+ removeEmptyBlocks();
Preconditions.checkState(byteOffset == getKeyLength());
keyArgs.setDataSize(byteOffset);
- keyArgs.setLocationInfoList(locationInfoList);
+ keyArgs.setLocationInfoList(getLocationInfoList());
omClient.commitKey(keyArgs, openID);
- locationInfoList = null;
} else {
LOG.warn("Closing ChunkGroupOutputStream, but key args is null");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6883fe86/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index f6e4265..50f4b17 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -122,6 +122,7 @@ public final class OmKeyInfo {
* @throws IOException
*/
public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList) {
+ long latestVersion = getLatestVersionLocations().getVersion();
OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations();
List<OmKeyLocationInfo> currentList =
keyLocationInfoGroup.getLocationList();
@@ -134,6 +135,9 @@ public final class OmKeyInfo {
// might get closed. The diff of blocks between these two lists here
// need to be garbage collected in case the ozone client dies.
currentList.removeAll(latestVersionList);
+ // set each of the locationInfo object to the latest version
+ locationInfoList.stream().forEach(omKeyLocationInfo -> omKeyLocationInfo
+ .setCreateVersion(latestVersion));
currentList.addAll(locationInfoList);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6883fe86/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index 9f12633..cf38982 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -38,12 +38,10 @@ import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -247,9 +245,8 @@ public class TestCloseContainerHandlingByClient {
@Test
public void testMultiBlockWrites2() throws Exception {
-
String keyName = "standalone4";
- long dataLength = 0;
+ long dataLength;
OzoneOutputStream key =
createKey(keyName, ReplicationType.STAND_ALONE, 4 * blockSize);
ChunkGroupOutputStream groupOutputStream =
@@ -293,11 +290,10 @@ public class TestCloseContainerHandlingByClient {
private void waitForContainerClose(String keyName,
OzoneOutputStream outputStream, HddsProtos.ReplicationType type)
throws Exception {
-
ChunkGroupOutputStream groupOutputStream =
(ChunkGroupOutputStream) outputStream.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
- getLocationInfos(groupOutputStream, keyName);
+ groupOutputStream.getLocationInfoList();
List<Long> containerIdList = new ArrayList<>();
List<Pipeline> pipelineList = new ArrayList<>();
for (OmKeyLocationInfo info : locationInfoList) {
@@ -335,6 +331,46 @@ public class TestCloseContainerHandlingByClient {
}
}
+ @Test
+ public void testDiscardPreallocatedBlocks() throws Exception {
+ String keyName = "discardpreallocatedblocks";
+ OzoneOutputStream key =
+ createKey(keyName, ReplicationType.STAND_ALONE, 2 * blockSize);
+ ChunkGroupOutputStream groupOutputStream =
+ (ChunkGroupOutputStream) key.getOutputStream();
+
+ Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+ // With the initial size provided, it should have pre allocated 2 blocks
+ Assert.assertEquals(2, groupOutputStream.getStreamEntries().size());
+ Assert.assertEquals(2, groupOutputStream.getLocationInfoList().size());
+ String dataString = fixedLengthString(keyString, (1 * blockSize));
+ byte[] data = dataString.getBytes();
+ key.write(data);
+ List<OmKeyLocationInfo> locationInfos =
+ new ArrayList<>(groupOutputStream.getLocationInfoList());
+ long containerID = locationInfos.get(0).getContainerID();
+ List<DatanodeDetails> datanodes =
+ cluster.getStorageContainerManager().getScmContainerManager()
+ .getContainerWithPipeline(containerID).getPipeline().getMachines();
+ Assert.assertEquals(1, datanodes.size());
+ waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
+ dataString = fixedLengthString(keyString, (1 * blockSize));
+ data = dataString.getBytes();
+ key.write(data);
+ Assert.assertEquals(2, groupOutputStream.getStreamEntries().size());
+
+ // the 1st block got written. Now all the containers are closed, so the 2nd
+ // pre allocated block will be removed from the list and new block should
+ // have been allocated
+ Assert.assertTrue(
+ groupOutputStream.getLocationInfoList().get(0).getBlockID()
+ .equals(locationInfos.get(0).getBlockID()));
+ Assert.assertFalse(
+ groupOutputStream.getLocationInfoList().get(1).getBlockID()
+ .equals(locationInfos.get(1).getBlockID()));
+ key.close();
+ }
+
private OzoneOutputStream createKey(String keyName, ReplicationType type,
long size) throws Exception {
ReplicationFactor factor =
@@ -344,20 +380,6 @@ public class TestCloseContainerHandlingByClient {
.createKey(keyName, size, type, factor);
}
- private List<OmKeyLocationInfo> getLocationInfos(
- ChunkGroupOutputStream groupOutputStream, String keyName)
- throws IOException {
- long clientId = groupOutputStream.getOpenID();
- OMMetadataManager metadataManager =
- cluster.getOzoneManager().getMetadataManager();
- byte[] openKey = metadataManager
- .getOpenKeyBytes(volumeName, bucketName, keyName, clientId);
- byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey);
- OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(
- OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData));
- return keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
- }
-
private void validateData(String keyName, byte[] data) throws Exception {
byte[] readData = new byte[data.length];
OzoneInputStream is =
@@ -427,7 +449,7 @@ public class TestCloseContainerHandlingByClient {
String dataString = fixedLengthString(keyString, (3 * chunkSize));
key.write(dataString.getBytes());
List<OmKeyLocationInfo> locationInfos =
- getLocationInfos(groupOutputStream, keyName);
+ groupOutputStream.getLocationInfoList();
long containerID = locationInfos.get(0).getContainerID();
List<DatanodeDetails> datanodes =
cluster.getStorageContainerManager().getScmContainerManager()
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org