You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@avro.apache.org by rs...@apache.org on 2020/11/02 08:43:47 UTC

[avro] branch master updated: AVRO-2944: Fix read pointer in java reader to avoid hang on partial read input stream (#969)

This is an automated email from the ASF dual-hosted git repository.

rskraba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new 942ec8e  AVRO-2944: Fix read pointer in java reader to avoid hang on partial read input stream (#969)
942ec8e is described below

commit 942ec8e0cc011ec9fcfff7ba9f5c656bb54d51d1
Author: Mick Jermsurawong <Mi...@users.noreply.github.com>
AuthorDate: Mon Nov 2 00:41:00 2020 -0800

    AVRO-2944: Fix read pointer in java reader to avoid hang on partial read input stream (#969)
    
    * Add regression test to show failure before fixing
    
    format test
    
    * Fix: increment counter with number of bytes read
---
 .../java/org/apache/avro/file/DataFileReader.java  | 10 ++--
 .../java/org/apache/avro/TestDataFileReader.java   | 64 +++++++++++++++++++++-
 2 files changed, 68 insertions(+), 6 deletions(-)

diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
index b06105f..f027852 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
@@ -58,7 +58,7 @@ public class DataFileReader<D> extends DataFileStream<D> implements FileReader<D
     // read magic header
     byte[] magic = new byte[MAGIC.length];
     in.seek(0);
-    for (int c = 0; c < magic.length; c = in.read(magic, c, magic.length - c)) {
+    for (int c = 0; c < magic.length; c += in.read(magic, c, magic.length - c)) {
     }
     in.seek(0);
 
@@ -92,13 +92,13 @@ public class DataFileReader<D> extends DataFileStream<D> implements FileReader<D
    * Construct a reader for a file. For example,if you want to read a file
    * record,you need to close the resource. You can use try-with-resource as
    * follows:
-   * 
+   *
    * <pre>
    * try (FileReader<User> dataFileReader =
    * DataFileReader.openReader(file,datumReader)) { //Consume the reader } catch
    * (IOException e) { throw new RunTimeIOException(e,"Failed to read metadata for
    * file: %s", file); }
-   * 
+   *
    * <pre/>
    */
   public DataFileReader(File file, DatumReader<D> reader) throws IOException {
@@ -109,13 +109,13 @@ public class DataFileReader<D> extends DataFileStream<D> implements FileReader<D
    * Construct a reader for a file. For example,if you want to read a file
    * record,you need to close the resource. You can use try-with-resource as
    * follows:
-   * 
+   *
    * <pre>
    * try (FileReader<User> dataFileReader =
    * DataFileReader.openReader(file,datumReader)) { //Consume the reader } catch
    * (IOException e) { throw new RunTimeIOException(e,"Failed to read metadata for
    * file: %s", file); }
-   * 
+   *
    * <pre/>
    */
   public DataFileReader(SeekableInput sin, DatumReader<D> reader) throws IOException {
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFileReader.java b/lang/java/avro/src/test/java/org/apache/avro/TestDataFileReader.java
index a4e1043..c222685 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFileReader.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFileReader.java
@@ -28,13 +28,16 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import com.sun.management.UnixOperatingSystemMXBean;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.file.SeekableInput;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.junit.Test;
-import com.sun.management.UnixOperatingSystemMXBean;
 
 @SuppressWarnings("restriction")
 public class TestDataFileReader {
@@ -70,6 +73,65 @@ public class TestDataFileReader {
   }
 
   @Test
+  // regression test for bug AVRO-2944
+  public void testThrottledInputStream() throws IOException {
+    // AVRO-2944 describes hanging/failure in reading Avro file with performing
+    // magic header check. This happens with throttled input stream,
+    // where we read into buffer less bytes than requested.
+
+    Schema legacySchema = new Schema.Parser().setValidate(false).setValidateDefaults(false)
+        .parse("{\"type\": \"record\", \"name\": \"TestSchema\", \"fields\": "
+            + "[ {\"name\": \"id\", \"type\": [\"long\", \"null\"], \"default\": null}]}");
+    File f = Files.createTempFile("testThrottledInputStream", ".avro").toFile();
+    try (DataFileWriter<?> w = new DataFileWriter<>(new GenericDatumWriter<>())) {
+      w.create(legacySchema, f);
+      w.flush();
+    }
+
+    // Without checking for magic header, throttled input has no effect
+    FileReader r = new DataFileReader(throttledInputStream(f), new GenericDatumReader<>());
+    assertEquals("TestSchema", r.getSchema().getName());
+
+    // With checking for magic header, throttled input should pass too.
+    FileReader r2 = DataFileReader.openReader(throttledInputStream(f), new GenericDatumReader<>());
+    assertEquals("TestSchema", r2.getSchema().getName());
+  }
+
+  private SeekableInput throttledInputStream(File f) throws IOException {
+    SeekableFileInput input = new SeekableFileInput(f);
+    return new SeekableInput() {
+      @Override
+      public void close() throws IOException {
+        input.close();
+      }
+
+      @Override
+      public void seek(long p) throws IOException {
+        input.seek(p);
+      }
+
+      @Override
+      public long tell() throws IOException {
+        return input.tell();
+      }
+
+      @Override
+      public long length() throws IOException {
+        return input.length();
+      }
+
+      @Override
+      public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 1) {
+          return input.read(b, off, len);
+        } else {
+          return input.read(b, off, len - 1);
+        }
+      }
+    };
+  }
+
+  @Test
   public void testIgnoreSchemaValidationOnRead() throws IOException {
     // This schema has an accent in the name and the default for the field doesn't
     // match the first type in the union. A Java SDK in the past could create a file