Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/22 16:56:28 UTC

[GitHub] [beam] iemejia commented on a diff in pull request #23214: use avro DataFileReader to read avro container files

iemejia commented on code in PR #23214:
URL: https://github.com/apache/beam/pull/23214#discussion_r977872542


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java:
##########
@@ -561,77 +553,24 @@ private Object readResolve() throws ObjectStreamException {
    */
   @Experimental(Kind.SOURCE_SINK)
   static class AvroBlock<T> extends Block<T> {
-    private final Mode<T> mode;
-
-    // The number of records in the block.
-    private final long numRecords;
 
     // The current record in the block. Initialized in readNextRecord.
     private @Nullable T currentRecord;
 
     // The index of the current record in the block.
     private long currentRecordIndex = 0;
 
-    // A DatumReader to read records from the block.
-    private final DatumReader<?> reader;
+    private final Iterator<?> iterator;
 
-    // A BinaryDecoder used by the reader to decode records.
-    private final BinaryDecoder decoder;
+    private final SerializableFunction<GenericRecord, T> parseFn;
 
-    /**
-     * Decodes a byte array as an InputStream. The byte array may be compressed using some codec.
-     * Reads from the returned stream will result in decompressed bytes.
-     *
-     * <p>This supports the same codecs as Avro's {@link CodecFactory}, namely those defined in
-     * {@link DataFileConstants}.
-     *
-     * <ul>
-     *   <li>"snappy" : Google's Snappy compression
-     *   <li>"deflate" : deflate compression
-     *   <li>"bzip2" : Bzip2 compression
-     *   <li>"xz" : xz compression
-     *   <li>"null" (the string, not the value): Uncompressed data
-     * </ul>
-     */
-    private static InputStream decodeAsInputStream(byte[] data, String codec) throws IOException {
-      ByteArrayInputStream byteStream = new ByteArrayInputStream(data);
-      switch (codec) {
-        case DataFileConstants.SNAPPY_CODEC:
-          return new SnappyCompressorInputStream(byteStream, 1 << 16 /* Avro uses 64KB blocks */);
-        case DataFileConstants.DEFLATE_CODEC:
-          // nowrap == true: Do not expect ZLIB header or checksum, as Avro does not write them.
-          Inflater inflater = new Inflater(true);
-          return new InflaterInputStream(byteStream, inflater);
-        case DataFileConstants.XZ_CODEC:
-          return new XZCompressorInputStream(byteStream);
-        case DataFileConstants.BZIP2_CODEC:
-          return new BZip2CompressorInputStream(byteStream);
-        case DataFileConstants.NULL_CODEC:
-          return byteStream;
-        default:
-          throw new IllegalArgumentException("Unsupported codec: " + codec);
-      }
-    }
+    private final long numRecordsInBlock;

Review Comment:
   Much better name 👍 



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java:
##########
@@ -717,71 +678,25 @@ public synchronized AvroSource<T> getCurrentSource() {
     //
     // Postcondition: same as above, but for the new current (formerly next) block.
     @Override
-    public boolean readNextBlock() throws IOException {
-      long startOfNextBlock;
-      synchronized (progressLock) {

Review Comment:
   👍 



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java:
##########
@@ -669,13 +608,49 @@ public double getFractionOfBlockConsumed() {
    */
   @Experimental(Kind.SOURCE_SINK)
   public static class AvroReader<T> extends BlockBasedReader<T> {
-    // Initialized in startReading.
-    private @Nullable AvroMetadata metadata;
+
+    private static class SeekableChannelInput implements SeekableInput {

Review Comment:
   A pity this one does not exist in Avro
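
   For context, here is a minimal sketch of what such an adapter could look like. It is not the code from this PR. To stay compilable without Avro on the classpath, it uses a hypothetical local interface, `SeekableInputLike`, which I believe mirrors the methods of Avro's `SeekableInput` (`seek`, `tell`, `length`, `read`, plus `close`). The class name `ChannelSeekableInput` is also made up for illustration.

```java
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;

/**
 * Hypothetical stand-in that mirrors the methods of Avro's SeekableInput,
 * declared locally so this sketch compiles without the Avro dependency.
 */
interface SeekableInputLike extends Closeable {
  void seek(long p) throws IOException;

  long tell() throws IOException;

  long length() throws IOException;

  int read(byte[] b, int off, int len) throws IOException;
}

/** Illustrative adapter (assumed name) that delegates to a SeekableByteChannel. */
final class ChannelSeekableInput implements SeekableInputLike {
  private final SeekableByteChannel channel;

  ChannelSeekableInput(SeekableByteChannel channel) {
    this.channel = channel;
  }

  @Override
  public void seek(long p) throws IOException {
    // Move the channel's read position to the absolute offset p.
    channel.position(p);
  }

  @Override
  public long tell() throws IOException {
    // Report the channel's current absolute position.
    return channel.position();
  }

  @Override
  public long length() throws IOException {
    // Total size in bytes of the underlying entity.
    return channel.size();
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    // Wrap the target region so the channel writes directly into b[off..off+len).
    // Returns the number of bytes read, or -1 at end of stream.
    return channel.read(ByteBuffer.wrap(b, off, len));
  }

  @Override
  public void close() throws IOException {
    channel.close();
  }
}
```

   With Avro available, the same delegation would presumably implement `org.apache.avro.file.SeekableInput` directly and be handed to a `DataFileReader`; whether the PR's `SeekableChannelInput` is written this way is not shown in this hunk.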


