You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@avro.apache.org by rs...@apache.org on 2020/11/02 08:43:47 UTC
[avro] branch master updated: AVRO-2944: Fix read pointer in java reader to avoid hang on partial read input stream (#969)
This is an automated email from the ASF dual-hosted git repository.
rskraba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new 942ec8e AVRO-2944: Fix read pointer in java reader to avoid hang on partial read input stream (#969)
942ec8e is described below
commit 942ec8e0cc011ec9fcfff7ba9f5c656bb54d51d1
Author: Mick Jermsurawong <Mi...@users.noreply.github.com>
AuthorDate: Mon Nov 2 00:41:00 2020 -0800
AVRO-2944: Fix read pointer in java reader to avoid hang on partial read input stream (#969)
* Add regression test to show failure before fixing
* Format test
* Fix: increment counter with number of bytes read
---
.../java/org/apache/avro/file/DataFileReader.java | 10 ++--
.../java/org/apache/avro/TestDataFileReader.java | 64 +++++++++++++++++++++-
2 files changed, 68 insertions(+), 6 deletions(-)
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
index b06105f..f027852 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
@@ -58,7 +58,7 @@ public class DataFileReader<D> extends DataFileStream<D> implements FileReader<D
// read magic header
byte[] magic = new byte[MAGIC.length];
in.seek(0);
- for (int c = 0; c < magic.length; c = in.read(magic, c, magic.length - c)) {
+ for (int c = 0; c < magic.length; c += in.read(magic, c, magic.length - c)) {
}
in.seek(0);
@@ -92,13 +92,13 @@ public class DataFileReader<D> extends DataFileStream<D> implements FileReader<D
* Construct a reader for a file. For example,if you want to read a file
* record,you need to close the resource. You can use try-with-resource as
* follows:
- *
+ *
* <pre>
* try (FileReader<User> dataFileReader =
* DataFileReader.openReader(file,datumReader)) { //Consume the reader } catch
* (IOException e) { throw new RunTimeIOException(e,"Failed to read metadata for
* file: %s", file); }
- *
+ *
* <pre/>
*/
public DataFileReader(File file, DatumReader<D> reader) throws IOException {
@@ -109,13 +109,13 @@ public class DataFileReader<D> extends DataFileStream<D> implements FileReader<D
* Construct a reader for a file. For example,if you want to read a file
* record,you need to close the resource. You can use try-with-resource as
* follows:
- *
+ *
* <pre>
* try (FileReader<User> dataFileReader =
* DataFileReader.openReader(file,datumReader)) { //Consume the reader } catch
* (IOException e) { throw new RunTimeIOException(e,"Failed to read metadata for
* file: %s", file); }
- *
+ *
* <pre/>
*/
public DataFileReader(SeekableInput sin, DatumReader<D> reader) throws IOException {
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFileReader.java b/lang/java/avro/src/test/java/org/apache/avro/TestDataFileReader.java
index a4e1043..c222685 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFileReader.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFileReader.java
@@ -28,13 +28,16 @@ import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.nio.file.Files;
import java.nio.file.Path;
+import com.sun.management.UnixOperatingSystemMXBean;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.junit.Test;
-import com.sun.management.UnixOperatingSystemMXBean;
@SuppressWarnings("restriction")
public class TestDataFileReader {
@@ -70,6 +73,65 @@ public class TestDataFileReader {
}
@Test
+ // regression test for bug AVRO-2944
+ public void testThrottledInputStream() throws IOException {
+ // AVRO-2944 describes hanging/failure in reading Avro file with performing
+ // magic header check. This happens with throttled input stream,
+ // where we read into buffer less bytes than requested.
+
+ Schema legacySchema = new Schema.Parser().setValidate(false).setValidateDefaults(false)
+ .parse("{\"type\": \"record\", \"name\": \"TestSchema\", \"fields\": "
+ + "[ {\"name\": \"id\", \"type\": [\"long\", \"null\"], \"default\": null}]}");
+ File f = Files.createTempFile("testThrottledInputStream", ".avro").toFile();
+ try (DataFileWriter<?> w = new DataFileWriter<>(new GenericDatumWriter<>())) {
+ w.create(legacySchema, f);
+ w.flush();
+ }
+
+ // Without checking for magic header, throttled input has no effect
+ FileReader r = new DataFileReader(throttledInputStream(f), new GenericDatumReader<>());
+ assertEquals("TestSchema", r.getSchema().getName());
+
+ // With checking for magic header, throttled input should pass too.
+ FileReader r2 = DataFileReader.openReader(throttledInputStream(f), new GenericDatumReader<>());
+ assertEquals("TestSchema", r2.getSchema().getName());
+ }
+
+ private SeekableInput throttledInputStream(File f) throws IOException {
+ SeekableFileInput input = new SeekableFileInput(f);
+ return new SeekableInput() {
+ @Override
+ public void close() throws IOException {
+ input.close();
+ }
+
+ @Override
+ public void seek(long p) throws IOException {
+ input.seek(p);
+ }
+
+ @Override
+ public long tell() throws IOException {
+ return input.tell();
+ }
+
+ @Override
+ public long length() throws IOException {
+ return input.length();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (len == 1) {
+ return input.read(b, off, len);
+ } else {
+ return input.read(b, off, len - 1);
+ }
+ }
+ };
+ }
+
+ @Test
public void testIgnoreSchemaValidationOnRead() throws IOException {
// This schema has an accent in the name and the default for the field doesn't
// match the first type in the union. A Java SDK in the past could create a file