Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2019/06/03 17:05:56 UTC

[GitHub] [hadoop] bharatviswa504 commented on a change in pull request #804: HDDS-1496. Support partial chunk reads and checksum verification

bharatviswa504 commented on a change in pull request #804: HDDS-1496. Support partial chunk reads and checksum verification
URL: https://github.com/apache/hadoop/pull/804#discussion_r289945119
 
 

 ##########
 File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
 ##########
 @@ -0,0 +1,536 @@
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * An {@link InputStream} used by the REST service in combination with the
+ * SCMClient to read the value of a key from a sequence of container chunks.
+ * All bytes of the key value are stored in container chunks. Each chunk may
+ * contain multiple underlying {@link ByteBuffer} instances.  This class
+ * encapsulates all state management for iterating through the sequence of
+ * buffers within each chunk.
+ */
+public class ChunkInputStream extends InputStream implements Seekable {
+
+  private final ChunkInfo chunkInfo;
+  private final long length;
+  private final BlockID blockID;
+  private final String traceID;
+  private XceiverClientSpi xceiverClient;
+  private final boolean verifyChecksum;
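+  // Becomes true once chunk data has been read into the buffers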
+  private boolean allocated = false;
+
+  // Buffers to store the chunk data read from the DN container
+  private List<ByteBuffer> buffers;
+
+  // Index of the buffer corresponding to the current position of the stream
+  private int bufferIndex;
+  
+  // The offset of the data currently residing in the buffers w.r.t. the
+  // start of chunk data
+  private long bufferOffset;
+  
+  // The number of bytes of chunk data residing in the buffers currently
+  private long bufferLength;
+  
+  // Position of the ChunkInputStream is maintained by this variable (if a
+  // seek is performed) until the buffers are allocated. This position is
+  // w.r.t. the chunk only and not the block or key.
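+  // A value of -1 means that no seeked position is pending.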
+  private long chunkPosition = -1;
+
+  private static final int EOF = -1;
+
+  ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
+      String traceId, XceiverClientSpi xceiverClient, boolean verifyChecksum) {
+    this.chunkInfo = chunkInfo;
+    this.length = chunkInfo.getLen();
+    this.blockID = blockId;
+    this.traceID = traceId;
+    this.xceiverClient = xceiverClient;
+    this.verifyChecksum = verifyChecksum;
+  }
+
+  public synchronized long getRemaining() throws IOException {
+    return length - getPos();
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    checkOpen();
+    int available = prepareRead(1);
+    int dataout = EOF;
+
+    if (available == EOF) {
+      // There is no more data in the chunk stream. The buffers should have
+      // been released by now
+      Preconditions.checkState(buffers == null);
+    } else {
+      dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
+    }
+
+    if (chunkStreamEOF()) {
+      // consumer might use getPos to determine EOF,
+      // so release buffers when serving the last byte of data
+      releaseBuffers();
+    }
+
+    return dataout;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    // According to the JavaDocs for InputStream, it is recommended that
+    // subclasses provide an override of bulk read if possible for performance
+    // reasons.  In addition to performance, we need to do it for correctness
+    // reasons.  The Ozone REST service uses PipedInputStream and
+    // PipedOutputStream to relay HTTP response data between a Jersey thread and
+    // a Netty thread.  It turns out that PipedInputStream/PipedOutputStream
+    // have a subtle dependency (bug?) on the wrapped stream providing separate
+    // implementations of single-byte read and bulk read.  Without this, get key
+    // responses might close the connection before writing all of the bytes
+    // advertised in the Content-Length.
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0;
+    }
+    checkOpen();
+    int total = 0;
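+    // Copy from at most one underlying buffer per iteration until the
+    // request is satisfied or the chunk is exhausted.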
+    while (len > 0) {
+      int available = prepareRead(len);
+      if (available == EOF) {
+        // There is no more data in the chunk stream. The buffers should have
+        // been released by now
+        Preconditions.checkState(buffers == null);
+        return total != 0 ? total : EOF;
+      }
+      buffers.get(bufferIndex).get(b, off + total, available);
+      len -= available;
+      total += available;
+    }
+
+    if (chunkStreamEOF()) {
+      // smart consumers determine EOF by calling getPos()
+      // so we release buffers when serving the final bytes of data
+      releaseBuffers();
+    }
+
+    return total;
+  }
+
+  /**
+   * Seeks the ChunkInputStream to the specified position. This is done by
+   * updating the chunkPosition to the seeked position in case the buffers
+   * are not allocated or buffers do not contain the data corresponding to
+   * the seeked position (determined by buffersHavePosition()). Otherwise,
+   * the buffers position is updated to the seeked position.
+   */
+  @Override
+  public void seek(long pos) throws IOException {
+    if (pos < 0 || pos >= length) {
+      if (pos == 0) {
+        // It is possible for length and pos to be zero in which case
+        // seek should return instead of throwing exception
+        return;
+      }
+      throw new EOFException("EOF encountered at pos: " + pos + " for chunk: "
+          + chunkInfo.getChunkName());
+    }
+
+    if (buffersHavePosition(pos)) {
+      // The buffer position is w.r.t. the current chunk.
+      // Adjust the bufferIndex and position to the seeked position.
+      adjustBufferPosition(pos - bufferOffset);
+    } else {
+      chunkPosition = pos;
+    }
+  }
+
+  @Override
+  public long getPos() throws IOException {
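+    // The reported position is, in order of precedence: the pending seek
+    // target, the chunk length at EOF, the live buffer position, the end
+    // of the drained buffered region, or 0 if nothing has been read yet.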
+    if (chunkPosition >= 0) {
+      return chunkPosition;
+    }
+    if (chunkStreamEOF()) {
+      return length;
+    }
+    if (buffersHaveData()) {
+      return bufferOffset + buffers.get(bufferIndex).position();
+    }
+    if (buffersAllocated()) {
+      return bufferOffset + bufferLength;
+    }
+    return 0;
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  @Override
+  public synchronized void close() {
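+    // Dropping the client reference marks the stream as closed; checkOpen()
+    // treats a null xceiverClient as closed.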
+    if (xceiverClient != null) {
+      xceiverClient = null;
+    }
+  }
+
+  /**
+   * Checks if the stream is open.  If not, throw an exception.
+   *
+   * @throws IOException if stream is closed
+   */
+  protected synchronized void checkOpen() throws IOException {
+    if (xceiverClient == null) {
+      throw new IOException("BlockInputStream has been closed.");
+    }
+  }
+
+  /**
+   * Prepares to read by advancing through buffers or allocating new buffers,
+   * as needed until it finds data to return, or encounters EOF.
+   * @param len desired length of data to read
+   * @return length of data available to read, possibly less than desired length
+   */
+  private synchronized int prepareRead(int len) throws IOException {
+    for (;;) {
+      if (chunkPosition >= 0) {
+        if (buffersHavePosition(chunkPosition)) {
+          // The current buffers have the seeked position. Adjust the buffer
+          // index and position to point to the chunkPosition.
+          adjustBufferPosition(chunkPosition - bufferOffset);
+        } else {
+          // Read the required chunk data so that the buffers contain data
+          // corresponding to the seeked position.
+          readChunkFromContainer(len);
+        }
+      }
+      if (buffersHaveData()) {
+        // Data is available from the current buffer; serve at most as many
+        // bytes as it holds.
+        ByteBuffer bb = buffers.get(bufferIndex);
+        return Math.min(len, bb.remaining());
+      } else if (dataRemainingInChunk()) {
+        // There is more data in the chunk stream which has not
+        // been read into the buffers yet.
+        readChunkFromContainer(len);
+      } else {
+        // All available input from this chunk stream has been consumed.
+        return EOF;
+      }
+    }
+  }
+
+  /**
+   * Reads a full or partial chunk from the DN container based on the current
+   * position of the ChunkInputStream, the number of bytes of data to read,
+   * and the checksum boundaries.
+   * If successful, the read data is saved in the buffers so that
+   * subsequent read calls can utilize it.
+   * @param len number of bytes of data to be read
+   * @throws IOException if there is an I/O error while performing the call
+   * to Datanode
+   */
+  private synchronized void readChunkFromContainer(int len) throws IOException {
+
+    List<DatanodeDetails> excludeDns = null;
+    ByteString byteString;
+    List<DatanodeDetails> dnList = getDatanodeList();
+
+    // index of first byte to be read from the chunk
+    long startByteIndex;
+    if (chunkPosition >= 0) {
+      // If a seek operation was called to advance the buffer position, the
+      // chunk should be read from that position onwards.
+      startByteIndex = chunkPosition;
+    } else {
+      // Continue reading the chunk from where the previously read data ends.
+      startByteIndex = bufferOffset + bufferLength;
+    }
+
+    if (verifyChecksum) {
+      // Update the bufferOffset and bufferLength as per the checksum
+      // boundary requirement.
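+      // For example, assuming bytesPerChecksum = 4KB, a request for bytes
+      // [6KB, 7KB) of the chunk expands to [4KB, 8KB) so that only whole
+      // checksummed units are fetched and verified.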
+      computeChecksumBoundaries(startByteIndex, len);
+    } else {
+      // Read from the startByteIndex
+      bufferOffset = startByteIndex;
+      bufferLength = len;
+    }
+
+    // Adjust the chunkInfo so that only the required bytes are read from
+    // the chunk.
+    final ChunkInfo adjustedChunkInfo = ChunkInfo.newBuilder(chunkInfo)
+        .setOffset(bufferOffset)
+        .setLen(bufferLength)
+        .build();
+
+    while (true) {
+      List<DatanodeDetails> dnListFromReadChunkCall = new ArrayList<>();
+      byteString = readChunk(adjustedChunkInfo, excludeDns,
+          dnListFromReadChunkCall);
+      try {
+        if (byteString.size() != adjustedChunkInfo.getLen()) {
+          // Bytes read from the chunk should be equal to the requested length.
+          throw new IOException(String
+              .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
+                  adjustedChunkInfo.getChunkName(), adjustedChunkInfo.getLen(),
+                  byteString.size()));
+        }
+
+        if (verifyChecksum) {
+          ChecksumData checksumData = ChecksumData.getFromProtoBuf(
+              chunkInfo.getChecksumData());
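+          // bufferOffset was aligned to a checksum boundary above, so the
+          // first checksum to verify is at index bufferOffset /
+          // bytesPerChecksum (e.g. bufferOffset = 8KB with 4KB per
+          // checksum gives index 2).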
+          int checksumStartIndex =
+              (int) (bufferOffset / checksumData.getBytesPerChecksum());
+          Checksum.verifyChecksum(byteString, checksumData,
+              checksumStartIndex);
+        }
+        break;
+      } catch (IOException ioe) {
+        // We will end up in this situation only if a checksum mismatch
+        // occurs or the length of the read data does not match the
+        // requested length.
+        // In this case, the read should be retried on a different replica.
+        // TODO: Inform SCM of a possible corrupt container replica here
+        if (excludeDns == null) {
+          excludeDns = new ArrayList<>();
+        }
+        excludeDns.addAll(dnListFromReadChunkCall);
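+        // Every replica has now been tried and failed verification; give up
+        // and surface the last error to the caller.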
+        if (excludeDns.size() == dnList.size()) {
+          throw ioe;
+        }
+      }
+    }
+
+    buffers = byteString.asReadOnlyByteBufferList();
+    bufferIndex = 0;
+    allocated = true;
+
+    // If the stream was seeked to a position before, then the buffer
 
 Review comment:
   Minor NIT: postion -> position.
   Need to be changed in multiple places.
