Posted to commits@beam.apache.org by ie...@apache.org on 2019/01/29 15:57:50 UTC

[beam] branch master updated: [BEAM-6302] Allow setting Compression Codec in ParquetIO

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e770df  [BEAM-6302] Allow setting Compression Codec in ParquetIO
     new 47ff095  Merge pull request #7662: [BEAM-6302] Allow setting Compression Codec in ParquetIO
2e770df is described below

commit 2e770df88aed78fa1e768774633763c405bcdd26
Author: Łukasz Gajowy <lu...@gmail.com>
AuthorDate: Tue Jan 29 13:05:26 2019 +0100

    [BEAM-6302] Allow setting Compression Codec in ParquetIO
---
 .../org/apache/beam/sdk/io/parquet/ParquetIO.java  | 29 +++++++++++++++++++---
 1 file changed, 25 insertions(+), 4 deletions(-)
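
For readers who want to try the new option, here is a minimal, self-contained sketch of how the setter added by this commit is meant to be used from a pipeline. Only ParquetIO.sink(Schema) and withCompressionCodec(CompressionCodecName) come from this patch; the "Event" schema, the sample record, the GZIP choice, and the /tmp/parquet-out output path are hypothetical placeholders, and the direct runner plus the beam-sdks-java-io-parquet dependency are assumed.

    import java.util.Collections;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.GenericRecordBuilder;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class WriteCompressedParquet {
      public static void main(String[] args) {
        // Hypothetical Avro schema and record, used only to make the sketch runnable.
        Schema schema =
            SchemaBuilder.record("Event").fields().requiredString("id").requiredLong("ts").endRecord();
        GenericRecord record =
            new GenericRecordBuilder(schema).set("id", "a").set("ts", 1L).build();

        Pipeline pipeline = Pipeline.create();
        pipeline
            .apply(Create.of(Collections.singletonList(record)).withCoder(AvroCoder.of(schema)))
            .apply(
                FileIO.<GenericRecord>write()
                    // The new setter: pick any CompressionCodecName; SNAPPY remains the default.
                    .via(ParquetIO.sink(schema).withCompressionCodec(CompressionCodecName.GZIP))
                    .to("/tmp/parquet-out")
                    .withSuffix(".parquet"));
        pipeline.run().waitUntilFinish();
      }
    }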

diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
index 270d6bf..0a4bdb4 100644
--- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
+++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
@@ -46,6 +46,7 @@ import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.io.DelegatingSeekableInputStream;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.OutputFile;
@@ -91,6 +92,10 @@ import org.apache.parquet.io.SeekableInputStream;
  * a Parquet file. It can be used with the general-purpose {@link FileIO} transforms with
  * FileIO.write/writeDynamic specifically.
  *
+ * <p>By default, {@link ParquetIO.Sink} produces output files compressed with the {@link
+ * org.apache.parquet.format.CompressionCodec#SNAPPY} codec. This default can be overridden
+ * using {@link ParquetIO.Sink#withCompressionCodec(CompressionCodecName)}.
+ *
  * <p>For example:
  *
  * <pre>{@code
@@ -98,7 +103,8 @@ import org.apache.parquet.io.SeekableInputStream;
  *   .apply(...) // PCollection<GenericRecord>
  *   .apply(FileIO.<GenericRecord>
  *     .write()
- *     .via(ParquetIO.sink(SCHEMA))
+ *     .via(ParquetIO.sink(SCHEMA)
+ *       .withCompressionCodec(CompressionCodecName.SNAPPY))
  *     .to("destination/path")
  * }</pre>
  *
@@ -134,7 +140,7 @@ public class ParquetIO {
     @Nullable
     abstract Schema getSchema();
 
-    abstract Builder builder();
+    abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
@@ -147,7 +153,7 @@ public class ParquetIO {
 
     /** Reads from the given filename or filepattern. */
     public Read from(ValueProvider<String> filepattern) {
-      return builder().setFilepattern(filepattern).build();
+      return toBuilder().setFilepattern(filepattern).build();
     }
 
     /** Like {@link #from(ValueProvider)}. */
@@ -252,7 +258,10 @@ public class ParquetIO {
 
   /** Creates a {@link Sink} for use with {@link FileIO#write}. */
   public static Sink sink(Schema schema) {
-    return new AutoValue_ParquetIO_Sink.Builder().setJsonSchema(schema.toString()).build();
+    return new AutoValue_ParquetIO_Sink.Builder()
+        .setJsonSchema(schema.toString())
+        .setCompressionCodec(CompressionCodecName.SNAPPY)
+        .build();
   }
 
   /** Implementation of {@link #sink}. */
@@ -262,13 +271,24 @@ public class ParquetIO {
     @Nullable
     abstract String getJsonSchema();
 
+    abstract CompressionCodecName getCompressionCodec();
+
+    abstract Builder toBuilder();
+
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setJsonSchema(String jsonSchema);
 
+      abstract Builder setCompressionCodec(CompressionCodecName compressionCodec);
+
       abstract Sink build();
     }
 
+    /** Specifies the compression codec. Defaults to {@link CompressionCodecName#SNAPPY}. */
+    public Sink withCompressionCodec(CompressionCodecName compressionCodecName) {
+      return toBuilder().setCompressionCodec(compressionCodecName).build();
+    }
+
     @Nullable private transient ParquetWriter<GenericRecord> writer;
 
     @Override
@@ -283,6 +303,7 @@ public class ParquetIO {
       this.writer =
           AvroParquetWriter.<GenericRecord>builder(beamParquetOutputFile)
               .withSchema(schema)
+              .withCompressionCodec(getCompressionCodec())
               .withWriteMode(OVERWRITE)
               .build();
     }
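
One practical note, offered as a sketch rather than anything contained in this patch: Parquet records the codec per column chunk in the file footer, so files written with a non-default codec can be read back without any compression-related configuration. The snippet below reuses the hypothetical "Event" schema and /tmp/parquet-out path from the write sketch above.

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.values.PCollection;

    public class ReadCompressedParquet {
      public static void main(String[] args) {
        // Same hypothetical schema as in the write sketch above.
        Schema schema =
            SchemaBuilder.record("Event").fields().requiredString("id").requiredLong("ts").endRecord();

        Pipeline pipeline = Pipeline.create();
        // No codec is specified on the read side; it is taken from the Parquet file metadata.
        PCollection<GenericRecord> records =
            pipeline.apply(ParquetIO.read(schema).from("/tmp/parquet-out/*.parquet"));
        pipeline.run().waitUntilFinish();
      }
    }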