You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/01/29 15:57:50 UTC
[beam] branch master updated: [BEAM-6302] Allow setting Compression
Codec in ParquetIO
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2e770df [BEAM-6302] Allow setting Compression Codec in ParquetIO
new 47ff095 Merge pull request #7662: [BEAM-6302] Allow setting Compression Codec in ParquetIO
2e770df is described below
commit 2e770df88aed78fa1e768774633763c405bcdd26
Author: Łukasz Gajowy <lu...@gmail.com>
AuthorDate: Tue Jan 29 13:05:26 2019 +0100
[BEAM-6302] Allow setting Compression Codec in ParquetIO
---
.../org/apache/beam/sdk/io/parquet/ParquetIO.java | 29 +++++++++++++++++++---
1 file changed, 25 insertions(+), 4 deletions(-)
diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
index 270d6bf..0a4bdb4 100644
--- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
+++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
@@ -46,6 +46,7 @@ import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.OutputFile;
@@ -91,6 +92,10 @@ import org.apache.parquet.io.SeekableInputStream;
* a Parquet file. It can be used with the general-purpose {@link FileIO} transforms with
* FileIO.write/writeDynamic specifically.
*
+ * <p>By default, {@link ParquetIO.Sink} produces output files that are compressed using the {@link
+ * CompressionCodecName#SNAPPY} codec. This default can be overridden using {@link
+ * ParquetIO.Sink#withCompressionCodec(CompressionCodecName)}.
+ *
* <p>For example:
*
* <pre>{@code
@@ -98,7 +103,8 @@ import org.apache.parquet.io.SeekableInputStream;
* .apply(...) // PCollection<GenericRecord>
* .apply(FileIO.<GenericRecord>
* .write()
- * .via(ParquetIO.sink(SCHEMA))
+ * .via(ParquetIO.sink(SCHEMA)
+ * .withCompressionCodec(CompressionCodecName.SNAPPY))
* .to("destination/path")
* }</pre>
*
@@ -134,7 +140,7 @@ public class ParquetIO {
@Nullable
abstract Schema getSchema();
- abstract Builder builder();
+ abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
@@ -147,7 +153,7 @@ public class ParquetIO {
/** Reads from the given filename or filepattern. */
public Read from(ValueProvider<String> filepattern) {
- return builder().setFilepattern(filepattern).build();
+ return toBuilder().setFilepattern(filepattern).build();
}
/** Like {@link #from(ValueProvider)}. */
@@ -252,7 +258,10 @@ public class ParquetIO {
/** Creates a {@link Sink} that, for use with {@link FileIO#write}. */
public static Sink sink(Schema schema) {
- return new AutoValue_ParquetIO_Sink.Builder().setJsonSchema(schema.toString()).build();
+ return new AutoValue_ParquetIO_Sink.Builder()
+ .setJsonSchema(schema.toString())
+ .setCompressionCodec(CompressionCodecName.SNAPPY)
+ .build();
}
/** Implementation of {@link #sink}. */
@@ -262,13 +271,24 @@ public class ParquetIO {
@Nullable
abstract String getJsonSchema();
+ abstract CompressionCodecName getCompressionCodec();
+
+ abstract Builder toBuilder();
+
@AutoValue.Builder
abstract static class Builder {
abstract Builder setJsonSchema(String jsonSchema);
+ abstract Builder setCompressionCodec(CompressionCodecName compressionCodec);
+
abstract Sink build();
}
+ /** Specifies the compression codec. Defaults to {@link CompressionCodecName#SNAPPY}. */
+ public Sink withCompressionCodec(CompressionCodecName compressionCodecName) {
+ return toBuilder().setCompressionCodec(compressionCodecName).build();
+ }
+
@Nullable private transient ParquetWriter<GenericRecord> writer;
@Override
@@ -283,6 +303,7 @@ public class ParquetIO {
this.writer =
AvroParquetWriter.<GenericRecord>builder(beamParquetOutputFile)
.withSchema(schema)
+ .withCompressionCodec(getCompressionCodec())
.withWriteMode(OVERWRITE)
.build();
}