You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by sh...@apache.org on 2022/07/24 19:48:34 UTC

[parquet-mr] branch master updated: PARQUET-2134: Fix type checking in HadoopStreams.wrap (#951)

This is an automated email from the ASF dual-hosted git repository.

shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ed2dbb9b PARQUET-2134: Fix type checking in HadoopStreams.wrap (#951)
3ed2dbb9b is described below

commit 3ed2dbb9ba40d93caaa5aa3149581f2108ac2bc0
Author: Todd Gao <to...@gmail.com>
AuthorDate: Mon Jul 25 03:48:28 2022 +0800

    PARQUET-2134: Fix type checking in HadoopStreams.wrap (#951)
    
    HadoopStreams.wrap produces a wrong H2SeekableInputStream if the
    passed-in FSDataInputStream wraps another FSDataInputStream.
    
    Since [HDFS-14111](https://issues.apache.org/jira/browse/HDFS-14111) all
    input streams in the hadoop codebase which implement `ByteBufferReadable`
    return true on the StreamCapabilities probe
    `stream.hasCapability("in:readbytebuffer")`;
    those which don't are forbidden to do so.
    
    This means that on Hadoop 3.3.0+ the preferred way to probe for the API
    is to ask the stream.
    
    The StreamCapabilities probe was added in Hadoop 2.9. Along with
    making all use of `ByteBufferReadable` non-reflective, this makes
    the checks fairly straightforward.
    
    Tests verify that if a stream implements `ByteBufferReadable` then
    it will be bonded to H2SeekableInputStream, even if multiply wrapped
    by FSDataInputStreams, and that if it doesn't, it won't.
    
    Co-authored-by: Steve Loughran <st...@cloudera.com>
    
    Co-authored-by: Steve Loughran <st...@cloudera.com>
---
 .../parquet/hadoop/util/H2SeekableInputStream.java |  2 +-
 .../apache/parquet/hadoop/util/HadoopStreams.java  | 76 ++++++++++------------
 .../hadoop/util/TestHadoop2ByteBufferReads.java    | 47 +++++++++++++
 3 files changed, 81 insertions(+), 44 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
index 2994ca829..4bbbb8ed1 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
@@ -84,7 +84,7 @@ class H2SeekableInputStream extends DelegatingSeekableInputStream {
   }
 
   public static void readFully(Reader reader, ByteBuffer buf) throws IOException {
-    // unfortunately the Hadoop APIs seem to not have a 'readFully' equivalent for the byteBuffer read
+    // unfortunately the Hadoop 2 APIs do not have a 'readFully' equivalent for the byteBuffer read
     // calls. The read(ByteBuffer) call might read fewer than byteBuffer.hasRemaining() bytes. Thus we
     // have to loop to ensure we read them.
     while (buf.hasRemaining()) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
index 40f12fefd..bafb45ad3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -19,16 +19,15 @@
 
 package org.apache.parquet.hadoop.util;
 
+import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.io.SeekableInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
+import java.io.InputStream;
 import java.util.Objects;
 
 /**
@@ -38,9 +37,6 @@ public class HadoopStreams {
 
   private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class);
 
-  private static final Class<?> byteBufferReadableClass = getReadableClass();
-  static final Constructor<SeekableInputStream> h2SeekableConstructor = getH2SeekableConstructor();
-
   /**
    * Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream}
    * implementation for Parquet readers.
@@ -50,51 +46,45 @@ public class HadoopStreams {
    */
   public static SeekableInputStream wrap(FSDataInputStream stream) {
     Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-    if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
-        byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
-      try {
-        return h2SeekableConstructor.newInstance(stream);
-      } catch (InstantiationException | IllegalAccessException e) {
-        LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
-        return new H1SeekableInputStream(stream);
-      } catch (InvocationTargetException e) {
-        throw new ParquetDecodingException(
-            "Could not instantiate H2SeekableInputStream", e.getTargetException());
-      }
+    if (isWrappedStreamByteBufferReadable(stream)) {
+      return new H2SeekableInputStream(stream);
     } else {
       return new H1SeekableInputStream(stream);
     }
   }
 
-  private static Class<?> getReadableClass() {
-    try {
-      return Class.forName("org.apache.hadoop.fs.ByteBufferReadable");
-    } catch (ClassNotFoundException | NoClassDefFoundError e) {
-      return null;
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is "the stream is not FSDataInputStream
+   * and implements ByteBufferReadable".
+   *
+   * That is: all streams which implement ByteBufferReadable
+   * other than FSDataInputStream successfully support read(ByteBuffer).
+   * This is true for all filesystem clients in the hadoop codebase.
+   *
+   * In hadoop 3.3.0+, the StreamCapabilities probe can be used to
+   * check this: only those streams which provide the read(ByteBuffer)
+   * semantics MAY return true for the probe "in:readbytebuffer";
+   * FSDataInputStream will pass the probe down to the underlying stream.
+   *
+   * @param stream stream to probe
+   * @return true if it is safe to use a H2SeekableInputStream to access the data
+   */
+  private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
+    if (stream.hasCapability("in:readbytebuffer")) {
+      // stream is issuing the guarantee that it implements the
+      // API. Holds for all implementations in hadoop-*
+      // since Hadoop 3.3.0 (HDFS-14111).
+      return true;
     }
-  }
-
-  @SuppressWarnings("unchecked")
-  private static Class<SeekableInputStream> getH2SeekableClass() {
-    try {
-      return (Class<SeekableInputStream>) Class.forName(
-          "org.apache.parquet.hadoop.util.H2SeekableInputStream");
-    } catch (ClassNotFoundException | NoClassDefFoundError e) {
-      return null;
+    InputStream wrapped = stream.getWrappedStream();
+    if (wrapped instanceof FSDataInputStream) {
+      LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream);
+      return isWrappedStreamByteBufferReadable(((FSDataInputStream) wrapped));
     }
+    return wrapped instanceof ByteBufferReadable;
   }
 
-  private static Constructor<SeekableInputStream> getH2SeekableConstructor() {
-    Class<SeekableInputStream> h2SeekableClass = getH2SeekableClass();
-    if (h2SeekableClass != null) {
-      try {
-        return h2SeekableClass.getConstructor(FSDataInputStream.class);
-      } catch (NoSuchMethodException e) {
-        return null;
-      }
-    }
-    return null;
-  }
 
   /**
    * Wraps a {@link FSDataOutputStream} in a {@link PositionOutputStream}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
index 1b1e37354..b514febcb 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
@@ -19,8 +19,10 @@
 
 package org.apache.parquet.hadoop.util;
 
+import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.parquet.hadoop.TestUtils;
+import org.apache.parquet.io.SeekableInputStream;
 import org.junit.Assert;
 import org.junit.Test;
 import java.io.EOFException;
@@ -28,6 +30,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.Callable;
 
+import static org.apache.parquet.hadoop.util.HadoopStreams.wrap;
 import static org.apache.parquet.hadoop.util.MockHadoopInputStream.TEST_ARRAY;
 
 public class TestHadoop2ByteBufferReads {
@@ -396,4 +399,48 @@ public class TestHadoop2ByteBufferReads {
     Assert.assertEquals("Buffer contents should match",
         ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
   }
+
+  @Test
+  public void testCreateStreamNoByteBufferReadable() {
+    final SeekableInputStream s = wrap(new FSDataInputStream(
+      new MockHadoopInputStream()));
+    Assert.assertTrue("Wrong wrapper: " + s,
+      s instanceof H1SeekableInputStream);
+  }
+
+  @Test
+  public void testDoubleWrapNoByteBufferReadable() {
+    final SeekableInputStream s = wrap(new FSDataInputStream(
+      new FSDataInputStream(new MockHadoopInputStream())));
+    Assert.assertTrue("Wrong wrapper: " + s,
+      s instanceof H1SeekableInputStream);
+  }
+
+  @Test
+  public void testCreateStreamWithByteBufferReadable() {
+    final SeekableInputStream s = wrap(new FSDataInputStream(
+      new MockByteBufferInputStream()));
+    Assert.assertTrue("Wrong wrapper: " + s,
+      s instanceof H2SeekableInputStream);
+  }
+
+  @Test
+  public void testDoubleWrapByteBufferReadable() {
+    final SeekableInputStream s = wrap(new FSDataInputStream(
+      new FSDataInputStream(new MockByteBufferInputStream())));
+    Assert.assertTrue("Wrong wrapper: " + s,
+      s instanceof H2SeekableInputStream);
+  }
+
+  /**
+   * Input stream which claims to implement ByteBufferReadable.
+   */
+  private static final class MockByteBufferInputStream
+    extends MockHadoopInputStream implements ByteBufferReadable {
+
+    @Override
+    public int read(final ByteBuffer buf) {
+      return 0;
+    }
+  }
 }