You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by GitBox <gi...@apache.org> on 2022/05/23 15:57:25 UTC

[GitHub] [parquet-mr] steveloughran commented on a diff in pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

steveloughran commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r879586630


##########
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##########
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  private int fetchIndex = 0;
+  private final SeekableInputStream fileInputStream;
+  private int readIndex = 0;
+  private ExecutorService threadPool;
+  private LinkedBlockingQueue<Future<Void>> readFutures;
+  private boolean closed = false;
+
+  private LongAdder totalTimeBlocked = new LongAdder();
+  private LongAdder totalCountBlocked = new LongAdder();
+  private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+    List<ByteBuffer> buffers) {
+    super(buffers);
+    this.fileInputStream = fileInputStream;
+    this.threadPool = threadPool;
+    readFutures = new LinkedBlockingQueue<>(buffers.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ASYNC: Begin read into buffers ");
+      for (ByteBuffer buf : buffers) {
+        LOG.debug("ASYNC: buffer {} ", buf);
+      }
+    }
+    fetchAll();
+  }
+
+  private void checkState() {
+    if (closed) {
+      throw new RuntimeException("Stream is closed");
+    }
+  }
+
+  private void fetchAll() {
+    checkState();
+    submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+    ByteBuffer buffer = buffers.get(bufferNo);
+    try {
+      readFutures.put(threadPool.submit(() -> {
+          readOneBuffer(buffer);
+          if (bufferNo < buffers.size() - 1) {
+            submitReadTask(bufferNo + 1);
+          }
+          return null;
+        })
+      );
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+    long startTime = System.nanoTime();
+    try {
+      fileInputStream.readFully(buffer);
+      buffer.flip();
+      long readCompleted = System.nanoTime();
+      long timeSpent = readCompleted - startTime;
+      LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+      long putStart = System.nanoTime();
+      long putCompleted = System.nanoTime();
+      LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}",

Review Comment:
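   How does this work? `putStart` and `putCompleted` are sampled back to back with nothing in between, so what is this "BLOCKED" time actually measuring?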



##########
parquet-common/src/main/java/org/apache/parquet/bytes/SequenceByteBufferInputStream.java:
##########
@@ -0,0 +1,269 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ *   A bare minimum implementation of a {@link java.io.SequenceInputStream} that wraps an
+ *   <i>ordered</i> collection of ByteBufferInputStreams.
+ *   <p>
+ *   This class, as implemented, is intended only for a specific use in the ParquetFileReader and
+ *   throws {@link UnsupportedOperationException} in unimplemented methods to catch any unintended
+ *   use in other cases.
+ *   <p>
+ *   Even though this class is derived from ByteBufferInputStream, it explicitly does not support any
+ *   byte-buffer-related methods like slice. It does, however, support sliceBuffers, which is a
+ *   curious case of reading data from the underlying streams.
+ *   <p>
+ *   Even though this class changes the state of the underlying streams (by reading from them),
+ *   it does not own them, so the close method does not close the streams. To avoid resource
+ *   leaks, the calling code should close the underlying streams.
+ */
+public class SequenceByteBufferInputStream extends ByteBufferInputStream {
+
+  Collection<ByteBufferInputStream> collection;
+  Iterator<ByteBufferInputStream> iterator;
+  ByteBufferInputStream current;
+  long position = 0;
+
+  @Override
+  public String toString() {
+    return "SequenceByteBufferInputStream{" +
+      "collection=" + collection +
+      ", current=" + current +
+      ", position=" + position +
+      '}';
+  }
+
+  public SequenceByteBufferInputStream(Collection<ByteBufferInputStream> collection) {
+    this.collection = collection;
+    iterator = collection.iterator();
+    current = iterator.hasNext() ? iterator.next() : null;
+    if (current == null) {
+      throw new UnsupportedOperationException(
+        "Initializing SequenceByteBufferInputStream with an empty collection is not supported");
+    }
+  }
+
+  @Override
+  public long position() {
+    return position;
+  }
+
+  @Override
+  public int read(ByteBuffer out) {
+    int len = out.remaining();
+    if (len <= 0) {
+      return 0;
+    }
+    if (current == null) {
+      return -1;
+    }
+    int totalBytesRead = 0;
+    while (totalBytesRead < len) {
+      int bytesRead = current.read(out);
+      if (bytesRead == -1) {
+        if (iterator.hasNext()) {
+          current = iterator.next();
+        } else {
+          break;
+        }
+      } else {
+        totalBytesRead += bytesRead;
+      }
+    }
+    position += totalBytesRead;
+    return totalBytesRead;
+  }
+
+  @Override
+  public ByteBuffer slice(int length) throws EOFException {
+    throw new UnsupportedOperationException("slice is not supported");
+  }
+
+  @Override
+  /**
+   * This is a blocking call. Use with care when using in asynchronous mode.
+   */
+  public List<ByteBuffer> sliceBuffers(long len) throws EOFException {
+    if (len <= 0) {
+      return Collections.emptyList();
+    }
+
+    if (current == null) {
+      throw new EOFException();
+    }
+
+    List<ByteBuffer> buffers = new ArrayList<>();
+    long bytesAccumulated = 0;
+    while (bytesAccumulated < len) {
+      // This is not strictly according to the input stream contract, but once again the
+      // underlying implementations of ByteBufferInputStream return the available bytes
+      // based on the size of the underlying buffers rather than the bytes currently read
+      // into the buffers. This works for us because the underlying implementations will
+      // actually fill the buffers with the data before returning the slices we ask for
+      // (which is why this is a blocking call)
+      if (current.available() > 0) {
+        int bufLen = (int) Math.min(len - bytesAccumulated, current.available());
+        List<ByteBuffer> currentSlices = current.sliceBuffers(bufLen);
+        buffers.addAll(currentSlices);
+        bytesAccumulated += bufLen;
+
+        // update state; the bytes are considered read
+        this.position += bufLen;
+      } else {
+        if (iterator.hasNext()) {
+          current = iterator.next();
+        } else {
+          // there are no more streams
+          throw new EOFException();
+        }
+      }
+    }
+    position += bytesAccumulated;
+    return buffers;
+  }
+
+  @Override
+  public ByteBufferInputStream sliceStream(long length) throws EOFException {
+    throw new UnsupportedOperationException("sliceStream is not supported");
+  }
+
+  @Override
+  public List<ByteBuffer> remainingBuffers() {
+    throw new UnsupportedOperationException("remainingBuffers is not supported");
+  }
+
+  @Override
+  public ByteBufferInputStream remainingStream() {
+    throw new UnsupportedOperationException("remainingStream is not supported");
+  }
+
+  @Override
+  public int read() throws IOException {
+    int val;
+    while (true) {
+      try {
+        val = current.read() & 0xFF; // as unsigned
+        position += 1;
+        break;
+      } catch (EOFException e) {
+        if (iterator.hasNext()) {
+          current = iterator.next();
+        } else {
+          throw new EOFException("End of streams");

Review Comment:
   `InputStream.read()` must return -1 at end of stream; it should not throw `EOFException`.



##########
parquet-common/src/main/java/org/apache/parquet/bytes/SequenceByteBufferInputStream.java:
##########
@@ -0,0 +1,269 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ *   A bare minimum implementation of a {@link java.io.SequenceInputStream} that wraps an
+ *   <i>ordered</i> collection of ByteBufferInputStreams.
+ *   <p>
+ *   This class, as implemented, is intended only for a specific use in the ParquetFileReader and
+ *   throws {@link UnsupportedOperationException} in unimplemented methods to catch any unintended
+ *   use in other cases.
+ *   <p>
+ *   Even though this class is derived from ByteBufferInputStream, it explicitly does not support any
+ *   byte-buffer-related methods like slice. It does, however, support sliceBuffers, which is a
+ *   curious case of reading data from the underlying streams.
+ *   <p>
+ *   Even though this class changes the state of the underlying streams (by reading from them),
+ *   it does not own them, so the close method does not close the streams. To avoid resource
+ *   leaks, the calling code should close the underlying streams.
+ */
+public class SequenceByteBufferInputStream extends ByteBufferInputStream {
+
+  Collection<ByteBufferInputStream> collection;
+  Iterator<ByteBufferInputStream> iterator;
+  ByteBufferInputStream current;
+  long position = 0;
+
+  @Override
+  public String toString() {
+    return "SequenceByteBufferInputStream{" +
+      "collection=" + collection +
+      ", current=" + current +
+      ", position=" + position +
+      '}';
+  }
+
+  public SequenceByteBufferInputStream(Collection<ByteBufferInputStream> collection) {
+    this.collection = collection;
+    iterator = collection.iterator();
+    current = iterator.hasNext() ? iterator.next() : null;
+    if (current == null) {
+      throw new UnsupportedOperationException(
+        "Initializing SequenceByteBufferInputStream with an empty collection is not supported");
+    }
+  }
+
+  @Override
+  public long position() {
+    return position;
+  }
+
+  @Override
+  public int read(ByteBuffer out) {

Review Comment:
   Are you 100% confident that all uses of `read()` in Parquet happen on a single thread? It is not unknown for apps to read across threads, even though the Java APIs say "don't".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@parquet.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org