You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2021/02/02 17:20:23 UTC
[crunch] branch master updated: CRUNCH-698: Inclusion of local patch for AVRO-2944
This is an automated email from the ASF dual-hosted git repository.
jwills pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/crunch.git
The following commit(s) were added to refs/heads/master by this push:
new ee751f6 CRUNCH-698: Inclusion of local patch for AVRO-2944
new f75cce9 Merge pull request #34 from noslowerdna/CRUNCH-698
ee751f6 is described below
commit ee751f6ca77ea335bab0327ddb8e77f4724ed0b9
Author: Andrew Olson <ao...@cerner.com>
AuthorDate: Tue Feb 2 10:38:53 2021 -0600
CRUNCH-698: Inclusion of local patch for AVRO-2944
---
.../apache/crunch/types/avro/AvroRecordReader.java | 37 +++++++++++++++++++++-
1 file changed, 36 insertions(+), 1 deletion(-)
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
index ab2f30e..45e2130 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
@@ -17,10 +17,15 @@
*/
package org.apache.crunch.types.avro;
+import static org.apache.avro.file.DataFileConstants.MAGIC;
+
+import java.io.EOFException;
import java.io.IOException;
+import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileReader12;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.io.DatumReader;
@@ -55,7 +60,7 @@ class AvroRecordReader<T> extends RecordReader<AvroWrapper<T>, NullWritable> {
DatumReader<T> datumReader = AvroMode
.fromConfiguration(context.getConfiguration())
.getReader(schema);
- this.reader = DataFileReader.openReader(in, datumReader);
+ this.reader = openAvroDataFileReader(in, datumReader);
reader.sync(split.getStart()); // sync to start
this.start = reader.tell();
this.end = split.getStart() + split.getLength();
@@ -108,4 +113,34 @@ class AvroRecordReader<T> extends RecordReader<AvroWrapper<T>, NullWritable> {
reader = null;
}
}
+
+ /**
+ * Local patch for AVRO-2944: used instead of {@code DataFileReader.openReader}; reads the full magic header from offset 0 (looping over partial reads), rewinds {@code in} to offset 0, and returns a {@link DataFileReader} for the current format or a {@link DataFileReader12} for the 1.2 format.
+ * @throws IOException if {@code in} is shorter than the magic header or matches neither known header; an {@link EOFException} if the input ends before the header is fully read */
+ private static <D> FileReader<D> openAvroDataFileReader(SeekableInput in, DatumReader<D> reader) throws IOException {
+ if (in.length() < MAGIC.length) // too short to contain a magic header
+ throw new IOException("Not an Avro data file");
+
+ // read magic header in full; read() may fill only part of the buffer per call, hence the loop
+ byte[] magic = new byte[MAGIC.length];
+ in.seek(0); // header starts at offset 0
+ int offset = 0;
+ int length = magic.length;
+ while (length > 0) {
+ int bytesRead = in.read(magic, offset, length);
+ if (bytesRead < 0) // end of input before the header was complete
+ throw new EOFException("Unexpected EOF with " + length + " bytes remaining to read");
+
+ length -= bytesRead;
+ offset += bytesRead;
+ }
+ in.seek(0); // rewind so the reader constructed below starts at the beginning of the input
+
+ if (Arrays.equals(MAGIC, magic)) // current format
+ return new DataFileReader<>(in, reader);
+ if (Arrays.equals(new byte[] { (byte) 'O', (byte) 'b', (byte) 'j', (byte) 0 }, magic)) // 1.2 format
+ return new DataFileReader12<>(in, reader);
+
+ throw new IOException("Not an Avro data file"); // matches neither known header
+ }
}