You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2019/06/07 02:44:46 UTC
[hadoop] branch trunk updated: HDDS-1496. Support partial chunk
reads and checksum verification (#804)
This is an automated email from the ASF dual-hosted git repository.
hanishakoneru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new a91d24f HDDS-1496. Support partial chunk reads and checksum verification (#804)
a91d24f is described below
commit a91d24fea45c2d269fabe46d43d5d4156ba47e1c
Author: Hanisha Koneru <ha...@apache.org>
AuthorDate: Thu Jun 6 19:44:40 2019 -0700
HDDS-1496. Support partial chunk reads and checksum verification (#804)
---
.../hadoop/hdds/scm/storage/BlockInputStream.java | 634 +++++++++------------
.../hadoop/hdds/scm/storage/ChunkInputStream.java | 546 ++++++++++++++++++
.../hdds/scm/storage/TestBlockInputStream.java | 251 ++++----
.../hdds/scm/storage/TestChunkInputStream.java | 224 ++++++++
.../org/apache/hadoop/ozone/common/Checksum.java | 29 +-
.../apache/hadoop/ozone/common/ChecksumData.java | 37 +-
.../hadoop/ozone/client/io/KeyInputStream.java | 380 +++++-------
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 4 +-
.../web/storage/DistributedStorageHandler.java | 3 +-
.../apache/hadoop/ozone/om/TestChunkStreams.java | 11 +-
10 files changed, 1377 insertions(+), 742 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 82fb106..bccbc9b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -19,470 +19,370 @@
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.common.helpers
- .StorageContainerException;
-import org.apache.hadoop.ozone.common.Checksum;
-import org.apache.hadoop.ozone.common.ChecksumData;
-import org.apache.hadoop.ozone.common.OzoneChecksumException;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ReadChunkResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
- ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
- ContainerCommandRequestProto;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
- * An {@link InputStream} used by the REST service in combination with the
- * SCMClient to read the value of a key from a sequence
- * of container chunks. All bytes of the key value are stored in container
- * chunks. Each chunk may contain multiple underlying {@link ByteBuffer}
- * instances. This class encapsulates all state management for iterating
- * through the sequence of chunks and the sequence of buffers within each chunk.
+ * An {@link InputStream} called from KeyInputStream to read a block from the
+ * container.
+ * This class encapsulates all state management for iterating
+ * through the sequence of chunks through {@link ChunkInputStream}.
*/
public class BlockInputStream extends InputStream implements Seekable {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BlockInputStream.class);
+
private static final int EOF = -1;
private final BlockID blockID;
+ private final long length;
+ private Pipeline pipeline;
+ private final Token<OzoneBlockTokenIdentifier> token;
+ private final boolean verifyChecksum;
private final String traceID;
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
- private List<ChunkInfo> chunks;
- // ChunkIndex points to the index current chunk in the buffers or the the
- // index of chunk which will be read next into the buffers in
- // readChunkFromContainer().
+ private boolean initialized = false;
+
+ // List of ChunkInputStreams, one for each chunk in the block
+ private List<ChunkInputStream> chunkStreams;
+
+ // chunkOffsets[i] stores the index of the first data byte in
+ // chunkStream i w.r.t the block data.
+ // Let’s say we have chunk size as 40 bytes. And let's say the parent
+ // block stores data from index 200 and has length 400.
+ // The first 40 bytes of this block will be stored in chunk[0], next 40 in
+ // chunk[1] and so on. But since the chunkOffsets are w.r.t the block only
+ // and not the key, the values in chunkOffsets will be [0, 40, 80,....].
+ private long[] chunkOffsets = null;
+
+ // Index of the chunkStream corresponding to the current position of the
+ // BlockInputStream i.e offset of the data to be read next from this block
private int chunkIndex;
- // ChunkIndexOfCurrentBuffer points to the index of chunk read into the
- // buffers or index of the last chunk in the buffers. It is updated only
- // when a new chunk is read from container into the buffers.
- private int chunkIndexOfCurrentBuffer;
- private long[] chunkOffset;
- private List<ByteBuffer> buffers;
- private int bufferIndex;
- private long bufferPosition;
- private boolean verifyChecksum;
- /**
- * Creates a new BlockInputStream.
- *
- * @param blockID block ID of the chunk
- * @param xceiverClientManager client manager that controls client
- * @param xceiverClient client to perform container calls
- * @param chunks list of chunks to read
- * @param traceID container protocol call traceID
- * @param verifyChecksum verify checksum
- * @param initialPosition the initial position of the stream pointer. This
- * position is seeked now if the up-stream was seeked
- * before this was created.
- */
- public BlockInputStream(
- BlockID blockID, XceiverClientManager xceiverClientManager,
- XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID,
- boolean verifyChecksum, long initialPosition) throws IOException {
- this.blockID = blockID;
- this.traceID = traceID;
- this.xceiverClientManager = xceiverClientManager;
- this.xceiverClient = xceiverClient;
- this.chunks = chunks;
- this.chunkIndex = 0;
- this.chunkIndexOfCurrentBuffer = -1;
- // chunkOffset[i] stores offset at which chunk i stores data in
- // BlockInputStream
- this.chunkOffset = new long[this.chunks.size()];
- initializeChunkOffset();
- this.buffers = null;
- this.bufferIndex = 0;
- this.bufferPosition = -1;
+ // Position of the BlockInputStream is maintained by this variable till
+ // the stream is initialized. This position is w.r.t. the block only and
+ // not the key.
+ // For the above example, if we seek to position 240 before the stream is
+ // initialized, then value of blockPosition will be set to 40.
+ // Once the stream is initialized, the position of the stream
+ // will be determined by the current chunkStream and its position.
+ private long blockPosition = 0;
+
+ // Tracks the chunkIndex corresponding to the last blockPosition so that it
+ // can be reset if a new position is seeked.
+ private int chunkIndexOfPrevPosition;
+
+ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
+ Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+ String traceId, XceiverClientManager xceiverClientManager) {
+ this.blockID = blockId;
+ this.length = blockLen;
+ this.pipeline = pipeline;
+ this.token = token;
this.verifyChecksum = verifyChecksum;
- if (initialPosition > 0) {
- // The stream was seeked to a position before the stream was
- // initialized. So seeking to the position now.
- seek(initialPosition);
- }
+ this.traceID = traceId;
+ this.xceiverClientManager = xceiverClientManager;
}
- private void initializeChunkOffset() {
- long tempOffset = 0;
- for (int i = 0; i < chunks.size(); i++) {
- chunkOffset[i] = tempOffset;
- tempOffset += chunks.get(i).getLen();
+ /**
+ * Initialize the BlockInputStream. Get the BlockData (list of chunks) from
+ * the Container and create the ChunkInputStreams for each Chunk in the Block.
+ */
+ public synchronized void initialize() throws IOException {
+
+ // Pre-check that the stream has not been initialized already
+ if (initialized) {
+ return;
}
- }
- @Override
- public synchronized int read()
- throws IOException {
- checkOpen();
- int available = prepareRead(1);
- int dataout = EOF;
+ List<ChunkInfo> chunks = getChunkInfos();
+ if (chunks != null && !chunks.isEmpty()) {
+ // For each chunk in the block, create a ChunkInputStream and compute
+ // its chunkOffset
+ this.chunkOffsets = new long[chunks.size()];
+ long tempOffset = 0;
+
+ this.chunkStreams = new ArrayList<>(chunks.size());
+ for (int i = 0; i < chunks.size(); i++) {
+ addStream(chunks.get(i));
+ chunkOffsets[i] = tempOffset;
+ tempOffset += chunks.get(i).getLen();
+ }
- if (available == EOF) {
- Preconditions
- .checkState(buffers == null); //should have released by now, see below
- } else {
- dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
- }
+ initialized = true;
+ this.chunkIndex = 0;
- if (blockStreamEOF()) {
- // consumer might use getPos to determine EOF,
- // so release buffers when serving the last byte of data
- releaseBuffers();
+ if (blockPosition > 0) {
+ // Stream was seeked to blockPosition before initialization. Seek to the
+ // blockPosition now.
+ seek(blockPosition);
+ }
}
-
- return dataout;
}
- @Override
- public synchronized int read(byte[] b, int off, int len) throws IOException {
- // According to the JavaDocs for InputStream, it is recommended that
- // subclasses provide an override of bulk read if possible for performance
- // reasons. In addition to performance, we need to do it for correctness
- // reasons. The Ozone REST service uses PipedInputStream and
- // PipedOutputStream to relay HTTP response data between a Jersey thread and
- // a Netty thread. It turns out that PipedInputStream/PipedOutputStream
- // have a subtle dependency (bug?) on the wrapped stream providing separate
- // implementations of single-byte read and bulk read. Without this, get key
- // responses might close the connection before writing all of the bytes
- // advertised in the Content-Length.
- if (b == null) {
- throw new NullPointerException();
- }
- if (off < 0 || len < 0 || len > b.length - off) {
- throw new IndexOutOfBoundsException();
- }
- if (len == 0) {
- return 0;
- }
- checkOpen();
- int total = 0;
- while (len > 0) {
- int available = prepareRead(len);
- if (available == EOF) {
- Preconditions
- .checkState(buffers == null); //should have been released by now
- return total != 0 ? total : EOF;
- }
- buffers.get(bufferIndex).get(b, off + total, available);
- len -= available;
- total += available;
+ /**
+ * Send RPC call to get the block info from the container.
+ * @return List of chunks in this block.
+ */
+ protected List<ChunkInfo> getChunkInfos() throws IOException {
+ // irrespective of the container state, we will always read via Standalone
+ // protocol.
+ if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+ pipeline = Pipeline.newBuilder(pipeline)
+ .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
}
+ xceiverClient = xceiverClientManager.acquireClient(pipeline);
+ boolean success = false;
+ List<ChunkInfo> chunks;
+ try {
+ LOG.debug("Initializing BlockInputStream for get key to access {}",
+ blockID.getContainerID());
- if (blockStreamEOF()) {
- // smart consumers determine EOF by calling getPos()
- // so we release buffers when serving the final bytes of data
- releaseBuffers();
+ if (token != null) {
+ UserGroupInformation.getCurrentUser().addToken(token);
+ }
+ DatanodeBlockID datanodeBlockID = blockID
+ .getDatanodeBlockIDProtobuf();
+ GetBlockResponseProto response = ContainerProtocolCalls
+ .getBlock(xceiverClient, datanodeBlockID, traceID);
+
+ chunks = response.getBlockData().getChunksList();
+ success = true;
+ } finally {
+ if (!success) {
+ xceiverClientManager.releaseClient(xceiverClient, false);
+ }
}
- return total;
+ return chunks;
}
/**
- * Determines if all data in the stream has been consumed.
- *
- * @return true if EOF, false if more data is available
+ * Append another ChunkInputStream to the end of the list. Note that the
+ * ChunkInputStream is only created here. The chunk will be read from the
+ * Datanode only when a read operation is performed for that chunk.
*/
- protected boolean blockStreamEOF() {
- if (buffersHaveData() || chunksRemaining()) {
- return false;
- } else {
- // if there are any chunks, we better be at the last chunk for EOF
- Preconditions.checkState(((chunks == null) || chunks.isEmpty() ||
- chunkIndex == (chunks.size() - 1)),
- "EOF detected, but not at the last chunk");
- return true;
- }
- }
-
- private void releaseBuffers() {
- //ashes to ashes, dust to dust
- buffers = null;
- bufferIndex = 0;
+ protected synchronized void addStream(ChunkInfo chunkInfo) {
+ chunkStreams.add(new ChunkInputStream(chunkInfo, blockID, traceID,
+ xceiverClient, verifyChecksum));
}
- @Override
- public synchronized void close() {
- if (xceiverClientManager != null && xceiverClient != null) {
- xceiverClientManager.releaseClient(xceiverClient, false);
- xceiverClientManager = null;
- xceiverClient = null;
- }
+ public synchronized long getRemaining() throws IOException {
+ return length - getPos();
}
/**
- * Checks if the stream is open. If not, throws an exception.
- *
- * @throws IOException if stream is closed
+ * {@inheritDoc}
*/
- private synchronized void checkOpen() throws IOException {
- if (xceiverClient == null) {
- throw new IOException("BlockInputStream has been closed.");
+ @Override
+ public synchronized int read() throws IOException {
+ byte[] buf = new byte[1];
+ if (read(buf, 0, 1) == EOF) {
+ return EOF;
}
+ return Byte.toUnsignedInt(buf[0]);
}
/**
- * Prepares to read by advancing through chunks and buffers as needed until it
- * finds data to return or encounters EOF.
- *
- * @param len desired length of data to read
- * @return length of data available to read, possibly less than desired length
+ * {@inheritDoc}
*/
- private synchronized int prepareRead(int len) throws IOException {
- for (;;) {
- if (!buffersAllocated()) {
- // The current chunk at chunkIndex has not been read from the
- // container. Read the chunk and put the data into buffers.
- readChunkFromContainer();
- }
- if (buffersHaveData()) {
- // Data is available from buffers
- ByteBuffer bb = buffers.get(bufferIndex);
- return len > bb.remaining() ? bb.remaining() : len;
- } else if (chunksRemaining()) {
- // There are additional chunks available.
- // Read the next chunk in the block.
- chunkIndex += 1;
- readChunkFromContainer();
- } else {
- // All available input has been consumed.
- return EOF;
- }
+ @Override
+ public synchronized int read(byte[] b, int off, int len) throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
}
- }
-
- private boolean buffersAllocated() {
- if (buffers == null || buffers.isEmpty()) {
- return false;
+ if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
}
- return true;
- }
-
- private boolean buffersHaveData() {
- boolean hasData = false;
-
- if (buffersAllocated()) {
- while (bufferIndex < (buffers.size())) {
- if (buffers.get(bufferIndex).hasRemaining()) {
- // current buffer has data
- hasData = true;
- break;
- } else {
- if (buffersRemaining()) {
- // move to next available buffer
- ++bufferIndex;
- Preconditions.checkState(bufferIndex < buffers.size());
- } else {
- // no more buffers remaining
- break;
- }
- }
- }
+ if (len == 0) {
+ return 0;
}
- return hasData;
- }
+ if (!initialized) {
+ initialize();
+ }
- private boolean buffersRemaining() {
- return (bufferIndex < (buffers.size() - 1));
- }
+ checkOpen();
+ int totalReadLen = 0;
+ while (len > 0) {
+ // if we are at the last chunk and have read the entire chunk, return
+ if (chunkStreams.size() == 0 ||
+ (chunkStreams.size() - 1 <= chunkIndex &&
+ chunkStreams.get(chunkIndex)
+ .getRemaining() == 0)) {
+ return totalReadLen == 0 ? EOF : totalReadLen;
+ }
- private boolean chunksRemaining() {
- if ((chunks == null) || chunks.isEmpty()) {
- return false;
- }
- // Check if more chunks are remaining in the stream after chunkIndex
- if (chunkIndex < (chunks.size() - 1)) {
- return true;
+ // Get the current chunkStream and read data from it
+ ChunkInputStream current = chunkStreams.get(chunkIndex);
+ int numBytesToRead = Math.min(len, (int)current.getRemaining());
+ int numBytesRead = current.read(b, off, numBytesToRead);
+ if (numBytesRead != numBytesToRead) {
+ // This implies that there is either data loss or corruption in the
+ // chunk entries. Even EOF in the current stream would be covered in
+ // this case.
+ throw new IOException(String.format(
+ "Inconsistent read for chunkName=%s length=%d numBytesRead=%d",
+ current.getChunkName(), current.getLength(), numBytesRead));
+ }
+ totalReadLen += numBytesRead;
+ off += numBytesRead;
+ len -= numBytesRead;
+ if (current.getRemaining() <= 0 &&
+ ((chunkIndex + 1) < chunkStreams.size())) {
+ chunkIndex += 1;
+ }
}
- // ChunkIndex is the last chunk in the stream. Check if this chunk has
- // been read from container or not. Return true if chunkIndex has not
- // been read yet and false otherwise.
- return chunkIndexOfCurrentBuffer != chunkIndex;
+ return totalReadLen;
}
/**
- * Attempts to read the chunk at the specified offset in the chunk list. If
- * successful, then the data of the read chunk is saved so that its bytes can
- * be returned from subsequent read calls.
+ * Seeks the BlockInputStream to the specified position. If the stream is
+ * not initialized, save the seeked position via blockPosition. Otherwise,
+ * update the position in 2 steps:
+ * 1. Updating the chunkIndex to the chunkStream corresponding to the
+ * seeked position.
+ * 2. Seek the corresponding chunkStream to the adjusted position.
*
- * @throws IOException if there is an I/O error while performing the call
+ * Let’s say we have chunk size as 40 bytes. And let's say the parent block
+ * stores data from index 200 and has length 400. If the key was seeked to
+ * position 90, then this block will be seeked to position 90.
+ * When seek(90) is called on this blockStream, then
+ * 1. chunkIndex will be set to 2 (as indices 80 - 120 reside in chunk[2]).
+ * 2. chunkStream[2] will be seeked to position 10
+ * (= 90 - chunkOffset[2] (= 80)).
*/
- private synchronized void readChunkFromContainer() throws IOException {
- // Read the chunk at chunkIndex
- final ChunkInfo chunkInfo = chunks.get(chunkIndex);
- ByteString byteString;
- byteString = readChunk(chunkInfo);
- buffers = byteString.asReadOnlyByteBufferList();
- bufferIndex = 0;
- chunkIndexOfCurrentBuffer = chunkIndex;
-
- // The bufferIndex and position might need to be adjusted if seek() was
- // called on the stream before. This needs to be done so that the buffer
- // position can be advanced to the 'seeked' position.
- adjustBufferIndex();
- }
-
- /**
- * Send RPC call to get the chunk from the container.
- */
- @VisibleForTesting
- protected ByteString readChunk(final ChunkInfo chunkInfo)
- throws IOException {
- ReadChunkResponseProto readChunkResponse;
- try {
- List<CheckedBiFunction> validators =
- ContainerProtocolCalls.getValidatorList();
- validators.add(validator);
- readChunkResponse = ContainerProtocolCalls
- .readChunk(xceiverClient, chunkInfo, blockID, traceID, validators);
- } catch (IOException e) {
- if (e instanceof StorageContainerException) {
- throw e;
- }
- throw new IOException("Unexpected OzoneException: " + e.toString(), e);
- }
- return readChunkResponse.getData();
- }
-
- @VisibleForTesting
- protected List<DatanodeDetails> getDatanodeList() {
- return xceiverClient.getPipeline().getNodes();
- }
-
- private CheckedBiFunction<ContainerCommandRequestProto,
- ContainerCommandResponseProto, IOException> validator =
- (request, response) -> {
- ReadChunkResponseProto readChunkResponse = response.getReadChunk();
- final ChunkInfo chunkInfo = readChunkResponse.getChunkData();
- ByteString byteString = readChunkResponse.getData();
- if (byteString.size() != chunkInfo.getLen()) {
- // Bytes read from chunk should be equal to chunk size.
- throw new OzoneChecksumException(String
- .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
- chunkInfo.getChunkName(), chunkInfo.getLen(),
- byteString.size()));
- }
- ChecksumData checksumData =
- ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
- if (verifyChecksum) {
- Checksum.verifyChecksum(byteString, checksumData);
- }
- };
-
@Override
public synchronized void seek(long pos) throws IOException {
- if (pos < 0 || (chunks.size() == 0 && pos > 0)
- || pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1)
- .getLen()) {
- throw new EOFException("EOF encountered pos: " + pos + " container key: "
- + blockID.getLocalID());
+ if (!initialized) {
+ // Stream has not been initialized yet. Save the position so that it
+ // can be seeked when the stream is initialized.
+ blockPosition = pos;
+ return;
+ }
+
+ checkOpen();
+ if (pos < 0 || pos >= length) {
+ if (pos == 0) {
+ // It is possible for length and pos to be zero in which case
+ // seek should return instead of throwing exception
+ return;
+ }
+ throw new EOFException(
+ "EOF encountered at pos: " + pos + " for block: " + blockID);
}
- if (pos < chunkOffset[chunkIndex]) {
- chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos);
- } else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex)
- .getLen()) {
+ if (chunkIndex >= chunkStreams.size()) {
+ chunkIndex = Arrays.binarySearch(chunkOffsets, pos);
+ } else if (pos < chunkOffsets[chunkIndex]) {
chunkIndex =
- Arrays.binarySearch(chunkOffset, chunkIndex + 1, chunks.size(), pos);
+ Arrays.binarySearch(chunkOffsets, 0, chunkIndex, pos);
+ } else if (pos >= chunkOffsets[chunkIndex] + chunkStreams
+ .get(chunkIndex).getLength()) {
+ chunkIndex = Arrays.binarySearch(chunkOffsets,
+ chunkIndex + 1, chunkStreams.size(), pos);
}
if (chunkIndex < 0) {
// Binary search returns -insertionPoint - 1 if element is not present
// in the array. insertionPoint is the point at which element would be
// inserted in the sorted array. We need to adjust the chunkIndex
// accordingly so that chunkIndex = insertionPoint - 1
- chunkIndex = -chunkIndex -2;
+ chunkIndex = -chunkIndex - 2;
}
- // The bufferPosition should be adjusted to account for the chunk offset
- // of the chunk the the pos actually points to.
- bufferPosition = pos - chunkOffset[chunkIndex];
+ // Reset the previous chunkStream's position
+ chunkStreams.get(chunkIndexOfPrevPosition).resetPosition();
- // Check if current buffers correspond to the chunk index being seeked
- // and if the buffers have any data.
- if (chunkIndex == chunkIndexOfCurrentBuffer && buffersAllocated()) {
- // Position the buffer to the seeked position.
- adjustBufferIndex();
- } else {
- // Release the current buffers. The next readChunkFromContainer will
- // read the required chunk and position the buffer to the seeked
- // position.
- releaseBuffers();
- }
+ // seek to the proper offset in the ChunkInputStream
+ chunkStreams.get(chunkIndex).seek(pos - chunkOffsets[chunkIndex]);
+ chunkIndexOfPrevPosition = chunkIndex;
}
- private void adjustBufferIndex() {
- if (bufferPosition == -1) {
- // The stream has not been seeked to a position. No need to adjust the
- // buffer Index and position.
- return;
+ @Override
+ public synchronized long getPos() throws IOException {
+ if (length == 0) {
+ return 0;
}
- // The bufferPosition is w.r.t the buffers for current chunk.
- // Adjust the bufferIndex and position to the seeked position.
- long tempOffest = 0;
- for (int i = 0; i < buffers.size(); i++) {
- if (bufferPosition - tempOffest >= buffers.get(i).capacity()) {
- tempOffest += buffers.get(i).capacity();
- } else {
- bufferIndex = i;
- break;
- }
+
+ if (!initialized) {
+ // The stream is not initialized yet. Return the blockPosition
+ return blockPosition;
+ } else {
+ return chunkOffsets[chunkIndex] + chunkStreams.get(chunkIndex).getPos();
}
- buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest));
- // Reset the bufferPosition as the seek() operation has been completed.
- bufferPosition = -1;
}
@Override
- public synchronized long getPos() throws IOException {
- // position = chunkOffset of current chunk (at chunkIndex) + position of
- // the buffer corresponding to the chunk.
- long bufferPos = 0;
-
- if (bufferPosition >= 0) {
- // seek has been called but the buffers were empty. Hence, the buffer
- // position will be advanced after the buffers are filled.
- // We return the chunkOffset + bufferPosition here as that will be the
- // position of the buffer pointer after reading the chunk file.
- bufferPos = bufferPosition;
-
- } else if (blockStreamEOF()) {
- // all data consumed, buffers have been released.
- // get position from the chunk offset and chunk length of last chunk
- bufferPos = chunks.get(chunkIndex).getLen();
-
- } else if (buffersAllocated()) {
- // get position from available buffers of current chunk
- bufferPos = buffers.get(bufferIndex).position();
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+ @Override
+ public synchronized void close() {
+ if (xceiverClientManager != null && xceiverClient != null) {
+ xceiverClientManager.releaseClient(xceiverClient, false);
+ xceiverClientManager = null;
+ xceiverClient = null;
}
+ }
- return chunkOffset[chunkIndex] + bufferPos;
+ public synchronized void resetPosition() {
+ this.blockPosition = 0;
}
- @Override
- public boolean seekToNewSource(long targetPos) throws IOException {
- return false;
+ /**
+ * Checks if the stream is open. If not, throw an exception.
+ *
+ * @throws IOException if stream is closed
+ */
+ protected synchronized void checkOpen() throws IOException {
+ if (xceiverClient == null) {
+ throw new IOException("BlockInputStream has been closed.");
+ }
}
public BlockID getBlockID() {
return blockID;
}
+ public long getLength() {
+ return length;
+ }
+
@VisibleForTesting
- protected int getChunkIndex() {
+ synchronized int getChunkIndex() {
return chunkIndex;
}
+
+ @VisibleForTesting
+ synchronized long getBlockPosition() {
+ return blockPosition;
+ }
+
+ @VisibleForTesting
+ synchronized List<ChunkInputStream> getChunkStreams() {
+ return chunkStreams;
+ }
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
new file mode 100644
index 0000000..8d30c22
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * An {@link InputStream} called from BlockInputStream to read a chunk from the
+ * container. Each chunk may contain multiple underlying {@link ByteBuffer}
+ * instances.
+ */
+public class ChunkInputStream extends InputStream implements Seekable {
+
+ private ChunkInfo chunkInfo;
+ private final long length;
+ private final BlockID blockID;
+ private final String traceID;
+ private XceiverClientSpi xceiverClient;
+ private boolean verifyChecksum;
+ private boolean allocated = false;
+
+ // Buffer to store the chunk data read from the DN container
+ private List<ByteBuffer> buffers;
+
+ // Index of the buffers corresponding to the current position of the buffers
+ private int bufferIndex;
+
+ // The offset of the current data residing in the buffers w.r.t the start
+ // of chunk data
+ private long bufferOffset;
+
+ // The number of bytes of chunk data residing in the buffers currently
+ private long bufferLength;
+
+  // Position of the ChunkInputStream is maintained by this variable (if a
+  // seek is performed). This position is w.r.t. the chunk only and not the
+  // block or key. This variable is set only if either the buffers are not
+  // yet allocated or if the allocated buffers do not cover the seeked
+  // position. Once the chunk is read, this variable is reset.
+ private long chunkPosition = -1;
+
+ private static final int EOF = -1;
+
+ ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
+ String traceId, XceiverClientSpi xceiverClient, boolean verifyChecksum) {
+ this.chunkInfo = chunkInfo;
+ this.length = chunkInfo.getLen();
+ this.blockID = blockId;
+ this.traceID = traceId;
+ this.xceiverClient = xceiverClient;
+ this.verifyChecksum = verifyChecksum;
+ }
+
+ public synchronized long getRemaining() throws IOException {
+ return length - getPos();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public synchronized int read() throws IOException {
+ checkOpen();
+ int available = prepareRead(1);
+ int dataout = EOF;
+
+ if (available == EOF) {
+ // There is no more data in the chunk stream. The buffers should have
+ // been released by now
+ Preconditions.checkState(buffers == null);
+ } else {
+ dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
+ }
+
+ if (chunkStreamEOF()) {
+ // consumer might use getPos to determine EOF,
+ // so release buffers when serving the last byte of data
+ releaseBuffers();
+ }
+
+ return dataout;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public synchronized int read(byte[] b, int off, int len) throws IOException {
+ // According to the JavaDocs for InputStream, it is recommended that
+ // subclasses provide an override of bulk read if possible for performance
+ // reasons. In addition to performance, we need to do it for correctness
+ // reasons. The Ozone REST service uses PipedInputStream and
+ // PipedOutputStream to relay HTTP response data between a Jersey thread and
+ // a Netty thread. It turns out that PipedInputStream/PipedOutputStream
+ // have a subtle dependency (bug?) on the wrapped stream providing separate
+ // implementations of single-byte read and bulk read. Without this, get key
+ // responses might close the connection before writing all of the bytes
+ // advertised in the Content-Length.
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (len == 0) {
+ return 0;
+ }
+ checkOpen();
+ int total = 0;
+ while (len > 0) {
+ int available = prepareRead(len);
+ if (available == EOF) {
+ // There is no more data in the chunk stream. The buffers should have
+ // been released by now
+ Preconditions.checkState(buffers == null);
+ return total != 0 ? total : EOF;
+ }
+ buffers.get(bufferIndex).get(b, off + total, available);
+ len -= available;
+ total += available;
+ }
+
+ if (chunkStreamEOF()) {
+ // smart consumers determine EOF by calling getPos()
+ // so we release buffers when serving the final bytes of data
+ releaseBuffers();
+ }
+
+ return total;
+ }
+
+ /**
+ * Seeks the ChunkInputStream to the specified position. This is done by
+ * updating the chunkPosition to the seeked position in case the buffers
+ * are not allocated or buffers do not contain the data corresponding to
+ * the seeked position (determined by buffersHavePosition()). Otherwise,
+ * the buffers position is updated to the seeked position.
+ */
+ @Override
+ public synchronized void seek(long pos) throws IOException {
+ if (pos < 0 || pos >= length) {
+ if (pos == 0) {
+ // It is possible for length and pos to be zero in which case
+ // seek should return instead of throwing exception
+ return;
+ }
+ throw new EOFException("EOF encountered at pos: " + pos + " for chunk: "
+ + chunkInfo.getChunkName());
+ }
+
+ if (buffersHavePosition(pos)) {
+ // The bufferPosition is w.r.t the current chunk.
+ // Adjust the bufferIndex and position to the seeked position.
+ adjustBufferPosition(pos - bufferOffset);
+ } else {
+ chunkPosition = pos;
+ }
+ }
+
+ @Override
+ public synchronized long getPos() throws IOException {
+ if (chunkPosition >= 0) {
+ return chunkPosition;
+ }
+ if (chunkStreamEOF()) {
+ return length;
+ }
+ if (buffersHaveData()) {
+ return bufferOffset + buffers.get(bufferIndex).position();
+ }
+ if (buffersAllocated()) {
+ return bufferOffset + bufferLength;
+ }
+ return 0;
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (xceiverClient != null) {
+ xceiverClient = null;
+ }
+ }
+
+ /**
+ * Checks if the stream is open. If not, throw an exception.
+ *
+ * @throws IOException if stream is closed
+ */
+ protected synchronized void checkOpen() throws IOException {
+ if (xceiverClient == null) {
+ throw new IOException("BlockInputStream has been closed.");
+ }
+ }
+
+ /**
+ * Prepares to read by advancing through buffers or allocating new buffers,
+ * as needed until it finds data to return, or encounters EOF.
+   * @param len desired length of data to read
+ * @return length of data available to read, possibly less than desired length
+ */
+ private synchronized int prepareRead(int len) throws IOException {
+ for (;;) {
+ if (chunkPosition >= 0) {
+ if (buffersHavePosition(chunkPosition)) {
+ // The current buffers have the seeked position. Adjust the buffer
+ // index and position to point to the chunkPosition.
+ adjustBufferPosition(chunkPosition - bufferOffset);
+ } else {
+ // Read a required chunk data to fill the buffers with seeked
+ // position data
+ readChunkFromContainer(len);
+ }
+ }
+ if (buffersHaveData()) {
+ // Data is available from buffers
+ ByteBuffer bb = buffers.get(bufferIndex);
+ return len > bb.remaining() ? bb.remaining() : len;
+ } else if (dataRemainingInChunk()) {
+ // There is more data in the chunk stream which has not
+ // been read into the buffers yet.
+ readChunkFromContainer(len);
+ } else {
+ // All available input from this chunk stream has been consumed.
+ return EOF;
+ }
+ }
+ }
+
+ /**
+ * Reads full or partial Chunk from DN Container based on the current
+ * position of the ChunkInputStream, the number of bytes of data to read
+ * and the checksum boundaries.
+   * If successful, then the read data is saved in the buffers so that
+ * subsequent read calls can utilize it.
+ * @param len number of bytes of data to be read
+ * @throws IOException if there is an I/O error while performing the call
+ * to Datanode
+ */
+ private synchronized void readChunkFromContainer(int len) throws IOException {
+
+ // index of first byte to be read from the chunk
+ long startByteIndex;
+ if (chunkPosition >= 0) {
+ // If seek operation was called to advance the buffer position, the
+ // chunk should be read from that position onwards.
+ startByteIndex = chunkPosition;
+ } else {
+      // Start reading the chunk from the end of the previously buffered data.
+ startByteIndex = bufferOffset + bufferLength;
+ }
+
+ if (verifyChecksum) {
+ // Update the bufferOffset and bufferLength as per the checksum
+ // boundary requirement.
+ computeChecksumBoundaries(startByteIndex, len);
+ } else {
+ // Read from the startByteIndex
+ bufferOffset = startByteIndex;
+ bufferLength = len;
+ }
+
+ // Adjust the chunkInfo so that only the required bytes are read from
+ // the chunk.
+ final ChunkInfo adjustedChunkInfo = ChunkInfo.newBuilder(chunkInfo)
+ .setOffset(bufferOffset)
+ .setLen(bufferLength)
+ .build();
+
+ ByteString byteString = readChunk(adjustedChunkInfo);
+
+ buffers = byteString.asReadOnlyByteBufferList();
+ bufferIndex = 0;
+ allocated = true;
+
+ // If the stream was seeked to position before, then the buffer
+ // position should be adjusted as the reads happen at checksum boundaries.
+ // The buffers position might need to be adjusted for the following
+ // scenarios:
+ // 1. Stream was seeked to a position before the chunk was read
+ // 2. Chunk was read from index < the current position to account for
+ // checksum boundaries.
+ adjustBufferPosition(startByteIndex - bufferOffset);
+ }
+
+ /**
+ * Send RPC call to get the chunk from the container.
+ */
+ @VisibleForTesting
+ protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException {
+ ReadChunkResponseProto readChunkResponse;
+
+ try {
+ List<CheckedBiFunction> validators =
+ ContainerProtocolCalls.getValidatorList();
+ validators.add(validator);
+
+ readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
+ readChunkInfo, blockID, traceID, validators);
+
+ } catch (IOException e) {
+ if (e instanceof StorageContainerException) {
+ throw e;
+ }
+ throw new IOException("Unexpected OzoneException: " + e.toString(), e);
+ }
+
+ return readChunkResponse.getData();
+ }
+
+ private CheckedBiFunction<ContainerCommandRequestProto,
+ ContainerCommandResponseProto, IOException> validator =
+ (request, response) -> {
+ final ChunkInfo reqChunkInfo =
+ request.getReadChunk().getChunkData();
+
+ ReadChunkResponseProto readChunkResponse = response.getReadChunk();
+ ByteString byteString = readChunkResponse.getData();
+
+ if (byteString.size() != reqChunkInfo.getLen()) {
+ // Bytes read from chunk should be equal to chunk size.
+ throw new OzoneChecksumException(String
+ .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
+ reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
+ byteString.size()));
+ }
+
+ if (verifyChecksum) {
+ ChecksumData checksumData = ChecksumData.getFromProtoBuf(
+ chunkInfo.getChecksumData());
+
+ // ChecksumData stores checksum for each 'numBytesPerChecksum'
+ // number of bytes in a list. Compute the index of the first
+ // checksum to match with the read data
+
+ int checkumStartIndex = (int) (reqChunkInfo.getOffset() /
+ checksumData.getBytesPerChecksum());
+ Checksum.verifyChecksum(
+ byteString, checksumData, checkumStartIndex);
+ }
+ };
+
+ /**
+ * Return the offset and length of bytes that need to be read from the
+ * chunk file to cover the checksum boundaries covering the actual start and
+ * end of the chunk index to be read.
+ * For example, lets say the client is reading from index 120 to 450 in the
+ * chunk. And let's say checksum is stored for every 100 bytes in the chunk
+ * i.e. the first checksum is for bytes from index 0 to 99, the next for
+ * bytes from index 100 to 199 and so on. To verify bytes from 120 to 450,
+ * we would need to read from bytes 100 to 499 so that checksum
+ * verification can be done.
+ *
+ * @param startByteIndex the first byte index to be read by client
+ * @param dataLen number of bytes to be read from the chunk
+ */
+ private void computeChecksumBoundaries(long startByteIndex, int dataLen) {
+
+ int bytesPerChecksum = chunkInfo.getChecksumData().getBytesPerChecksum();
+ // index of the last byte to be read from chunk, inclusively.
+ final long endByteIndex = startByteIndex + dataLen - 1;
+
+ bufferOffset = (startByteIndex / bytesPerChecksum)
+ * bytesPerChecksum; // inclusive
+ final long endIndex = ((endByteIndex / bytesPerChecksum) + 1)
+ * bytesPerChecksum; // exclusive
+ bufferLength = Math.min(endIndex, length) - bufferOffset;
+ }
+
+ /**
+   * Adjust the buffers position to account for seeked position and/or checksum
+ * boundary reads.
+ * @param bufferPosition the position to which the buffers must be advanced
+ */
+ private void adjustBufferPosition(long bufferPosition) {
+ // The bufferPosition is w.r.t the current chunk.
+ // Adjust the bufferIndex and position to the seeked chunkPosition.
+ long tempOffest = 0;
+ for (int i = 0; i < buffers.size(); i++) {
+ if (bufferPosition - tempOffest >= buffers.get(i).capacity()) {
+ tempOffest += buffers.get(i).capacity();
+ } else {
+ bufferIndex = i;
+ break;
+ }
+ }
+ buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest));
+
+ // Reset the chunkPosition as chunk stream has been initialized i.e. the
+ // buffers have been allocated.
+ resetPosition();
+ }
+
+ /**
+   * Return true if the buffers have been allocated data, false otherwise.
+ */
+ private boolean buffersAllocated() {
+ return buffers != null && !buffers.isEmpty();
+ }
+
+ /**
+ * Check if the buffers have any data remaining between the current
+ * position and the limit.
+ */
+ private boolean buffersHaveData() {
+ boolean hasData = false;
+
+ if (buffersAllocated()) {
+ while (bufferIndex < (buffers.size())) {
+ if (buffers.get(bufferIndex).hasRemaining()) {
+ // current buffer has data
+ hasData = true;
+ break;
+ } else {
+ if (buffersRemaining()) {
+ // move to next available buffer
+ ++bufferIndex;
+ Preconditions.checkState(bufferIndex < buffers.size());
+ } else {
+ // no more buffers remaining
+ break;
+ }
+ }
+ }
+ }
+
+ return hasData;
+ }
+
+ private boolean buffersRemaining() {
+ return (bufferIndex < (buffers.size() - 1));
+ }
+
+ /**
+   * Check if current buffers have the data corresponding to the input position.
+ */
+ private boolean buffersHavePosition(long pos) {
+ // Check if buffers have been allocated
+ if (buffersAllocated()) {
+ // Check if the current buffers cover the input position
+ return pos >= bufferOffset &&
+ pos < bufferOffset + bufferLength;
+ }
+ return false;
+ }
+
+ /**
+ * Check if there is more data in the chunk which has not yet been read
+ * into the buffers.
+ */
+ private boolean dataRemainingInChunk() {
+ long bufferPos;
+ if (chunkPosition >= 0) {
+ bufferPos = chunkPosition;
+ } else {
+ bufferPos = bufferOffset + bufferLength;
+ }
+
+ return bufferPos < length;
+ }
+
+ /**
+ * Check if end of chunkStream has been reached.
+ */
+ private boolean chunkStreamEOF() {
+ if (!allocated) {
+ // Chunk data has not been read yet
+ return false;
+ }
+
+ if (buffersHaveData() || dataRemainingInChunk()) {
+ return false;
+ } else {
+ Preconditions.checkState(bufferOffset + bufferLength == length,
+ "EOF detected, but not at the last byte of the chunk");
+ return true;
+ }
+ }
+
+ /**
+ * If EOF is reached, release the buffers.
+ */
+ private void releaseBuffers() {
+ buffers = null;
+ bufferIndex = 0;
+ }
+
+ /**
+ * Reset the chunkPosition once the buffers are allocated.
+ */
+ void resetPosition() {
+ this.chunkPosition = -1;
+ }
+
+ String getChunkName() {
+ return chunkInfo.getChunkName();
+ }
+
+ protected long getLength() {
+ return length;
+ }
+
+ @VisibleForTesting
+ protected long getChunkPosition() {
+ return chunkPosition;
+ }
+}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index b6ceb2b..a1985f0 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -1,32 +1,33 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
+
package org.apache.hadoop.hdds.scm.storage;
+import com.google.common.primitives.Bytes;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ChecksumData;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ChecksumType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -34,106 +35,127 @@ import org.junit.Test;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
-import java.util.UUID;
+
+import static org.apache.hadoop.hdds.scm.storage.TestChunkInputStream.generateRandomData;
/**
- * Tests {@link BlockInputStream}.
+ * Tests for {@link BlockInputStream}'s functionality.
*/
public class TestBlockInputStream {
- private static BlockInputStream blockInputStream;
- private static List<ChunkInfo> chunks;
- private static int blockSize;
+ private static final int CHUNK_SIZE = 100;
+ private static Checksum checksum;
- private static final int CHUNK_SIZE = 20;
+ private BlockInputStream blockStream;
+ private byte[] blockData;
+ private int blockSize;
+ private List<ChunkInfo> chunks;
+ private Map<String, byte[]> chunkDataMap;
@Before
public void setup() throws Exception {
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
- chunks = createChunkList(10);
- String traceID = UUID.randomUUID().toString();
- blockInputStream = new DummyBlockInputStream(blockID, null, null, chunks,
- traceID, false, 0);
-
- blockSize = 0;
- for (ChunkInfo chunk : chunks) {
- blockSize += chunk.getLen();
- }
+ checksum = new Checksum(ChecksumType.NONE, CHUNK_SIZE);
+ createChunkList(5);
+
+ blockStream = new DummyBlockInputStream(blockID, blockSize, null, null,
+ false, null, null);
}
/**
* Create a mock list of chunks. The first n-1 chunks of length CHUNK_SIZE
* and the last chunk with length CHUNK_SIZE/2.
- * @param numChunks
- * @return
*/
- private static List<ChunkInfo> createChunkList(int numChunks) {
- ChecksumData dummyChecksumData = ChecksumData.newBuilder()
- .setType(ChecksumType.NONE)
- .setBytesPerChecksum(100)
- .build();
- List<ChunkInfo> chunkList = new ArrayList<>(numChunks);
- int i;
- for (i = 0; i < numChunks - 1; i++) {
- String chunkName = "chunk-" + i;
+ private void createChunkList(int numChunks)
+ throws Exception {
+
+ chunks = new ArrayList<>(numChunks);
+ chunkDataMap = new HashMap<>();
+ blockData = new byte[0];
+ int i, chunkLen;
+ byte[] byteData;
+ String chunkName;
+
+ for (i = 0; i < numChunks; i++) {
+ chunkName = "chunk-" + i;
+ chunkLen = CHUNK_SIZE;
+ if (i == numChunks - 1) {
+ chunkLen = CHUNK_SIZE / 2;
+ }
+ byteData = generateRandomData(chunkLen);
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
.setChunkName(chunkName)
.setOffset(0)
- .setLen(CHUNK_SIZE)
- .setChecksumData(dummyChecksumData)
+ .setLen(chunkLen)
+ .setChecksumData(checksum.computeChecksum(
+ byteData, 0, chunkLen).getProtoBufMessage())
.build();
- chunkList.add(chunkInfo);
+
+ chunkDataMap.put(chunkName, byteData);
+ chunks.add(chunkInfo);
+
+ blockSize += chunkLen;
+ blockData = Bytes.concat(blockData, byteData);
}
- ChunkInfo chunkInfo = ChunkInfo.newBuilder()
- .setChunkName("chunk-" + i)
- .setOffset(0)
- .setLen(CHUNK_SIZE/2)
- .setChecksumData(dummyChecksumData)
- .build();
- chunkList.add(chunkInfo);
-
- return chunkList;
}
/**
- * A dummy BlockInputStream to test the functionality of BlockInputStream.
+ * A dummy BlockInputStream to mock read block call to DN.
*/
- private static class DummyBlockInputStream extends BlockInputStream {
+ private class DummyBlockInputStream extends BlockInputStream {
- DummyBlockInputStream(BlockID blockID,
- XceiverClientManager xceiverClientManager,
- XceiverClientSpi xceiverClient,
- List<ChunkInfo> chunks,
- String traceID,
+ DummyBlockInputStream(BlockID blockId,
+ long blockLen,
+ Pipeline pipeline,
+ Token<OzoneBlockTokenIdentifier> token,
boolean verifyChecksum,
- long initialPosition) throws IOException {
- super(blockID, xceiverClientManager, xceiverClient, chunks, traceID,
- verifyChecksum, initialPosition);
+ String traceId,
+ XceiverClientManager xceiverClientManager) {
+ super(blockId, blockLen, pipeline, token, verifyChecksum,
+ traceId, xceiverClientManager);
}
@Override
- protected ByteString readChunk(final ChunkInfo chunkInfo)
- throws IOException {
- return getByteString(chunkInfo.getChunkName(), (int) chunkInfo.getLen());
+ protected List<ChunkInfo> getChunkInfos() {
+ return chunks;
}
@Override
- protected List<DatanodeDetails> getDatanodeList() {
- // return an empty dummy list of size 10
- return new ArrayList<>(10);
+ protected void addStream(ChunkInfo chunkInfo) {
+ TestChunkInputStream testChunkInputStream = new TestChunkInputStream();
+ getChunkStreams().add(testChunkInputStream.new DummyChunkInputStream(
+ chunkInfo, null, null, null, false,
+ chunkDataMap.get(chunkInfo.getChunkName()).clone()));
}
- /**
- * Create ByteString with the input data to return when a readChunk call is
- * placed.
- */
- private static ByteString getByteString(String data, int length) {
- while (data.length() < length) {
- data = data + "0";
- }
- return ByteString.copyFrom(data.getBytes(), 0, length);
+ @Override
+ protected synchronized void checkOpen() throws IOException {
+ // No action needed
+ }
+ }
+
+ private void seekAndVerify(int pos) throws Exception {
+ blockStream.seek(pos);
+ Assert.assertEquals("Current position of buffer does not match with the " +
+ "seeked position", pos, blockStream.getPos());
+ }
+
+ /**
+ * Match readData with the chunkData byte-wise.
+ * @param readData Data read through ChunkInputStream
+ * @param inputDataStartIndex first index (inclusive) in chunkData to compare
+ * with read data
+ * @param length the number of bytes of data to match starting from
+ * inputDataStartIndex
+ */
+ private void matchWithInputData(byte[] readData, int inputDataStartIndex,
+ int length) {
+ for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) {
+ Assert.assertEquals(blockData[i], readData[i - inputDataStartIndex]);
}
}
@@ -143,17 +165,26 @@ public class TestBlockInputStream {
int pos = 0;
seekAndVerify(pos);
Assert.assertEquals("ChunkIndex is incorrect", 0,
- blockInputStream.getChunkIndex());
+ blockStream.getChunkIndex());
+ // Before BlockInputStream is initialized (initialization happens during
+ // read operation), seek should update the BlockInputStream#blockPosition
pos = CHUNK_SIZE;
seekAndVerify(pos);
+ Assert.assertEquals("ChunkIndex is incorrect", 0,
+ blockStream.getChunkIndex());
+ Assert.assertEquals(pos, blockStream.getBlockPosition());
+
+    // Initialize the BlockInputStream. After initialization, the chunkIndex
+ // should be updated to correspond to the seeked position.
+ blockStream.initialize();
Assert.assertEquals("ChunkIndex is incorrect", 1,
- blockInputStream.getChunkIndex());
+ blockStream.getChunkIndex());
- pos = (CHUNK_SIZE * 5) + 5;
+ pos = (CHUNK_SIZE * 4) + 5;
seekAndVerify(pos);
- Assert.assertEquals("ChunkIndex is incorrect", 5,
- blockInputStream.getChunkIndex());
+ Assert.assertEquals("ChunkIndex is incorrect", 4,
+ blockStream.getChunkIndex());
try {
// Try seeking beyond the blockSize.
@@ -161,7 +192,7 @@ public class TestBlockInputStream {
seekAndVerify(pos);
Assert.fail("Seek to position beyond block size should fail.");
} catch (EOFException e) {
- // Expected
+ System.out.println(e);
}
// Seek to random positions between 0 and the block size.
@@ -173,20 +204,32 @@ public class TestBlockInputStream {
}
@Test
- public void testBlockEOF() throws Exception {
- // Seek to some position < blockSize and verify EOF is not reached.
- seekAndVerify(CHUNK_SIZE);
- Assert.assertFalse(blockInputStream.blockStreamEOF());
-
- // Seek to blockSize-1 and verify that EOF is not reached as the chunk
- // has not been read from container yet.
- seekAndVerify(blockSize-1);
- Assert.assertFalse(blockInputStream.blockStreamEOF());
+ public void testRead() throws Exception {
+ // read 200 bytes of data starting from position 50. Chunk0 contains
+    // indices 0 to 99, chunk1 from 100 to 199 and chunk2 from 200 to 299. So
+ // the read should result in 3 ChunkInputStream reads
+ seekAndVerify(50);
+ byte[] b = new byte[200];
+ blockStream.read(b, 0, 200);
+ matchWithInputData(b, 50, 200);
+
+ // The new position of the blockInputStream should be the last index read
+ // + 1.
+ Assert.assertEquals(250, blockStream.getPos());
+ Assert.assertEquals(2, blockStream.getChunkIndex());
}
- private void seekAndVerify(int pos) throws Exception {
- blockInputStream.seek(pos);
- Assert.assertEquals("Current position of buffer does not match with the " +
- "seeked position", pos, blockInputStream.getPos());
+ @Test
+ public void testSeekAndRead() throws Exception {
+ // Seek to a position and read data
+ seekAndVerify(50);
+ byte[] b1 = new byte[100];
+ blockStream.read(b1, 0, 100);
+ matchWithInputData(b1, 50, 100);
+
+ // Next read should start from the position of the last read + 1 i.e. 100
+ byte[] b2 = new byte[100];
+ blockStream.read(b2, 0, 100);
+ matchWithInputData(b2, 150, 100);
}
}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
new file mode 100644
index 0000000..b113bc7
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.EOFException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Tests for {@link ChunkInputStream}'s functionality.
+ */
+public class TestChunkInputStream {
+
+ private static final int CHUNK_SIZE = 100;
+ private static final int BYTES_PER_CHECKSUM = 20;
+ private static final String CHUNK_NAME = "dummyChunk";
+ private static final Random RANDOM = new Random();
+ private static Checksum checksum;
+
+ private DummyChunkInputStream chunkStream;
+ private ChunkInfo chunkInfo;
+ private byte[] chunkData;
+
+ @Before
+ public void setup() throws Exception {
+ checksum = new Checksum(ChecksumType.valueOf(
+ OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT),
+ BYTES_PER_CHECKSUM);
+
+ chunkData = generateRandomData(CHUNK_SIZE);
+
+ chunkInfo = ChunkInfo.newBuilder()
+ .setChunkName(CHUNK_NAME)
+ .setOffset(0)
+ .setLen(CHUNK_SIZE)
+ .setChecksumData(checksum.computeChecksum(
+ chunkData, 0, CHUNK_SIZE).getProtoBufMessage())
+ .build();
+
+ chunkStream = new DummyChunkInputStream(chunkInfo, null, null, null, true);
+ }
+
+ static byte[] generateRandomData(int length) {
+ byte[] bytes = new byte[length];
+ RANDOM.nextBytes(bytes);
+ return bytes;
+ }
+
+ /**
+ * A dummy ChunkInputStream to mock read chunk calls to DN.
+ */
+ public class DummyChunkInputStream extends ChunkInputStream {
+
+ // Stores the read chunk data in each readChunk call
+ private List<ByteString> readByteBuffers = new ArrayList<>();
+
+ DummyChunkInputStream(ChunkInfo chunkInfo,
+ BlockID blockId,
+ String traceId,
+ XceiverClientSpi xceiverClient,
+ boolean verifyChecksum) {
+ super(chunkInfo, blockId, traceId, xceiverClient, verifyChecksum);
+ }
+
+ public DummyChunkInputStream(ChunkInfo chunkInfo,
+ BlockID blockId,
+ String traceId,
+ XceiverClientSpi xceiverClient,
+ boolean verifyChecksum,
+ byte[] data) {
+ super(chunkInfo, blockId, traceId, xceiverClient, verifyChecksum);
+ chunkData = data;
+ }
+
+ @Override
+ protected ByteString readChunk(ChunkInfo readChunkInfo) {
+ ByteString byteString = ByteString.copyFrom(chunkData,
+ (int) readChunkInfo.getOffset(),
+ (int) readChunkInfo.getLen());
+ readByteBuffers.add(byteString);
+ return byteString;
+ }
+
+ @Override
+ protected void checkOpen() {
+ // No action needed
+ }
+ }
+
+ /**
+ * Match readData with the chunkData byte-wise.
+ * @param readData Data read through ChunkInputStream
+ * @param inputDataStartIndex first index (inclusive) in chunkData to compare
+ * with read data
+ * @param length the number of bytes of data to match starting from
+ * inputDataStartIndex
+ */
+ private void matchWithInputData(byte[] readData, int inputDataStartIndex,
+ int length) {
+ for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) {
+ Assert.assertEquals(chunkData[i], readData[i - inputDataStartIndex]);
+ }
+ }
+
+ /**
+ * Seek to a position and verify through getPos().
+ */
+ private void seekAndVerify(int pos) throws Exception {
+ chunkStream.seek(pos);
+ Assert.assertEquals("Current position of buffer does not match with the " +
+ "seeked position", pos, chunkStream.getPos());
+ }
+
+ @Test
+ public void testFullChunkRead() throws Exception {
+ byte[] b = new byte[CHUNK_SIZE];
+ chunkStream.read(b, 0, CHUNK_SIZE);
+
+ matchWithInputData(b, 0, CHUNK_SIZE);
+ }
+
+ @Test
+ public void testPartialChunkRead() throws Exception {
+ int len = CHUNK_SIZE / 2;
+ byte[] b = new byte[len];
+
+ chunkStream.read(b, 0, len);
+
+ matchWithInputData(b, 0, len);
+
+ // To read chunk data from index 0 to 49 (len = 50), we need to read
+ // chunk from offset 0 to 60 as the checksum boundary is at every 20
+ // bytes. Verify that 60 bytes of chunk data are read and stored in the
+ // buffers.
+ matchWithInputData(chunkStream.readByteBuffers.get(0).toByteArray(),
+ 0, 60);
+
+ }
+
+ @Test
+ public void testSeek() throws Exception {
+ seekAndVerify(0);
+
+ try {
+ seekAndVerify(CHUNK_SIZE);
+ Assert.fail("Seeking to Chunk Length should fail.");
+ } catch (EOFException e) {
+ GenericTestUtils.assertExceptionContains("EOF encountered at pos: "
+ + CHUNK_SIZE + " for chunk: " + CHUNK_NAME, e);
+ }
+
+ // Seek before read should update the ChunkInputStream#chunkPosition
+ seekAndVerify(25);
+ Assert.assertEquals(25, chunkStream.getChunkPosition());
+
+ // Read from the seeked position.
+ // Reading from index 25 to 54 should result in the ChunkInputStream
+ // copying chunk data from index 20 to 59 into the buffers (checksum
+ // boundaries).
+ byte[] b = new byte[30];
+ chunkStream.read(b, 0, 30);
+ matchWithInputData(b, 25, 30);
+ matchWithInputData(chunkStream.readByteBuffers.get(0).toByteArray(),
+ 20, 40);
+
+ // After read, the position of the chunkStream is evaluated from the
+ // buffers and the chunkPosition should be reset to -1.
+ Assert.assertEquals(-1, chunkStream.getChunkPosition());
+
+ // Seek to a position within the current buffers. Current buffers contain
+ // data from index 20 to 59. ChunkPosition should still not be used to
+ // set the position.
+ seekAndVerify(35);
+ Assert.assertEquals(-1, chunkStream.getChunkPosition());
+
+ // Seek to a position outside the current buffers. In this case, the
+ // chunkPosition should be updated to the seeked position.
+ seekAndVerify(75);
+ Assert.assertEquals(75, chunkStream.getChunkPosition());
+ }
+
+ @Test
+ public void testSeekAndRead() throws Exception {
+ // Seek to a position and read data
+ seekAndVerify(50);
+ byte[] b1 = new byte[20];
+ chunkStream.read(b1, 0, 20);
+ matchWithInputData(b1, 50, 20);
+
+ // Next read should start from the position of the last read + 1 i.e. 70
+ byte[] b2 = new byte[20];
+ chunkStream.read(b2, 0, 20);
+ matchWithInputData(b2, 70, 20);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
index 1a359fe..0e70515 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
@@ -225,15 +225,17 @@ public class Checksum {
/**
* Computes the ChecksumData for the input data and verifies that it
- * matches with that of the input checksumData.
+ * matches with that of the input checksumData, starting from index
+ * startIndex.
* @param byteString input data
* @param checksumData checksumData to match with
+ * @param startIndex index of first checksum in checksumData to match with
+ * data's computed checksum.
* @throws OzoneChecksumException is thrown if checksums do not match
*/
- public static boolean verifyChecksum(
- ByteString byteString, ChecksumData checksumData)
- throws OzoneChecksumException {
- return verifyChecksum(byteString.toByteArray(), checksumData);
+ public static boolean verifyChecksum(ByteString byteString,
+ ChecksumData checksumData, int startIndex) throws OzoneChecksumException {
+ return verifyChecksum(byteString.toByteArray(), checksumData, startIndex);
}
/**
@@ -245,6 +247,20 @@ public class Checksum {
*/
public static boolean verifyChecksum(byte[] data, ChecksumData checksumData)
throws OzoneChecksumException {
+ return verifyChecksum(data, checksumData, 0);
+ }
+
+ /**
+ * Computes the ChecksumData for the input data and verifies that it
+ * matches with that of the input checksumData.
+ * @param data input data
+ * @param checksumData checksumData to match with
+ * @param startIndex index of first checksum in checksumData to match with
+ * data's computed checksum.
+ * @throws OzoneChecksumException is thrown if checksums do not match
+ */
+ public static boolean verifyChecksum(byte[] data, ChecksumData checksumData,
+ int startIndex) throws OzoneChecksumException {
ChecksumType checksumType = checksumData.getChecksumType();
if (checksumType == ChecksumType.NONE) {
// Checksum is set to NONE. No further verification is required.
@@ -256,7 +272,8 @@ public class Checksum {
ChecksumData computedChecksumData =
checksum.computeChecksum(data, 0, data.length);
- return checksumData.verifyChecksumDataMatches(computedChecksumData);
+ return checksumData.verifyChecksumDataMatches(computedChecksumData,
+ startIndex);
}
/**
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
index dafa0e3..c0799bb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
@@ -111,13 +111,20 @@ public class ChecksumData {
}
/**
- * Verify that this ChecksumData matches with the input ChecksumData.
+ * Verify that this ChecksumData, starting from index startIndex, matches
+ * with the provided ChecksumData.
+ * The checksum at startIndex of this ChecksumData will be matched with the
+ * checksum at index 0 of the provided ChecksumData, and checksum at
+ * (startIndex + 1) of this ChecksumData with checksum at index 1 of
+ * provided ChecksumData and so on.
* @param that the ChecksumData to match with
+ * @param startIndex index of the first checksum from this ChecksumData
+ * which will be used to compare checksums
* @return true if checksums match
* @throws OzoneChecksumException
*/
- public boolean verifyChecksumDataMatches(ChecksumData that) throws
- OzoneChecksumException {
+ public boolean verifyChecksumDataMatches(ChecksumData that, int startIndex)
+ throws OzoneChecksumException {
// pre checks
if (this.checksums.size() == 0) {
@@ -130,18 +137,22 @@ public class ChecksumData {
"checksums");
}
- if (this.checksums.size() != that.checksums.size()) {
- throw new OzoneChecksumException("Original and Computed checksumData's " +
- "has different number of checksums");
- }
+ int numChecksums = that.checksums.size();
- // Verify that checksum matches at each index
- for (int index = 0; index < this.checksums.size(); index++) {
- if (!matchChecksumAtIndex(this.checksums.get(index),
- that.checksums.get(index))) {
- // checksum mismatch. throw exception.
- throw new OzoneChecksumException(index);
+ try {
+ // Verify that checksum matches at each index
+ for (int index = 0; index < numChecksums; index++) {
+ if (!matchChecksumAtIndex(this.checksums.get(startIndex + index),
+ that.checksums.get(index))) {
+ // checksum mismatch. throw exception.
+ throw new OzoneChecksumException(index);
+ }
}
+ } catch (ArrayIndexOutOfBoundsException e) {
+ throw new OzoneChecksumException("Computed checksum has "
+ + numChecksums + " number of checksums. Original checksum has " +
+ (this.checksums.size() - startIndex) + " number of checksums " +
+ "starting from index " + startIndex);
}
return true;
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 5b63420..41ac60f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -20,19 +20,10 @@ package org.apache.hadoop.ozone.client.io;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
-import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ratis.util.Preconditions;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,60 +44,93 @@ public class KeyInputStream extends InputStream implements Seekable {
private static final int EOF = -1;
- private final ArrayList<ChunkInputStreamEntry> streamEntries;
- // streamOffset[i] stores the offset at which blockInputStream i stores
- // data in the key
- private long[] streamOffset = null;
- private int currentStreamIndex;
+ private String key;
private long length = 0;
private boolean closed = false;
- private String key;
- public KeyInputStream() {
- streamEntries = new ArrayList<>();
- currentStreamIndex = 0;
- }
+ // List of BlockInputStreams, one for each block in the key
+ private final List<BlockInputStream> blockStreams;
- @VisibleForTesting
- public synchronized int getCurrentStreamIndex() {
- return currentStreamIndex;
- }
+ // blockOffsets[i] stores the index of the first data byte in
+ // blockStream w.r.t the key data.
+ // For example, let’s say the block size is 200 bytes and block[0] stores
+ // data from indices 0 - 199, block[1] from indices 200 - 399 and so on.
+ // Then, blockOffset[0] = 0 (the offset of the first byte of data in
+ // block[0]), blockOffset[1] = 200 and so on.
+ private long[] blockOffsets = null;
- @VisibleForTesting
- public long getRemainingOfIndex(int index) throws IOException {
- return streamEntries.get(index).getRemaining();
+ // Index of the blockStream corresponding to the current position of the
+ // KeyInputStream i.e. offset of the data to be read next
+ private int blockIndex;
+
+ // Tracks the blockIndex corresponding to the last seeked position so that it
+ // can be reset if a new position is seeked.
+ private int blockIndexOfPrevPosition;
+
+ public KeyInputStream() {
+ blockStreams = new ArrayList<>();
+ blockIndex = 0;
}
/**
- * Append another stream to the end of the list.
- *
- * @param stream the stream instance.
- * @param streamLength the max number of bytes that should be written to this
- * stream.
+ * For each block in keyInfo, add a BlockInputStream to blockStreams.
*/
- @VisibleForTesting
- public synchronized void addStream(BlockInputStream stream,
- long streamLength) {
- streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
+ public static LengthInputStream getFromOmKeyInfo(OmKeyInfo keyInfo,
+ XceiverClientManager xceiverClientManager, String requestId,
+ boolean verifyChecksum) {
+ List<OmKeyLocationInfo> keyLocationInfos = keyInfo
+ .getLatestVersionLocations().getBlocksLatestVersionOnly();
+
+ KeyInputStream keyInputStream = new KeyInputStream();
+ keyInputStream.initialize(keyInfo.getKeyName(), keyLocationInfos,
+ xceiverClientManager, requestId, verifyChecksum);
+
+ return new LengthInputStream(keyInputStream, keyInputStream.length);
+ }
+
+ private synchronized void initialize(String keyName,
+ List<OmKeyLocationInfo> blockInfos,
+ XceiverClientManager xceiverClientManager, String requestId,
+ boolean verifyChecksum) {
+ this.key = keyName;
+ this.blockOffsets = new long[blockInfos.size()];
+ long keyLength = 0;
+ for (int i = 0; i < blockInfos.size(); i++) {
+ OmKeyLocationInfo omKeyLocationInfo = blockInfos.get(i);
+ LOG.debug("Adding stream for accessing {}. The stream will be " +
+ "initialized later.", omKeyLocationInfo);
+
+ addStream(omKeyLocationInfo, xceiverClientManager, requestId,
+ verifyChecksum);
+
+ this.blockOffsets[i] = keyLength;
+ keyLength += omKeyLocationInfo.getLength();
+ }
+ this.length = keyLength;
}
/**
- * Append another ChunkInputStreamEntry to the end of the list.
- * The stream will be constructed from the input information when it needs
- * to be accessed.
+ * Append another BlockInputStream to the end of the list. Note that the
+ * BlockInputStream is only created here and not initialized. The
+ * BlockInputStream is initialized when a read operation is performed on
+ * the block for the first time.
*/
- private synchronized void addStream(OmKeyLocationInfo omKeyLocationInfo,
+ private synchronized void addStream(OmKeyLocationInfo blockInfo,
XceiverClientManager xceiverClientMngr, String clientRequestId,
boolean verifyChecksum) {
- streamEntries.add(new ChunkInputStreamEntry(omKeyLocationInfo,
- xceiverClientMngr, clientRequestId, verifyChecksum));
+ blockStreams.add(new BlockInputStream(blockInfo.getBlockID(),
+ blockInfo.getLength(), blockInfo.getPipeline(), blockInfo.getToken(),
+ verifyChecksum, clientRequestId, xceiverClientMngr));
}
- private synchronized ChunkInputStreamEntry getStreamEntry(int index)
- throws IOException {
- return streamEntries.get(index).getStream();
+ @VisibleForTesting
+ public void addStream(BlockInputStream blockInputStream) {
+ blockStreams.add(blockInputStream);
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public synchronized int read() throws IOException {
byte[] buf = new byte[1];
@@ -116,9 +140,12 @@ public class KeyInputStream extends InputStream implements Seekable {
return Byte.toUnsignedInt(buf[0]);
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
- checkNotClosed();
+ checkOpen();
if (b == null) {
throw new NullPointerException();
}
@@ -131,13 +158,15 @@ public class KeyInputStream extends InputStream implements Seekable {
int totalReadLen = 0;
while (len > 0) {
// if we are at the last block and have read the entire block, return
- if (streamEntries.size() == 0 ||
- (streamEntries.size() - 1 <= currentStreamIndex &&
- streamEntries.get(currentStreamIndex)
- .getRemaining() == 0)) {
+ if (blockStreams.size() == 0 ||
+ (blockStreams.size() - 1 <= blockIndex &&
+ blockStreams.get(blockIndex)
+ .getRemaining() == 0)) {
return totalReadLen == 0 ? EOF : totalReadLen;
}
- ChunkInputStreamEntry current = getStreamEntry(currentStreamIndex);
+
+ // Get the current blockStream and read data from it
+ BlockInputStream current = blockStreams.get(blockIndex);
int numBytesToRead = Math.min(len, (int)current.getRemaining());
int numBytesRead = current.read(b, off, numBytesToRead);
if (numBytesRead != numBytesToRead) {
@@ -146,23 +175,35 @@ public class KeyInputStream extends InputStream implements Seekable {
// this case.
throw new IOException(String.format(
"Inconsistent read for blockID=%s length=%d numBytesRead=%d",
- current.blockInputStream.getBlockID(), current.length,
- numBytesRead));
+ current.getBlockID(), current.getLength(), numBytesRead));
}
totalReadLen += numBytesRead;
off += numBytesRead;
len -= numBytesRead;
if (current.getRemaining() <= 0 &&
- ((currentStreamIndex + 1) < streamEntries.size())) {
- currentStreamIndex += 1;
+ ((blockIndex + 1) < blockStreams.size())) {
+ blockIndex += 1;
}
}
return totalReadLen;
}
+ /**
+ * Seeks the KeyInputStream to the specified position. This involves 2 steps:
+ * 1. Updating the blockIndex to the blockStream corresponding to the
+ * seeked position.
+ * 2. Seeking the corresponding blockStream to the adjusted position.
+ *
+ * For example, let’s say the block size is 200 bytes and block[0] stores
+ * data from indices 0 - 199, block[1] from indices 200 - 399 and so on.
+ * Let’s say we seek to position 240. In the first step, the blockIndex
+ * would be updated to 1 as indices 200 - 399 reside in blockStream[1]. In
+ * the second step, the blockStream[1] would be seeked to position 40 (=
+ * 240 - blockOffset[1] (= 200)).
+ */
@Override
- public void seek(long pos) throws IOException {
- checkNotClosed();
+ public synchronized void seek(long pos) throws IOException {
+ checkOpen();
if (pos < 0 || pos >= length) {
if (pos == 0) {
// It is possible for length and pos to be zero in which case
@@ -172,35 +213,39 @@ public class KeyInputStream extends InputStream implements Seekable {
throw new EOFException(
"EOF encountered at pos: " + pos + " for key: " + key);
}
- Preconditions.assertTrue(currentStreamIndex >= 0);
- if (currentStreamIndex >= streamEntries.size()) {
- currentStreamIndex = Arrays.binarySearch(streamOffset, pos);
- } else if (pos < streamOffset[currentStreamIndex]) {
- currentStreamIndex =
- Arrays.binarySearch(streamOffset, 0, currentStreamIndex, pos);
- } else if (pos >= streamOffset[currentStreamIndex] + streamEntries
- .get(currentStreamIndex).length) {
- currentStreamIndex = Arrays
- .binarySearch(streamOffset, currentStreamIndex + 1,
- streamEntries.size(), pos);
+
+ // 1. Update the blockIndex
+ if (blockIndex >= blockStreams.size()) {
+ blockIndex = Arrays.binarySearch(blockOffsets, pos);
+ } else if (pos < blockOffsets[blockIndex]) {
+ blockIndex =
+ Arrays.binarySearch(blockOffsets, 0, blockIndex, pos);
+ } else if (pos >= blockOffsets[blockIndex] + blockStreams
+ .get(blockIndex).getLength()) {
+ blockIndex = Arrays
+ .binarySearch(blockOffsets, blockIndex + 1,
+ blockStreams.size(), pos);
}
- if (currentStreamIndex < 0) {
+ if (blockIndex < 0) {
// Binary search returns -insertionPoint - 1 if element is not present
// in the array. insertionPoint is the point at which element would be
- // inserted in the sorted array. We need to adjust the currentStreamIndex
- // accordingly so that currentStreamIndex = insertionPoint - 1
- currentStreamIndex = -currentStreamIndex - 2;
+ // inserted in the sorted array. We need to adjust the blockIndex
+ // accordingly so that blockIndex = insertionPoint - 1
+ blockIndex = -blockIndex - 2;
}
- // seek to the proper offset in the BlockInputStream
- streamEntries.get(currentStreamIndex)
- .seek(pos - streamOffset[currentStreamIndex]);
+
+ // Reset the previous blockStream's position
+ blockStreams.get(blockIndexOfPrevPosition).resetPosition();
+
+ // 2. Seek the blockStream to the adjusted position
+ blockStreams.get(blockIndex).seek(pos - blockOffsets[blockIndex]);
+ blockIndexOfPrevPosition = blockIndex;
}
@Override
- public long getPos() throws IOException {
- return length == 0 ? 0 :
- streamOffset[currentStreamIndex] + streamEntries.get(currentStreamIndex)
- .getPos();
+ public synchronized long getPos() throws IOException {
+ return length == 0 ? 0 : blockOffsets[blockIndex] +
+ blockStreams.get(blockIndex).getPos();
}
@Override
@@ -210,7 +255,7 @@ public class KeyInputStream extends InputStream implements Seekable {
@Override
public int available() throws IOException {
- checkNotClosed();
+ checkOpen();
long remaining = length - getPos();
return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
}
@@ -218,177 +263,30 @@ public class KeyInputStream extends InputStream implements Seekable {
@Override
public void close() throws IOException {
closed = true;
- for (int i = 0; i < streamEntries.size(); i++) {
- streamEntries.get(i).close();
+ for (BlockInputStream blockStream : blockStreams) {
+ blockStream.close();
}
}
/**
- * Encapsulates BlockInputStream.
- */
- public static class ChunkInputStreamEntry extends InputStream
- implements Seekable {
-
- private BlockInputStream blockInputStream;
- private final OmKeyLocationInfo blockLocationInfo;
- private final long length;
- private final XceiverClientManager xceiverClientManager;
- private final String requestId;
- private boolean verifyChecksum;
-
- // the position of the blockInputStream is maintained by this variable
- // till the stream is initialized
- private long position;
-
- public ChunkInputStreamEntry(OmKeyLocationInfo omKeyLocationInfo,
- XceiverClientManager xceiverClientMngr, String clientRequestId,
- boolean verifyChecksum) {
- this.blockLocationInfo = omKeyLocationInfo;
- this.length = omKeyLocationInfo.getLength();
- this.xceiverClientManager = xceiverClientMngr;
- this.requestId = clientRequestId;
- this.verifyChecksum = verifyChecksum;
- }
-
- @VisibleForTesting
- public ChunkInputStreamEntry(BlockInputStream blockInputStream,
- long length) {
- this.blockInputStream = blockInputStream;
- this.length = length;
- this.blockLocationInfo = null;
- this.xceiverClientManager = null;
- this.requestId = null;
- }
-
- private ChunkInputStreamEntry getStream() throws IOException {
- if (this.blockInputStream == null) {
- initializeBlockInputStream();
- }
- return this;
- }
-
- private void initializeBlockInputStream() throws IOException {
- BlockID blockID = blockLocationInfo.getBlockID();
- long containerID = blockID.getContainerID();
- Pipeline pipeline = blockLocationInfo.getPipeline();
-
- // irrespective of the container state, we will always read via Standalone
- // protocol.
- if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
- pipeline = Pipeline.newBuilder(pipeline)
- .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
- }
- XceiverClientSpi xceiverClient = xceiverClientManager
- .acquireClient(pipeline);
- boolean success = false;
- long containerKey = blockLocationInfo.getLocalID();
- try {
- LOG.debug("Initializing stream for get key to access {} {}",
- containerID, containerKey);
- ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
- .getDatanodeBlockIDProtobuf();
- if (blockLocationInfo.getToken() != null) {
- UserGroupInformation.getCurrentUser().
- addToken(blockLocationInfo.getToken());
- }
- ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
- .getBlock(xceiverClient, datanodeBlockID, requestId);
- List<ContainerProtos.ChunkInfo> chunks =
- response.getBlockData().getChunksList();
- success = true;
- this.blockInputStream = new BlockInputStream(
- blockLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
- chunks, requestId, verifyChecksum, position);
- } finally {
- if (!success) {
- xceiverClientManager.releaseClient(xceiverClient, false);
- }
- }
- }
-
- synchronized long getRemaining() throws IOException {
- return length - getPos();
- }
-
- @Override
- public synchronized int read(byte[] b, int off, int len)
- throws IOException {
- int readLen = blockInputStream.read(b, off, len);
- return readLen;
- }
-
- @Override
- public synchronized int read() throws IOException {
- int data = blockInputStream.read();
- return data;
- }
-
- @Override
- public synchronized void close() throws IOException {
- if (blockInputStream != null) {
- blockInputStream.close();
- }
- }
-
- @Override
- public void seek(long pos) throws IOException {
- if (blockInputStream != null) {
- blockInputStream.seek(pos);
- } else {
- position = pos;
- }
- }
-
- @Override
- public long getPos() throws IOException {
- if (blockInputStream != null) {
- return blockInputStream.getPos();
- } else {
- return position;
- }
- }
-
- @Override
- public boolean seekToNewSource(long targetPos) throws IOException {
- return false;
- }
- }
-
- public static LengthInputStream getFromOmKeyInfo(
- OmKeyInfo keyInfo,
- XceiverClientManager xceiverClientManager,
- StorageContainerLocationProtocol
- storageContainerLocationClient,
- String requestId, boolean verifyChecksum) throws IOException {
- long length = 0;
- KeyInputStream groupInputStream = new KeyInputStream();
- groupInputStream.key = keyInfo.getKeyName();
- List<OmKeyLocationInfo> keyLocationInfos =
- keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
- groupInputStream.streamOffset = new long[keyLocationInfos.size()];
- for (int i = 0; i < keyLocationInfos.size(); i++) {
- OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i);
- LOG.debug("Adding stream for accessing {}. The stream will be " +
- "initialized later.", omKeyLocationInfo);
- groupInputStream.addStream(omKeyLocationInfo, xceiverClientManager,
- requestId, verifyChecksum);
-
- groupInputStream.streamOffset[i] = length;
- length += omKeyLocationInfo.getLength();
- }
- groupInputStream.length = length;
- return new LengthInputStream(groupInputStream, length);
- }
-
- /**
* Verify that the input stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
- private void checkNotClosed() throws IOException {
+ private void checkOpen() throws IOException {
if (closed) {
throw new IOException(
": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key);
}
}
+
+ @VisibleForTesting
+ public synchronized int getCurrentStreamIndex() {
+ return blockIndex;
+ }
+
+ @VisibleForTesting
+ public long getRemainingOfIndex(int index) throws IOException {
+ return blockStreams.get(index).getRemaining();
+ }
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 48968a4..5f2df7d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1072,8 +1072,8 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
private OzoneInputStream createInputStream(OmKeyInfo keyInfo,
String requestId) throws IOException {
LengthInputStream lengthInputStream = KeyInputStream
- .getFromOmKeyInfo(keyInfo, xceiverClientManager,
- storageContainerLocationClient, requestId, verifyChecksum);
+ .getFromOmKeyInfo(keyInfo, xceiverClientManager, requestId,
+ verifyChecksum);
FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo();
if (feInfo != null) {
final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index a4aa361..6876166 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -503,8 +503,7 @@ public final class DistributedStorageHandler implements StorageHandler {
.build();
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
return KeyInputStream.getFromOmKeyInfo(
- keyInfo, xceiverClientManager, storageContainerLocationClient,
- args.getRequestID(), verifyChecksum);
+ keyInfo, xceiverClientManager, args.getRequestID(), verifyChecksum);
}
@Override
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
index 45f04df..80717dd 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
@@ -25,7 +25,6 @@ import org.junit.rules.ExpectedException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.util.ArrayList;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
@@ -48,8 +47,7 @@ public class TestChunkStreams {
for (int i = 0; i < 5; i++) {
int tempOffset = offset;
BlockInputStream in =
- new BlockInputStream(null, null, null, new ArrayList<>(), null,
- true, 0) {
+ new BlockInputStream(null, 100, null, null, true, null, null) {
private long pos = 0;
private ByteArrayInputStream in =
new ByteArrayInputStream(buf, tempOffset, 100);
@@ -84,7 +82,7 @@ public class TestChunkStreams {
}
};
offset += 100;
- groupInputStream.addStream(in, 100);
+ groupInputStream.addStream(in);
}
byte[] resBuf = new byte[500];
@@ -105,8 +103,7 @@ public class TestChunkStreams {
for (int i = 0; i < 5; i++) {
int tempOffset = offset;
BlockInputStream in =
- new BlockInputStream(null, null, null, new ArrayList<>(), null,
- true, 0) {
+ new BlockInputStream(null, 100, null, null, true, null, null) {
private long pos = 0;
private ByteArrayInputStream in =
new ByteArrayInputStream(buf, tempOffset, 100);
@@ -141,7 +138,7 @@ public class TestChunkStreams {
}
};
offset += 100;
- groupInputStream.addStream(in, 100);
+ groupInputStream.addStream(in);
}
byte[] resBuf = new byte[600];
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org