Posted to commits@ozone.apache.org by ha...@apache.org on 2021/04/01 17:18:16 UTC
[ozone] branch master updated: HDDS-4553. ChunkInputStream should
release buffer as soon as last byte in the buffer is read (#2062)
This is an automated email from the ASF dual-hosted git repository.
hanishakoneru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b57041e HDDS-4553. ChunkInputStream should release buffer as soon as last byte in the buffer is read (#2062)
b57041e is described below
commit b57041ea00aef98dd13e64f642825fcf2cbbf78a
Author: Hanisha Koneru <ha...@apache.org>
AuthorDate: Thu Apr 1 10:17:52 2021 -0700
HDDS-4553. ChunkInputStream should release buffer as soon as last byte in the buffer is read (#2062)
---
.../hadoop/hdds/scm/storage/ChunkInputStream.java | 119 +++++++++++++-------
.../hdds/scm/storage/DummyChunkInputStream.java | 5 +-
.../hdds/scm/storage/TestChunkInputStream.java | 25 +++--
.../common/ChunkBufferImplWithByteBufferList.java | 13 ++-
.../ozone/common/IncrementalChunkBuffer.java | 13 ++-
.../hadoop/ozone/common/utils/BufferUtils.java | 36 +++++-
.../keyvalue/impl/FilePerBlockStrategy.java | 27 +----
.../keyvalue/impl/FilePerChunkStrategy.java | 29 +----
.../keyvalue/interfaces/ChunkManager.java | 31 +++++
.../client/rpc/read/TestChunkInputStream.java | 125 +++++++++++++++++----
10 files changed, 299 insertions(+), 124 deletions(-)
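In short: before this patch, ChunkInputStream held every buffer fetched for a chunk until the whole chunk reached EOF; with this patch each buffer is dropped as soon as its last byte is served, so only the not-yet-consumed tail of the fetched data stays cached. A minimal sketch of the pattern (illustrative names, not the actual ChunkInputStream code; advancing bufferIndex and refilling the buffers are omitted):

    import java.nio.ByteBuffer;

    class EagerReleaseReader {
      private ByteBuffer[] buffers;           // one buffer per checksum boundary
      private int bufferIndex;                // buffer currently being read
      private int firstUnreleasedBufferIndex; // everything before this is null

      int read() {
        int b = Byte.toUnsignedInt(buffers[bufferIndex].get());
        // Release the buffer the moment its last byte has been served,
        // instead of waiting for the whole chunk to reach EOF.
        if (!buffers[bufferIndex].hasRemaining()) {
          releaseUpTo(bufferIndex);
        }
        return b;
      }

      private void releaseUpTo(int index) {
        for (int i = firstUnreleasedBufferIndex; i <= index; i++) {
          buffers[i] = null;                  // slice becomes eligible for GC
        }
        firstUnreleasedBufferIndex = index + 1;
      }
    }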
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index dc9123d..0653d1b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
-import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.Seekable;
@@ -71,8 +70,8 @@ public class ChunkInputStream extends InputStream
private final Supplier<Pipeline> pipelineSupplier;
private boolean verifyChecksum;
private boolean allocated = false;
- // Buffer to store the chunk data read from the DN container
- private List<ByteBuffer> buffers;
+ // Buffers to store the chunk data read from the DN container
+ private ByteBuffer[] buffers;
// Index of the buffers corresponding to the current position of the buffers
private int bufferIndex;
@@ -89,6 +88,9 @@ public class ChunkInputStream extends InputStream
// of chunk data
private long bufferOffsetWrtChunkData;
+ // Index of the first buffer which has not been released
+ private int firstUnreleasedBufferIndex = 0;
+
// The number of bytes of chunk data residing in the buffers currently
private long buffersSize;
@@ -133,13 +135,11 @@ public class ChunkInputStream extends InputStream
// been released by now
Preconditions.checkState(buffers == null);
} else {
- dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
+ dataout = Byte.toUnsignedInt(buffers[bufferIndex].get());
}
- if (chunkStreamEOF()) {
- // consumer might use getPos to determine EOF,
- // so release buffers when serving the last byte of data
- releaseBuffers();
+ if (bufferEOF()) {
+ releaseBuffers(bufferIndex);
}
return dataout;
@@ -179,15 +179,13 @@ public class ChunkInputStream extends InputStream
Preconditions.checkState(buffers == null);
return total != 0 ? total : EOF;
}
- buffers.get(bufferIndex).get(b, off + total, available);
+ buffers[bufferIndex].get(b, off + total, available);
len -= available;
total += available;
- }
- if (chunkStreamEOF()) {
- // smart consumers determine EOF by calling getPos()
- // so we release buffers when serving the final bytes of data
- releaseBuffers();
+ if (bufferEOF()) {
+ releaseBuffers(bufferIndex);
+ }
}
return total;
@@ -233,7 +231,7 @@ public class ChunkInputStream extends InputStream
// BufferOffset w.r.t to ChunkData + BufferOffset w.r.t buffers +
// Position of current Buffer
return bufferOffsetWrtChunkData + bufferOffsets[bufferIndex] +
- buffers.get(bufferIndex).position();
+ buffers[bufferIndex].position();
}
if (buffersAllocated()) {
return bufferOffsetWrtChunkData + buffersSize;
@@ -289,7 +287,7 @@ public class ChunkInputStream extends InputStream
}
if (buffersHaveData()) {
// Data is available from buffers
- ByteBuffer bb = buffers.get(bufferIndex);
+ ByteBuffer bb = buffers[bufferIndex];
return len > bb.remaining() ? bb.remaining() : len;
} else if (dataRemainingInChunk()) {
// There is more data in the chunk stream which has not
@@ -370,14 +368,15 @@ public class ChunkInputStream extends InputStream
buffers = readChunk(readChunkInfo);
buffersSize = readChunkInfo.getLen();
- bufferOffsets = new long[buffers.size()];
+ bufferOffsets = new long[buffers.length];
int tempOffset = 0;
- for (int i = 0; i < buffers.size(); i++) {
+ for (int i = 0; i < buffers.length; i++) {
bufferOffsets[i] = tempOffset;
- tempOffset += buffers.get(i).limit();
+ tempOffset += buffers[i].limit();
}
bufferIndex = 0;
+ firstUnreleasedBufferIndex = 0;
allocated = true;
}
@@ -385,7 +384,7 @@ public class ChunkInputStream extends InputStream
* Send RPC call to get the chunk from the container.
*/
@VisibleForTesting
- protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo)
+ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
throws IOException {
ReadChunkResponseProto readChunkResponse;
@@ -405,13 +404,12 @@ public class ChunkInputStream extends InputStream
}
if (readChunkResponse.hasData()) {
- return readChunkResponse.getData().asReadOnlyByteBufferList();
+ return readChunkResponse.getData().asReadOnlyByteBufferList()
+ .toArray(new ByteBuffer[0]);
} else if (readChunkResponse.hasDataBuffers()) {
List<ByteString> buffersList = readChunkResponse.getDataBuffers()
.getBuffersList();
- return buffersList.stream()
- .map(ByteString::asReadOnlyByteBuffer)
- .collect(Collectors.toList());
+ return BufferUtils.getReadOnlyByteBuffersArray(buffersList);
} else {
throw new IOException("Unexpected error while reading chunk data " +
"from container. No data returned.");
@@ -508,21 +506,21 @@ public class ChunkInputStream extends InputStream
private void adjustBufferPosition(long bufferPosition) {
// The bufferPosition is w.r.t the current buffers.
// Adjust the bufferIndex and position to the seeked bufferPosition.
- if (bufferIndex >= buffers.size()) {
+ if (bufferIndex >= buffers.length) {
bufferIndex = Arrays.binarySearch(bufferOffsets, bufferPosition);
} else if (bufferPosition < bufferOffsets[bufferIndex]) {
bufferIndex = Arrays.binarySearch(bufferOffsets, 0, bufferIndex,
bufferPosition);
} else if (bufferPosition >= bufferOffsets[bufferIndex] +
- buffers.get(bufferIndex).capacity()) {
+ buffers[bufferIndex].capacity()) {
bufferIndex = Arrays.binarySearch(bufferOffsets, bufferIndex + 1,
- buffers.size(), bufferPosition);
+ buffers.length, bufferPosition);
}
if (bufferIndex < 0) {
bufferIndex = -bufferIndex - 2;
}
- buffers.get(bufferIndex).position(
+ buffers[bufferIndex].position(
(int) (bufferPosition - bufferOffsets[bufferIndex]));
// Reset buffers > bufferIndex to position 0. We do this to reset any
@@ -531,8 +529,8 @@ public class ChunkInputStream extends InputStream
// not required for this read. If a seek was done to a position in the
// previous indices, the buffer position reset would be performed in the
// seek call.
- for (int i = bufferIndex + 1; i < buffers.size(); i++) {
- buffers.get(i).position(0);
+ for (int i = bufferIndex + 1; i < buffers.length; i++) {
+ buffers[i].position(0);
}
// Reset the chunkPosition as chunk stream has been initialized i.e. the
@@ -545,7 +543,7 @@ public class ChunkInputStream extends InputStream
*/
@VisibleForTesting
protected boolean buffersAllocated() {
- return buffers != null && !buffers.isEmpty();
+ return buffers != null && buffers.length > 0;
}
/**
@@ -556,8 +554,9 @@ public class ChunkInputStream extends InputStream
boolean hasData = false;
if (buffersAllocated()) {
- while (bufferIndex < (buffers.size())) {
- if (buffers.get(bufferIndex).hasRemaining()) {
+ while (bufferIndex < (buffers.length)) {
+ if (buffers[bufferIndex] != null &&
+ buffers[bufferIndex].hasRemaining()) {
// current buffer has data
hasData = true;
break;
@@ -565,7 +564,7 @@ public class ChunkInputStream extends InputStream
if (buffersRemaining()) {
// move to next available buffer
++bufferIndex;
- Preconditions.checkState(bufferIndex < buffers.size());
+ Preconditions.checkState(bufferIndex < buffers.length);
} else {
// no more buffers remaining
break;
@@ -578,7 +577,7 @@ public class ChunkInputStream extends InputStream
}
private boolean buffersRemaining() {
- return (bufferIndex < (buffers.size() - 1));
+ return (bufferIndex < (buffers.length - 1));
}
/**
@@ -588,7 +587,10 @@ public class ChunkInputStream extends InputStream
// Check if buffers have been allocated
if (buffersAllocated()) {
// Check if the current buffers cover the input position
- return pos >= bufferOffsetWrtChunkData &&
+ // Released buffers should not be considered when checking if position
+ // is available
+ return pos >= bufferOffsetWrtChunkData +
+ bufferOffsets[firstUnreleasedBufferIndex] &&
pos < bufferOffsetWrtChunkData + buffersSize;
}
return false;
@@ -610,6 +612,21 @@ public class ChunkInputStream extends InputStream
}
/**
+ * Check if the current buffer has been read to the end.
+ */
+ private boolean bufferEOF() {
+ if (!allocated) {
+ // Chunk data has not been read yet
+ return false;
+ }
+
+ if (!buffers[bufferIndex].hasRemaining()) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
* Check if end of chunkStream has been reached.
*/
private boolean chunkStreamEOF() {
@@ -628,12 +645,38 @@ public class ChunkInputStream extends InputStream
}
}
+
+ /**
+ * Release the buffers up to the given index.
+ * @param releaseUptoBufferIndex bufferIndex (inclusive) up to which the
+ * buffers must be released
+ */
+ private void releaseBuffers(int releaseUptoBufferIndex) {
+ if (releaseUptoBufferIndex == buffers.length - 1) {
+ // Before releasing all the buffers, if chunk EOF is not reached, then
+ // chunkPosition should be set to point to the last position of the
+ // buffers. This should be done so that getPos() can return the current
+ // chunk position
+ chunkPosition = bufferOffsetWrtChunkData +
+ bufferOffsets[releaseUptoBufferIndex] +
+ buffers[releaseUptoBufferIndex].capacity();
+ // Release all the buffers
+ releaseBuffers();
+ } else {
+ for (int i = 0; i <= releaseUptoBufferIndex; i++) {
+ buffers[i] = null;
+ }
+ firstUnreleasedBufferIndex = releaseUptoBufferIndex + 1;
+ }
+ }
+
/**
* If EOF is reached, release the buffers.
*/
private void releaseBuffers() {
buffers = null;
bufferIndex = 0;
+ firstUnreleasedBufferIndex = 0;
// We should not reset bufferOffsetWrtChunkData and buffersSize here
// because when getPos() is called in chunkStreamEOF() we use these
// values and determine whether chunk is read completely or not.
@@ -671,7 +714,7 @@ public class ChunkInputStream extends InputStream
}
@VisibleForTesting
- public List<ByteBuffer> getCachedBuffers() {
- return buffers;
+ public ByteBuffer[] getCachedBuffers() {
+ return BufferUtils.getReadOnlyByteBuffers(buffers);
}
}
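A hypothetical trace of the consumer-visible effect, assuming a 60-byte chunk fetched as three 20-byte buffers starting at chunk offset 0:

    read bytes 0..19   -> buffer 0 released,  getPos() == 20
    read bytes 20..39  -> buffer 1 released,  getPos() == 40
    read bytes 40..59  -> last buffer: chunkPosition is pinned to 60 before
                          the array is dropped, so getPos() still returns 60

Without the pinning in releaseBuffers(int) above, getPos() would have nothing to compute from once buffers is null, and consumers that detect EOF by calling getPos() would break.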
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
index 8453210..78d0c05 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
@@ -50,7 +50,7 @@ public class DummyChunkInputStream extends ChunkInputStream {
}
@Override
- protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo) {
+ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) {
int offset = (int) readChunkInfo.getOffset();
int remainingToRead = (int) readChunkInfo.getLen();
@@ -72,7 +72,8 @@ public class DummyChunkInputStream extends ChunkInputStream {
remainingToRead -= bufferLen;
}
- return BufferUtils.getReadOnlyByteBuffers(readByteBuffers);
+ return BufferUtils.getReadOnlyByteBuffers(readByteBuffers)
+ .toArray(new ByteBuffer[0]);
}
@Override
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
index f6d9b8d..b06b702 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
@@ -172,16 +172,26 @@ public class TestChunkInputStream {
// buffers and the chunkPosition should be reset to -1.
Assert.assertEquals(-1, chunkStream.getChunkPosition());
- // Seek to a position within the current buffers. Current buffers contain
- // data from index 20 to 59. ChunkPosition should still not be used to
- // set the position.
- seekAndVerify(35);
+ // Only the last BYTES_PER_CHECKSUM bytes will be cached in the buffers as
+ // buffers are released after each checksum boundary is read. So the
+ // buffers should contain data from index 40 to 59.
+ // Seek to a position within the cached buffers. ChunkPosition should
+ // still not be used to set the position.
+ seekAndVerify(45);
Assert.assertEquals(-1, chunkStream.getChunkPosition());
- // Seek to a position outside the current buffers. In this case, the
+ // Seek to a position outside the current cached buffers. In this case, the
// chunkPosition should be updated to the seeked position.
seekAndVerify(75);
Assert.assertEquals(75, chunkStream.getChunkPosition());
+
+ // Reading up to the checksum boundary should release all the buffers and
+ // update chunkPosition to the current position of the chunk.
+ seekAndVerify(25);
+ b = new byte[15];
+ chunkStream.read(b, 0, 15);
+ matchWithInputData(b, 25, 15);
+ Assert.assertEquals(40, chunkStream.getChunkPosition());
}
@Test
@@ -229,8 +239,9 @@ public class TestChunkInputStream {
ChunkInputStream subject = new ChunkInputStream(chunkInfo, null,
clientFactory, pipelineRef::get, false, null) {
@Override
- protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo) {
- return ByteString.copyFrom(chunkData).asReadOnlyByteBufferList();
+ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) {
+ return ByteString.copyFrom(chunkData).asReadOnlyByteBufferList()
+ .toArray(new ByteBuffer[0]);
}
};
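The revised expectations above follow directly from eager release. With the checksum layout this test implies (20-byte boundaries), the sequence is roughly:

    read bytes 0..59   -> three 20-byte buffers fetched; the buffers covering
                          0..19 and 20..39 are released as they are consumed,
                          so only bytes 40..59 stay cached
    seek(45)           -> inside the cached window, served from a buffer,
                          chunkPosition stays -1
    seek(75)           -> outside the window, chunkPosition is set to 75
    seek(25) + read 15 -> bytes 25..39 are refetched and fully consumed, the
                          buffer is released, and getChunkPosition() reports 40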
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index 94ad552..f3f3173 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.Iterator;
import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.io.IOException;
@@ -218,13 +217,21 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
@Override
public ByteString toByteStringImpl(Function<ByteBuffer, ByteString> f) {
- return buffers.stream().map(f).reduce(ByteString.EMPTY, ByteString::concat);
+ ByteString result = ByteString.EMPTY;
+ for (ByteBuffer buffer : buffers) {
+ result = result.concat(f.apply(buffer));
+ }
+ return result;
}
@Override
public List<ByteString> toByteStringListImpl(
Function<ByteBuffer, ByteString> f) {
- return buffers.stream().map(f).collect(Collectors.toList());
+ List<ByteString> byteStringList = new ArrayList<>();
+ for (ByteBuffer buffer : buffers) {
+ byteStringList.add(f.apply(buffer));
+ }
+ return byteStringList;
}
@Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
index ce7b1eb..7622ffc 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.common;
import com.google.common.base.Preconditions;
-import java.util.stream.Collectors;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.io.IOException;
@@ -271,13 +270,21 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
@Override
public ByteString toByteStringImpl(Function<ByteBuffer, ByteString> f) {
- return buffers.stream().map(f).reduce(ByteString.EMPTY, ByteString::concat);
+ ByteString result = ByteString.EMPTY;
+ for (ByteBuffer buffer : buffers) {
+ result = result.concat(f.apply(buffer));
+ }
+ return result;
}
@Override
public List<ByteString> toByteStringListImpl(
Function<ByteBuffer, ByteString> f) {
- return buffers.stream().map(f).collect(Collectors.toList());
+ List<ByteString> byteStringList = new ArrayList<>();
+ for (ByteBuffer buffer : buffers) {
+ byteStringList.add(f.apply(buffer));
+ }
+ return byteStringList;
}
@Override
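Both ChunkBuffer implementations receive the same mechanical rewrite here: the java.util.stream pipelines on the ByteString conversion paths are replaced with plain loops, presumably to keep stream setup off the hot path. The two forms are equivalent; a standalone sanity check (assuming the ratis-thirdparty shaded protobuf jar on the classpath):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

    public class ConcatEquivalence {
      public static void main(String[] args) {
        List<ByteString> parts = Arrays.asList(
            ByteString.copyFromUtf8("chunk-"), ByteString.copyFromUtf8("data"));

        // Old form: stream reduction.
        ByteString viaStream =
            parts.stream().reduce(ByteString.EMPTY, ByteString::concat);

        // New form: explicit loop, no stream machinery per call.
        ByteString viaLoop = ByteString.EMPTY;
        for (ByteString part : parts) {
          viaLoop = viaLoop.concat(part);
        }

        System.out.println(viaStream.equals(viaLoop)); // true -> "chunk-data"
      }
    }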
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
index a8be69d..61e96fb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
@@ -71,8 +71,36 @@ public final class BufferUtils {
return buffers;
}
+ /**
+ * Return a read only copy of ByteBuffer array.
+ */
+ public static ByteBuffer[] getReadOnlyByteBuffers(
+ ByteBuffer[] byteBuffers) {
+ if (byteBuffers == null) {
+ return null;
+ }
+ ByteBuffer[] readOnlyBuffers = new ByteBuffer[byteBuffers.length];
+ for (int i = 0; i < byteBuffers.length; i++) {
+ readOnlyBuffers[i] = byteBuffers[i] == null ?
+ null : byteBuffers[i].asReadOnlyBuffer();
+ }
+ return readOnlyBuffers;
+ }
+
+ /**
+ * Return a read only ByteBuffer array for the input ByteStrings list.
+ */
+ public static ByteBuffer[] getReadOnlyByteBuffersArray(
+ List<ByteString> byteStrings) {
+ return getReadOnlyByteBuffers(byteStrings).toArray(new ByteBuffer[0]);
+ }
+
public static ByteString concatByteStrings(List<ByteString> byteStrings) {
- return byteStrings.stream().reduce(ByteString::concat).orElse(null);
+ ByteString result = ByteString.EMPTY;
+ for (ByteString byteString : byteStrings) {
+ result = result.concat(byteString);
+ }
+ return result;
}
/**
@@ -96,4 +124,10 @@ public final class BufferUtils {
public static int getNumberOfBins(long numElements, long maxElementsPerBin) {
return (int) Math.ceil((double) numElements / (double) maxElementsPerBin);
}
+
+ public static void clearBuffers(ByteBuffer[] byteBuffers) {
+ for (ByteBuffer buffer : byteBuffers) {
+ buffer.clear();
+ }
+ }
}
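A short sketch of how the new helpers fit together (hypothetical values). Note that getReadOnlyByteBuffers(ByteBuffer[]) deliberately preserves null entries, so released buffer slots survive the defensive copy that getCachedBuffers() now hands out, and clearBuffers() rewinds buffers for reuse, which is what the FilePerChunkStrategy retry path below does instead of throwing the array away:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.ozone.common.utils.BufferUtils;

    public class BufferUtilsExample {
      public static void main(String[] args) {
        ByteBuffer[] cached = new ByteBuffer[] {
            null,                                    // an already-released slot
            ByteBuffer.wrap("abc".getBytes(StandardCharsets.UTF_8))
        };

        // Read-only views protect cached data from mutation; nulls are kept.
        ByteBuffer[] views = BufferUtils.getReadOnlyByteBuffers(cached);
        System.out.println(views[0] == null);      // true
        System.out.println(views[1].isReadOnly()); // true

        // clear() resets position/limit so the same buffers can back a retry.
        ByteBuffer[] scratch = {ByteBuffer.allocate(8)};
        scratch[0].putInt(42);
        BufferUtils.clearBuffers(scratch);
        System.out.println(scratch[0].position()); // 0
      }
    }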
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index 925ef71..0594e17 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -153,30 +152,8 @@ public class FilePerBlockStrategy implements ChunkManager {
long len = info.getLen();
long offset = info.getOffset();
-
- long bufferCapacity = 0;
- if (info.isReadDataIntoSingleBuffer()) {
- // Older client - read all chunk data into one single buffer.
- bufferCapacity = len;
- } else {
- // Set buffer capacity to checksum boundary size so that each buffer
- // corresponds to one checksum. If checksum is NONE, then set buffer
- // capacity to default (OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY = 64KB).
- ChecksumData checksumData = info.getChecksumData();
-
- if (checksumData != null) {
- if (checksumData.getChecksumType() ==
- ContainerProtos.ChecksumType.NONE) {
- bufferCapacity = defaultReadBufferCapacity;
- } else {
- bufferCapacity = checksumData.getBytesPerChecksum();
- }
- }
- }
- // If the buffer capacity is 0, set all the data into one ByteBuffer
- if (bufferCapacity == 0) {
- bufferCapacity = len;
- }
+ long bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info,
+ defaultReadBufferCapacity);
ByteBuffer[] dataBuffers = BufferUtils.assignByteBuffers(len,
bufferCapacity);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
index d7f994e..82ac4e8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -228,30 +227,8 @@ public class FilePerChunkStrategy implements ChunkManager {
}
long len = info.getLen();
-
- long bufferCapacity = 0;
- if (info.isReadDataIntoSingleBuffer()) {
- // Older client - read all chunk data into one single buffer.
- bufferCapacity = len;
- } else {
- // Set buffer capacity to checksum boundary size so that each buffer
- // corresponds to one checksum. If checksum is NONE, then set buffer
- // capacity to default (OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY = 64KB).
- ChecksumData checksumData = info.getChecksumData();
-
- if (checksumData != null) {
- if (checksumData.getChecksumType() ==
- ContainerProtos.ChecksumType.NONE) {
- bufferCapacity = defaultReadBufferCapacity;
- } else {
- bufferCapacity = checksumData.getBytesPerChecksum();
- }
- }
- }
- // If the buffer capacity is 0, set all the data into one ByteBuffer
- if (bufferCapacity == 0) {
- bufferCapacity = len;
- }
+ long bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info,
+ defaultReadBufferCapacity);
ByteBuffer[] dataBuffers = BufferUtils.assignByteBuffers(len,
bufferCapacity);
@@ -296,7 +273,7 @@ public class FilePerChunkStrategy implements ChunkManager {
if (ex.getResult() != UNABLE_TO_FIND_CHUNK) {
throw ex;
}
- dataBuffers = null;
+ BufferUtils.clearBuffers(dataBuffers);
}
}
throw new StorageContainerException(
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index 8db72f7..15ff9d6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.ozone.container.keyvalue.interfaces;
*/
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
@@ -101,4 +103,33 @@ public interface ChunkManager {
BlockData blockData) throws IOException {
// no-op
}
+
+ static long getBufferCapacityForChunkRead(ChunkInfo chunkInfo,
+ long defaultReadBufferCapacity) {
+ long bufferCapacity = 0;
+ if (chunkInfo.isReadDataIntoSingleBuffer()) {
+ // Older client - read all chunk data into one single buffer.
+ bufferCapacity = chunkInfo.getLen();
+ } else {
+ // Set buffer capacity to checksum boundary size so that each buffer
+ // corresponds to one checksum. If checksum is NONE, then set buffer
+ // capacity to default (OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY = 64KB).
+ ChecksumData checksumData = chunkInfo.getChecksumData();
+
+ if (checksumData != null) {
+ if (checksumData.getChecksumType() ==
+ ContainerProtos.ChecksumType.NONE) {
+ bufferCapacity = defaultReadBufferCapacity;
+ } else {
+ bufferCapacity = checksumData.getBytesPerChecksum();
+ }
+ }
+ }
+ // If the buffer capacity is 0, set all the data into one ByteBuffer
+ if (bufferCapacity == 0) {
+ bufferCapacity = chunkInfo.getLen();
+ }
+
+ return bufferCapacity;
+ }
}
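Condensed, the extracted helper encodes four rules (a restatement of the code above with hypothetical parameter names, not new behavior):

    // Buffer capacity for a chunk read:
    //   1. old-client flag set    -> whole chunk in one buffer
    //   2. checksum type NONE     -> default capacity (64KB by default)
    //   3. checksum present       -> one buffer per checksum boundary
    //   4. still unresolved (0)   -> whole chunk in one buffer
    static long bufferCapacity(boolean singleBuffer, boolean hasChecksumData,
        boolean checksumIsNone, long bytesPerChecksum, long chunkLen,
        long defaultCapacity) {
      if (singleBuffer) {
        return chunkLen;
      }
      long capacity = 0;
      if (hasChecksumData) {
        capacity = checksumIsNone ? defaultCapacity : bytesPerChecksum;
      }
      return capacity > 0 ? capacity : chunkLen;
    }

So, for example, a 4 MB chunk checksummed every 1 MB is served as four 1 MB buffers, which is what allows the client side to release each slice independently as it is read.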
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
index 2ad91a7..fc051c8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.client.rpc.read;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.List;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
@@ -56,12 +55,12 @@ public class TestChunkInputStream extends TestInputStreamBase {
// To read 1 byte of chunk data, ChunkInputStream should get one full
// checksum boundary worth of data from Container and store it in buffers.
chunk0Stream.read(new byte[1]);
- checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1,
+ checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
BYTES_PER_CHECKSUM);
// Read > checksum boundary of data from chunk0
int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
- byte[] readData = readDataFromChunk(chunk0Stream, readDataLen, 0);
+ byte[] readData = readDataFromChunk(chunk0Stream, 0, readDataLen);
validateData(inputData, 0, readData);
// The first checksum boundary size of data was already existing in the
@@ -69,52 +68,140 @@ public class TestChunkInputStream extends TestInputStreamBase {
// boundary size of data will be fetched again to read the remaining data.
// Hence there should be 1 checksum boundary size of data stored in the
// ChunkStreams buffers at the end of the read.
- checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1,
+ checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
BYTES_PER_CHECKSUM);
// Seek to a position in the third checksum boundary (so that current
// buffers do not have the seeked position) and read > BYTES_PER_CHECKSUM
// bytes of data. This should result in 2 * BYTES_PER_CHECKSUM amount of
- // data being read into the buffers. There should be 2 buffers each with
- // BYTES_PER_CHECKSUM capacity.
+ // data being read into the buffers. There should be 2 buffers in the
+ // stream but the first buffer should be released after it is read
+ // and the second buffer should have BYTES_PER_CHECKSUM capacity.
readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
int offset = 2 * BYTES_PER_CHECKSUM + 1;
- readData = readDataFromChunk(chunk0Stream, readDataLen, offset);
+ readData = readDataFromChunk(chunk0Stream, offset, readDataLen);
validateData(inputData, offset, readData);
- checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 2,
+ checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 2, 1,
BYTES_PER_CHECKSUM);
- // Read the full chunk data -1 and verify that all chunk data is read into
- // buffers. We read CHUNK_SIZE - 1 as otherwise the buffers will be
+ // Read the full chunk data - 1 and verify that all chunk data is read into
+ // buffers. We read CHUNK_SIZE - 1 as otherwise all the buffers will be
// released once all chunk data is read.
- readData = readDataFromChunk(chunk0Stream, CHUNK_SIZE - 1, 0);
+ readData = readDataFromChunk(chunk0Stream, 0, CHUNK_SIZE - 1);
validateData(inputData, 0, readData);
+ int expectedNumBuffers = CHUNK_SIZE / BYTES_PER_CHECKSUM;
checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(),
- CHUNK_SIZE / BYTES_PER_CHECKSUM, BYTES_PER_CHECKSUM);
+ expectedNumBuffers, expectedNumBuffers - 1, BYTES_PER_CHECKSUM);
// Read the last byte of chunk and verify that the buffers are released.
chunk0Stream.read(new byte[1]);
Assert.assertNull("ChunkInputStream did not release buffers after " +
"reaching EOF.", chunk0Stream.getCachedBuffers());
+ }
+
+ /**
+ * Test that ChunkInputStream buffers are released as soon as the last byte
+ * of the buffer is read.
+ */
+ @Test
+ public void testBufferRelease() throws Exception {
+ String keyName = getNewKeyName();
+ int dataLength = CHUNK_SIZE;
+ byte[] inputData = writeRandomBytes(keyName, dataLength);
+ try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
+
+ BlockInputStream block0Stream = keyInputStream.getBlockStreams().get(0);
+ block0Stream.initialize();
+
+ ChunkInputStream chunk0Stream = block0Stream.getChunkStreams().get(0);
+
+ // Read checksum boundary - 1 bytes of data
+ int readDataLen = BYTES_PER_CHECKSUM - 1;
+ byte[] readData = readDataFromChunk(chunk0Stream, 0, readDataLen);
+ validateData(inputData, 0, readData);
+
+ // There should be 1 byte of data remaining in the buffer which is not
+ // yet read. Hence, the buffer should not be released.
+ checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(),
+ 1, 0, BYTES_PER_CHECKSUM);
+ Assert.assertEquals(1, chunk0Stream.getCachedBuffers()[0].remaining());
+
+ // Reading the last byte in the buffer should result in all the buffers
+ // being released.
+ readData = readDataFromChunk(chunk0Stream, 1);
+ validateData(inputData, readDataLen, readData);
+ Assert
+ .assertNull("Chunk stream buffers not released after last byte is " +
+ "read", chunk0Stream.getCachedBuffers());
+
+ // Read more data, up to the next checksum boundary.
+ readDataLen = BYTES_PER_CHECKSUM / 2;
+ readData = readDataFromChunk(chunk0Stream, readDataLen);
+ // There should be one buffer and the buffer should not be released as
+ // there is data pending to be read from the buffer
+ checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
+ BYTES_PER_CHECKSUM);
+ ByteBuffer lastCachedBuffer = chunk0Stream.getCachedBuffers()[0];
+ Assert.assertEquals(BYTES_PER_CHECKSUM - readDataLen,
+ lastCachedBuffer.remaining());
+
+ // Read more than the remaining data in buffer (but less than the next
+ // checksum boundary).
+ int position = (int) chunk0Stream.getPos();
+ readDataLen = lastCachedBuffer.remaining() + BYTES_PER_CHECKSUM / 2;
+ readData = readDataFromChunk(chunk0Stream, readDataLen);
+ validateData(inputData, position, readData);
+ // After reading the remaining data in the buffer, the buffer should be
+ // released and the next checksum boundary of data read into the buffers.
+ checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
+ BYTES_PER_CHECKSUM);
+ // Verify that the previously cached buffer is released by comparing it
+ // with the current cached buffer
+ Assert.assertNotEquals(lastCachedBuffer,
+ chunk0Stream.getCachedBuffers()[0]);
+ }
}
private byte[] readDataFromChunk(ChunkInputStream chunkInputStream,
- int readDataLength, int offset) throws IOException {
+ int offset, int readDataLength) throws IOException {
byte[] readData = new byte[readDataLength];
chunkInputStream.seek(offset);
chunkInputStream.read(readData, 0, readDataLength);
return readData;
}
- private void checkBufferSizeAndCapacity(List<ByteBuffer> buffers,
- int expectedNumBuffers, long expectedBufferCapacity) {
+ private byte[] readDataFromChunk(ChunkInputStream chunkInputStream,
+ int readDataLength) throws IOException {
+ byte[] readData = new byte[readDataLength];
+ chunkInputStream.read(readData, 0, readDataLength);
+ return readData;
+ }
+
+ /**
+ * Verify number of buffers and their capacities.
+ * @param buffers chunk stream buffers
+ * @param expectedNumBuffers expected number of buffers
+ * @param numReleasedBuffers the first numReleasedBuffers buffers are
+ * expected to be released and hence null
+ * @param expectedBufferCapacity expected buffer capacity of unreleased
+ * buffers
+ */
+ private void checkBufferSizeAndCapacity(ByteBuffer[] buffers,
+ int expectedNumBuffers, int numReleasedBuffers,
+ long expectedBufferCapacity) {
Assert.assertEquals("ChunkInputStream does not have expected number of " +
- "ByteBuffers", expectedNumBuffers, buffers.size());
- for (ByteBuffer buffer : buffers) {
- Assert.assertEquals("ChunkInputStream ByteBuffer capacity is wrong",
- expectedBufferCapacity, buffer.capacity());
+ "ByteBuffers", expectedNumBuffers, buffers.length);
+ for (int i = 0; i < buffers.length; i++) {
+ if (i <= numReleasedBuffers - 1) {
+ // This buffer should have been released and hence null
+ Assert.assertNull("ChunkInputStream Buffer not released after being " +
+ "read", buffers[i]);
+ } else {
+ Assert.assertEquals("ChunkInputStream ByteBuffer capacity is wrong",
+ expectedBufferCapacity, buffers[i].capacity());
+ }
}
}
}