Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2019/05/23 02:32:45 UTC

[GitHub] [hadoop] hadoop-yetus commented on a change in pull request #804: HDDS-1496. Support partial chunk reads and checksum verification

hadoop-yetus commented on a change in pull request #804: HDDS-1496. Support partial chunk reads and checksum verification
URL: https://github.com/apache/hadoop/pull/804#discussion_r286755505
 
 

 ##########
 File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
 ##########
 @@ -43,467 +41,334 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
 /**
  * An {@link InputStream} used by the REST service in combination with the
  * SCMClient to read the value of a key from a sequence
  * of container chunks.  All bytes of the key value are stored in container
- * chunks.  Each chunk may contain multiple underlying {@link ByteBuffer}
+ * chunks. Each chunk may contain multiple underlying {@link ByteBuffer}
  * instances.  This class encapsulates all state management for iterating
- * through the sequence of chunks and the sequence of buffers within each chunk.
+ * through the sequence of chunks via {@link ChunkInputStream}.
  */
 public class BlockInputStream extends InputStream implements Seekable {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BlockInputStream.class);
+
   private static final int EOF = -1;
 
   private final BlockID blockID;
+  private final long length;
+  private Pipeline pipeline;
+  private final long containerKey;
+  private final Token<OzoneBlockTokenIdentifier> token;
+  private final boolean verifyChecksum;
   private final String traceID;
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
-  private List<ChunkInfo> chunks;
-  // ChunkIndex points to the index current chunk in the buffers or the the
-  // index of chunk which will be read next into the buffers in
-  // readChunkFromContainer().
+  private boolean initialized = false;
+
+  // List of ChunkInputStreams, one for each chunk in the block
+  private List<ChunkInputStream> chunkStreams;
+
+  // chunkOffsets[i] stores the index of the first data byte in
+  // chunkStream i w.r.t. the block data.
+  // Suppose the chunk size is 40 bytes and the parent block stores data
+  // from index 200 with a length of 400. The first 40 bytes of this block
+  // will be stored in chunk[0], the next 40 in chunk[1] and so on. But
+  // since the chunkOffsets are w.r.t. the block only and not the key, the
+  // values in chunkOffsets will be [0, 40, 80, ...].
+  private long[] chunkOffsets = null;
+
+  // Index of the chunkStream corresponding to the current position of the
+  // BlockInputStream, i.e. the offset of the data to be read next from
+  // this block.
   private int chunkIndex;
-  // ChunkIndexOfCurrentBuffer points to the index of chunk read into the
-  // buffers or index of the last chunk in the buffers. It is updated only
-  // when a new chunk is read from container into the buffers.
-  private int chunkIndexOfCurrentBuffer;
-  private long[] chunkOffset;
-  private List<ByteBuffer> buffers;
-  private int bufferIndex;
-  private long bufferPosition;
-  private final boolean verifyChecksum;
 
-  /**
-   * Creates a new BlockInputStream.
-   *
-   * @param blockID block ID of the chunk
-   * @param xceiverClientManager client manager that controls client
-   * @param xceiverClient client to perform container calls
-   * @param chunks list of chunks to read
-   * @param traceID container protocol call traceID
-   * @param verifyChecksum verify checksum
-   * @param initialPosition the initial position of the stream pointer. This
-   *                        position is seeked now if the up-stream was seeked
-   *                        before this was created.
-   */
-  public BlockInputStream(
-      BlockID blockID, XceiverClientManager xceiverClientManager,
-      XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID,
-      boolean verifyChecksum, long initialPosition) throws IOException {
-    this.blockID = blockID;
-    this.traceID = traceID;
-    this.xceiverClientManager = xceiverClientManager;
-    this.xceiverClient = xceiverClient;
-    this.chunks = chunks;
-    this.chunkIndex = 0;
-    this.chunkIndexOfCurrentBuffer = -1;
-    // chunkOffset[i] stores offset at which chunk i stores data in
-    // BlockInputStream
-    this.chunkOffset = new long[this.chunks.size()];
-    initializeChunkOffset();
-    this.buffers = null;
-    this.bufferIndex = 0;
-    this.bufferPosition = -1;
+  // Position of the BlockInputStream is maintained by this variable till
+  // the stream is initialized. This position is w.r.t. the block only and
+  // not the key.
+  // For the above example, if we seek to position 240 before the stream is
+  // initialized, then the value of blockPosition will be set to 40
+  // (240 - 200).
+  // Once the stream is initialized, the position of the stream
+  // will be determined by the current chunkStream and its position.
+  private long blockPosition = 0;
+
+  // Tracks the chunkIndex corresponding to the last blockPosition so that
+  // it can be reset if a new position is seeked.
+  private int chunkIndexOfPrevPosition;
+
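For orientation, here is a minimal sketch of the seek behavior these
comments describe. It is an illustration only, not the patch itself; it
assumes ChunkInputStream is itself Seekable and that chunkOffsets has been
populated by initialize():

    public synchronized void seek(long pos) throws IOException {
      if (pos < 0 || pos > length) {
        throw new EOFException("EOF encountered at pos: " + pos);
      }
      if (!initialized) {
        // No chunkStreams yet; just record the position so that
        // initialize() can apply it later.
        blockPosition = pos;
        return;
      }
      // Locate the chunk containing pos using the chunkOffsets index.
      int index = Arrays.binarySearch(chunkOffsets, pos);
      if (index < 0) {
        index = -index - 2;  // insertion point minus one
      }
      chunkIndex = index;
      // Seek within the located chunk, relative to that chunk's start.
      chunkStreams.get(chunkIndex).seek(pos - chunkOffsets[chunkIndex]);
    }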
+  public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
+      long containerKey, Token<OzoneBlockTokenIdentifier> token,
+      boolean verifyChecksum, String traceId,
+      XceiverClientManager xceiverClientManager) {
+    this.blockID = blockId;
+    this.length = blockLen;
+    this.pipeline = pipeline;
+    this.containerKey = containerKey;
+    this.token = token;
     this.verifyChecksum = verifyChecksum;
-    if (initialPosition > 0) {
-      // The stream was seeked to a position before the stream was
-      // initialized. So seeking to the position now.
-      seek(initialPosition);
-    }
-  }
-
-  private void initializeChunkOffset() {
-    long tempOffset = 0;
-    for (int i = 0; i < chunks.size(); i++) {
-      chunkOffset[i] = tempOffset;
-      tempOffset += chunks.get(i).getLen();
-    }
+    this.traceID = traceId;
+    this.xceiverClientManager = xceiverClientManager;
   }
 
-  @Override
-  public synchronized int read()
-      throws IOException {
-    checkOpen();
-    int available = prepareRead(1);
-    int dataout = EOF;
+  /**
+   * Initialize the BlockInputStream. Get the BlockData (list of chunks) from
+   * the Container and create the ChunkInputStreams for each Chunk in the Block.
+   */
+  public synchronized void initialize() throws IOException {
 
-    if (available == EOF) {
-      Preconditions
-          .checkState(buffers == null); //should have released by now, see below
-    } else {
-      dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
-    }
+    List<ChunkInfo> chunks = getChunkInfos();
 
-    if (blockStreamEOF()) {
-      // consumer might use getPos to determine EOF,
-      // so release buffers when serving the last byte of data
-      releaseBuffers();
-    }
+    if (chunks != null && !chunks.isEmpty()) {
+      initializeChunkInputStreams(chunks);
 
-    return dataout;
-  }
-
-  @Override
-  public synchronized int read(byte[] b, int off, int len) throws IOException {
-    // According to the JavaDocs for InputStream, it is recommended that
-    // subclasses provide an override of bulk read if possible for performance
-    // reasons.  In addition to performance, we need to do it for correctness
-    // reasons.  The Ozone REST service uses PipedInputStream and
-    // PipedOutputStream to relay HTTP response data between a Jersey thread and
-    // a Netty thread.  It turns out that PipedInputStream/PipedOutputStream
-    // have a subtle dependency (bug?) on the wrapped stream providing separate
-    // implementations of single-byte read and bulk read.  Without this, get key
-    // responses might close the connection before writing all of the bytes
-    // advertised in the Content-Length.
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if (off < 0 || len < 0 || len > b.length - off) {
-      throw new IndexOutOfBoundsException();
-    }
-    if (len == 0) {
-      return 0;
-    }
-    checkOpen();
-    int total = 0;
-    while (len > 0) {
-      int available = prepareRead(len);
-      if (available == EOF) {
-        Preconditions
-            .checkState(buffers == null); //should have been released by now
-        return total != 0 ? total : EOF;
+      if (blockPosition > 0) {
+        // Stream was seeked to blockPosition before initialization. Seek to the
+        // blockPosition now.
+        seek(blockPosition);
       }
-      buffers.get(bufferIndex).get(b, off + total, available);
-      len -= available;
-      total += available;
-    }
-
-    if (blockStreamEOF()) {
-      // smart consumers determine EOF by calling getPos()
-      // so we release buffers when serving the final bytes of data
-      releaseBuffers();
+      initialized = true;
     }
-
-    return total;
   }
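Note that the constructor performs no I/O; initialize() runs lazily on the
first read, and a seek issued before that point is only recorded in
blockPosition. A hypothetical caller, just to show the contract (the
argument names are placeholders):

    BlockInputStream in = new BlockInputStream(blockId, blockLen, pipeline,
        containerKey, token, true, traceId, clientManager);
    in.seek(40);             // no RPC yet; only blockPosition is recorded
    int first = in.read();   // triggers initialize(), which applies the seek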
 
   /**
-   * Determines if all data in the stream has been consumed.
-   *
-   * @return true if EOF, false if more data is available
+   * Send RPC call to get the block info from the container.
+   * @return List of chunks in this block.
    */
-  protected boolean blockStreamEOF() {
-    if (buffersHaveData() || chunksRemaining()) {
-      return false;
-    } else {
-      // if there are any chunks, we better be at the last chunk for EOF
-      Preconditions.checkState(((chunks == null) || chunks.isEmpty() ||
-              chunkIndex == (chunks.size() - 1)),
-          "EOF detected, but not at the last chunk");
-      return true;
+  private List<ChunkInfo> getChunkInfos() throws IOException {
+    // Irrespective of the container state, we will always read via the
+    // Standalone protocol.
+    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+      pipeline = Pipeline.newBuilder(pipeline)
+          .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
     }
-  }
-
-  private void releaseBuffers() {
-    //ashes to ashes, dust to dust
-    buffers = null;
-    bufferIndex = 0;
-  }
+    xceiverClient = xceiverClientManager.acquireClient(pipeline);
+    boolean success = false;
+    List<ChunkInfo> chunks;
+    try {
+      LOG.debug("Initializing BlockInputStream for get key to access {} {}",
+          blockID.getContainerID(), containerKey);
 
-  @Override
-  public synchronized void close() {
-    if (xceiverClientManager != null && xceiverClient != null) {
-      xceiverClientManager.releaseClient(xceiverClient, false);
-      xceiverClientManager = null;
-      xceiverClient = null;
+      if (token != null) {
+        UserGroupInformation.getCurrentUser().addToken(token);
+      }
+      DatanodeBlockID datanodeBlockID = blockID
+          .getDatanodeBlockIDProtobuf();
+      GetBlockResponseProto response = ContainerProtocolCalls
+          .getBlock(xceiverClient, datanodeBlockID, traceID);
+
+      chunks = response.getBlockData().getChunksList();
+      success = true;
+    } finally {
+      if (!success) {
+        xceiverClientManager.releaseClient(xceiverClient, false);
+      }
     }
+
+    return chunks;
   }
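The try/finally above guards the acquired client: if anything fails before
success is set, the client is released so the manager's reference count
stays balanced. The same shape in miniature (a sketch, not the patch):

    XceiverClientSpi client = clientManager.acquireClient(pipeline);
    boolean ok = false;
    try {
      // ... issue RPCs with client ...
      ok = true;
    } finally {
      if (!ok) {
        clientManager.releaseClient(client, false);
      }
    }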
 
   /**
-   * Checks if the stream is open.  If not, throws an exception.
-   *
-   * @throws IOException if stream is closed
+   * For each chunk in the block, create a ChunkInputStream and compute
+   * its chunkOffset.
    */
-  private synchronized void checkOpen() throws IOException {
-    if (xceiverClient == null) {
-      throw new IOException("BlockInputStream has been closed.");
+  private void initializeChunkInputStreams(List<ChunkInfo> chunks) {
+    this.chunkOffsets = new long[chunks.size()];
+    long tempOffset = 0;
+
+    this.chunkStreams = new ArrayList<>(chunks.size());
+    for (int i = 0; i < chunks.size(); i++) {
+      addStream(chunks.get(i));
+      chunkOffsets[i] = tempOffset;
+      tempOffset += chunks.get(i).getLen();
     }
+
+    this.chunkIndex = 0;
   }
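A worked example of the bookkeeping above, reusing the numbers from the
class comment (chunks of 40 bytes, block length 400); illustrative only:

    long[] offsets = new long[10];
    for (int i = 0; i < offsets.length; i++) {
      offsets[i] = i * 40L;                       // [0, 40, 80, ..., 360]
    }
    // A block position of 95 falls in chunk 2 (start offset 80):
    int idx = Arrays.binarySearch(offsets, 95L);  // -4: not an exact match
    if (idx < 0) {
      idx = -idx - 2;                             // insertion point - 1 -> 2
    }
    long posInChunk = 95L - offsets[idx];         // -> 15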
 
   /**
-   * Prepares to read by advancing through chunks and buffers as needed until it
-   * finds data to return or encounters EOF.
-   *
-   * @param len desired length of data to read
-   * @return length of data available to read, possibly less than desired length
+   * Append another ChunkInputStream to the end of the list. Note that the
+   * ChunkInputStream is only created here. The chunk will be read from the
+   * Datanode only when a read operation is performed for that chunk.
    */
-  private synchronized int prepareRead(int len) throws IOException {
-    for (;;) {
-      if (!buffersAllocated()) {
-        // The current chunk at chunkIndex has not been read from the
-        // container. Read the chunk and put the data into buffers.
-        readChunkFromContainer();
-      }
-      if (buffersHaveData()) {
-        // Data is available from buffers
-        ByteBuffer bb = buffers.get(bufferIndex);
-        return len > bb.remaining() ? bb.remaining() : len;
-      } else if (chunksRemaining()) {
-        // There are additional chunks available.
-        // Read the next chunk in the block.
-        chunkIndex += 1;
-        readChunkFromContainer();
-      } else {
-        // All available input has been consumed.
-        return EOF;
-      }
-    }
+  private synchronized void addStream(ChunkInfo chunkInfo) {
+    chunkStreams.add(new ChunkInputStream(chunkInfo, blockID, traceID,
+        xceiverClient, verifyChecksum));
   }
 
-  private boolean buffersAllocated() {
-    if (buffers == null || buffers.isEmpty()) {
-      return false;
-    }
-    return true;
+  public synchronized long getRemaining() throws IOException {
+    return length - getPos();
   }
 
-  private boolean buffersHaveData() {
-    boolean hasData = false;
-
-    if (buffersAllocated()) {
-      while (bufferIndex < (buffers.size())) {
-        if (buffers.get(bufferIndex).hasRemaining()) {
-          // current buffer has data
-          hasData = true;
-          break;
-        } else {
-          if (buffersRemaining()) {
-            // move to next available buffer
-            ++bufferIndex;
-            Preconditions.checkState(bufferIndex < buffers.size());
-          } else {
-            // no more buffers remaining
-            break;
-          }
-        }
-      }
+  @Override
+  public synchronized int read() throws IOException {
+    byte[] buf = new byte[1];
+    if (read(buf, 0, 1) == EOF) {
+      return EOF;
     }
-
-    return hasData;
-  }
-
-  private boolean buffersRemaining() {
-    return (bufferIndex < (buffers.size() - 1));
+    return Byte.toUnsignedInt(buf[0]);
   }
 
-  private boolean chunksRemaining() {
-    if ((chunks == null) || chunks.isEmpty()) {
-      return false;
+  @Override
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
     }
-    // Check if more chunks are remaining in the stream after chunkIndex
-    if (chunkIndex < (chunks.size() - 1)) {
-      return true;
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
     }
-    // ChunkIndex is the last chunk in the stream. Check if this chunk has
-    // been read from container or not. Return true if chunkIndex has not
-    // been read yet and false otherwise.
-    return chunkIndexOfCurrentBuffer != chunkIndex;
-  }
-
-  /**
-   * Attempts to read the chunk at the specified offset in the chunk list.  If
-   * successful, then the data of the read chunk is saved so that its bytes can
-   * be returned from subsequent read calls.
-   *
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  private synchronized void readChunkFromContainer() throws IOException {
-    // Read the chunk at chunkIndex
-    final ChunkInfo chunkInfo = chunks.get(chunkIndex);
-    List<DatanodeDetails> excludeDns = null;
-    ByteString byteString;
-    List<DatanodeDetails> dnList = getDatanodeList();
-    while (true) {
-      List<DatanodeDetails> dnListFromReadChunkCall = new ArrayList<>();
-      byteString = readChunk(chunkInfo, excludeDns, dnListFromReadChunkCall);
-      try {
-        if (byteString.size() != chunkInfo.getLen()) {
-          // Bytes read from chunk should be equal to chunk size.
-          throw new IOException(String
-              .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
-                  chunkInfo.getChunkName(), chunkInfo.getLen(),
-                  byteString.size()));
-        }
-        ChecksumData checksumData =
-            ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
-        if (verifyChecksum) {
-          Checksum.verifyChecksum(byteString, checksumData);
-        }
-        break;
-      } catch (IOException ioe) {
-        // we will end up in this situation only if the checksum mismatch
-        // happens or the length of the chunk mismatches.
-        // In this case, read should be retried on a different replica.
-        // TODO: Inform SCM of a possible corrupt container replica here
-        if (excludeDns == null) {
-          excludeDns = new ArrayList<>();
-        }
-        excludeDns.addAll(dnListFromReadChunkCall);
-        if (excludeDns.size() == dnList.size()) {
-          throw ioe;
-        }
-      }
+    if (len == 0) {
+      return 0;
     }
 
-    buffers = byteString.asReadOnlyByteBufferList();
-    bufferIndex = 0;
-    chunkIndexOfCurrentBuffer = chunkIndex;
+    if (!initialized) {
+      initialize();
+    }
 
-    // The bufferIndex and position might need to be adjusted if seek() was
-    // called on the stream before. This needs to be done so that the buffer
-    // position can be advanced to the 'seeked' position.
-    adjustBufferIndex();
-  }
+    checkOpen();
+    int totalReadLen = 0;
+    while (len > 0) {
+      // if we are at the last chunk and have read the entire chunk, return
+      if (chunkStreams.size() == 0 ||
+          (chunkStreams.size() - 1 <= chunkIndex &&
+              chunkStreams.get(chunkIndex)
+                  .getRemaining() == 0)) {
+        return totalReadLen == 0 ? EOF : totalReadLen;
+      }
 
-  /**
-   * Send RPC call to get the chunk from the container.
-   */
-  @VisibleForTesting
-  protected ByteString readChunk(final ChunkInfo chunkInfo,
-      List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
-      throws IOException {
-    XceiverClientReply reply;
-    ReadChunkResponseProto readChunkResponse = null;
-    try {
-      reply = ContainerProtocolCalls
-          .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
-      ContainerProtos.ContainerCommandResponseProto response;
-      response = reply.getResponse().get();
-      ContainerProtocolCalls.validateContainerResponse(response);
-      readChunkResponse = response.getReadChunk();
-      dnListFromReply.addAll(reply.getDatanodes());
-    } catch (IOException e) {
-      if (e instanceof StorageContainerException) {
-        throw e;
+      // Get the current chunkStream and read data from it
+      ChunkInputStream current = chunkStreams.get(chunkIndex);
+      int numBytesToRead = Math.min(len, (int)current.getRemaining());
+      int numBytesRead = current.read(b, off, numBytesToRead);
+      if (numBytesRead != numBytesToRead) {
+        // This implies that there is either data loss or corruption in the
+        // chunk entries. An EOF in the current stream is also covered by
+        // this case.
+        throw new IOException(String.format(
+            "Inconsistent read for chunkName=%s length=%d numBytesRead=%d",
+            current.getChunkName(), current.getLength(), numBytesRead));
+      }
+      totalReadLen += numBytesRead;
+      off += numBytesRead;
+      len -= numBytesRead;
+      if (current.getRemaining() <= 0 &&
+          ((chunkIndex + 1) < chunkStreams.size())) {
+        chunkIndex += 1;
       }
-      throw new IOException("Unexpected OzoneException: " + e.toString(), e);
-    } catch (ExecutionException | InterruptedException e) {
-      throw new IOException(
-          "Failed to execute ReadChunk command for chunk  " + chunkInfo
-              .getChunkName(), e);
     }
-    return readChunkResponse.getData();
-  }
-
-  @VisibleForTesting
-  protected List<DatanodeDetails> getDatanodeList() {
-    return xceiverClient.getPipeline().getNodes();
+    return totalReadLen;
   }
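Per the loop above, one bulk read may drain several ChunkInputStreams, and
EOF is returned only when nothing was read at all. A typical consumer
(process() is a placeholder):

    byte[] buf = new byte[4096];
    int n;
    while ((n = in.read(buf, 0, buf.length)) != -1) {
      process(buf, n);  // n < buf.length only near the end of the block
    }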
 
+  /**
+   * Seeks the BlockInputStream to the specified position. If the stream is 
 
 Review comment:
   whitespace:end of line
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
