Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/28 14:00:42 UTC

[GitHub] [beam] iemejia commented on a change in pull request #13619: [BEAM-11527] Allow user defined Hadoop ReadSupport flags for ParquetReader

iemejia commented on a change in pull request #13619:
URL: https://github.com/apache/beam/pull/13619#discussion_r549345130



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -311,6 +313,12 @@ public static ReadFiles readFiles(Schema schema) {
 
       abstract Builder setAvroDataModel(GenericData model);
 
+      abstract Builder setConfiguration(SerializableConfiguration configuration);
+
+      Builder setHadoopConfigurationFlags(Map<String, String> flags) {

Review comment:
       Can you please remove this method and replace its uses with `setConfiguration(makeHadoopConfiguration(...))`?
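       For instance, the `Read#withConfiguration` call site could then look roughly like this (a sketch; the helper is called `makeHadoopConfigurationUsingFlags` in the current revision of this PR):

           public Read withConfiguration(Map<String, String> configuration) {
             return toBuilder()
                 .setConfiguration(makeHadoopConfigurationUsingFlags(configuration))
                 .build();
           }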

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -388,6 +402,12 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
       abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
 
+      abstract Builder<T> setConfiguration(SerializableConfiguration configuration);
+
+      Builder<T> setHadoopConfigurationFlags(Map<String, String> flags) {

Review comment:
       Please remove all definitions of this method and replace their uses with `setConfiguration(makeHadoopConfiguration(...))` in all the classes where it appears.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -564,14 +623,20 @@ public ReadFiles withSplit() {
       // Default initial splitting the file into blocks of 64MB. Unit of SPLIT_LIMIT is byte.
       private static final long SPLIT_LIMIT = 64000000;
 
+      private final SerializableConfiguration hadoopBaseConfig;

Review comment:
       Rename to `configuration`.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -819,9 +884,15 @@ public Progress getProgress() {
 
       private final SerializableFunction<GenericRecord, T> parseFn;
 
-      ReadFn(GenericData model, SerializableFunction<GenericRecord, T> parseFn) {
+      private final SerializableConfiguration hadoopBaseConfig;

Review comment:
       Rename to `configuration`.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -920,13 +996,7 @@ public Sink withCompressionCodec(CompressionCodecName compressionCodecName) {
 
     /** Specifies configuration to be passed into the sink's writer. */
     public Sink withConfiguration(Map<String, String> configuration) {
-      Configuration hadoopConfiguration = new Configuration();
-      for (Map.Entry<String, String> entry : configuration.entrySet()) {
-        hadoopConfiguration.set(entry.getKey(), entry.getValue());
-      }
-      return toBuilder()
-          .setConfiguration(new SerializableConfiguration(hadoopConfiguration))
-          .build();
+      return toBuilder().setConfiguration(makeHadoopConfigurationUsingFlags(configuration)).build();

Review comment:
       :+1: 

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -332,6 +340,10 @@ public Read withProjection(Schema projectionSchema, Schema encoderSchema) {
           .build();
     }
 
+    public Read withConfiguration(Map<String, String> flags) {

Review comment:
       Can you please name the argument of the `withConfiguration` methods consistently everywhere as `configuration` instead of `flags` or `hadoopConfigFlags`?

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -835,13 +906,18 @@ public void processElement(ProcessContext processContext) throws Exception {
 
         SeekableByteChannel seekableByteChannel = file.openSeekable();
 
-        AvroParquetReader.Builder builder =
-            AvroParquetReader.<GenericRecord>builder(new BeamParquetInputFile(seekableByteChannel));
+        AvroParquetReader.Builder<GenericRecord> builder =
+            AvroParquetReader.builder(new BeamParquetInputFile(seekableByteChannel));
         if (modelClass != null) {
           // all GenericData implementations have a static get method
           builder = builder.withDataModel((GenericData) modelClass.getMethod("get").invoke(null));
         }
 
+        if (hadoopBaseConfig != null) {

Review comment:
       We should probably define a default value for `.setConfiguration(...)` inside the builders (read, readFiles, parseGenericRecords, parseFilesGenericRecords); since we define a default value, we won't need this `if`.
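       A sketch of such a default in one of the factory methods, assuming the usual AutoValue-generated builder name and setter names (both are assumptions here):

           public static Read read(Schema schema) {
             return new AutoValue_ParquetIO_Read.Builder()
                 .setSchema(schema)
                 // non-null default, so ReadFn/SplitReadFn never need a null check
                 .setConfiguration(new SerializableConfiguration(new Configuration()))
                 .setSplittable(false)
                 .build();
           }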

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -1058,6 +1128,18 @@ public GenericRecord apply(GenericRecord input) {
     private GenericRecordPassthroughFn() {}
   }
 
+  /** Returns a new Hadoop {@link Configuration} instance with provided flags. */

Review comment:
       s/Hadoop {@link Configuration}/{@link SerializableConfiguration}

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -682,7 +747,7 @@ public void processElement(
       }
 
       public Configuration getConfWithModelClass() throws Exception {
-        Configuration conf = new Configuration();
+        Configuration conf = SerializableConfiguration.newConfiguration(hadoopBaseConfig);

Review comment:
       :+1: 

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -1058,6 +1128,18 @@ public GenericRecord apply(GenericRecord input) {
     private GenericRecordPassthroughFn() {}
   }
 
+  /** Returns a new Hadoop {@link Configuration} instance with provided flags. */
+  private static SerializableConfiguration makeHadoopConfigurationUsingFlags(

Review comment:
       Can we move this method into the `SerializableConfiguration` class and make it `public static SerializableConfiguration fromMap(Map<String, String> entries)`?
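       Mirroring the loop that was removed from `Sink#withConfiguration` above, a sketch of the moved method:

           /** Returns a new {@link SerializableConfiguration} with the provided entries set. */
           public static SerializableConfiguration fromMap(Map<String, String> entries) {
             Configuration configuration = new Configuration();
             for (Map.Entry<String, String> entry : entries.entrySet()) {
               configuration.set(entry.getKey(), entry.getValue());
             }
             return new SerializableConfiguration(configuration);
           }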

##########
File path: sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
##########
@@ -416,6 +416,9 @@ public void testWriteAndReadwithSplitUsingReflectDataSchemaWithDataModel() {
     readPipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testConfigurationReadFile() {}

Review comment:
       Implement this test or remove it (a possible shape is sketched below).
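       If the test is kept, a rough sketch of its shape (reusing this file's `readPipeline`; `SCHEMA`, `records`, and the preceding write step are assumed to match the neighboring write-and-read tests, and `parquet.avro.readInt96AsFixed` is just one example of a ReadSupport flag):

           @Test
           public void testConfigurationReadFile() {
             // write `records` to temporaryFolder first, as in the tests above, then:
             PCollection<GenericRecord> readBack =
                 readPipeline.apply(
                     ParquetIO.read(SCHEMA)
                         .from(temporaryFolder.getRoot().getAbsolutePath() + "/*")
                         .withConfiguration(
                             ImmutableMap.of("parquet.avro.readInt96AsFixed", "true")));
             PAssert.that(readBack).containsInAnyOrder(records);
             readPipeline.run().waitUntilFinish();
           }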

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -532,6 +581,12 @@ public ReadFiles withProjection(Schema projectionSchema, Schema encoderSchema) {
           .setSplittable(true)
           .build();
     }
+
+    /** Specify Hadoop configuration for ParquetReader. */
+    public ReadFiles withHadoopConfiguration(Map<String, String> configurationFlags) {

Review comment:
       Rename to `withConfiguration` to be consistent with the other methods, and s/configurationFlags/configuration/.
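       i.e. the signature would become:

           -    public ReadFiles withHadoopConfiguration(Map<String, String> configurationFlags) {
           +    public ReadFiles withConfiguration(Map<String, String> configuration) {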

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -564,14 +623,20 @@ public ReadFiles withSplit() {
       // Default initial splitting the file into blocks of 64MB. Unit of SPLIT_LIMIT is byte.
       private static final long SPLIT_LIMIT = 64000000;
 
+      private final SerializableConfiguration hadoopBaseConfig;
+
       private final SerializableFunction<GenericRecord, T> parseFn;
 
       SplitReadFn(
-          GenericData model, Schema requestSchema, SerializableFunction<GenericRecord, T> parseFn) {
+          GenericData model,
+          Schema requestSchema,
+          SerializableFunction<GenericRecord, T> parseFn,
+          SerializableConfiguration hadoopBaseConfig) {

Review comment:
       Rename to `configuration`.

##########
File path: sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
##########
@@ -147,7 +147,7 @@ public void testBlockTracker() throws Exception {
   public void testSplitBlockWithLimit() {
     ParquetIO.ReadFiles.SplitReadFn<GenericRecord> testFn =
         new ParquetIO.ReadFiles.SplitReadFn<>(
-            null, null, ParquetIO.GenericRecordPassthroughFn.create());
+            null, null, ParquetIO.GenericRecordPassthroughFn.create(), null);

Review comment:
       Test with `new Configuration()`; this should not be nullable.
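       i.e. (wrapping in `SerializableConfiguration`, since that is the constructor's parameter type per the diff above):

           ParquetIO.ReadFiles.SplitReadFn<GenericRecord> testFn =
               new ParquetIO.ReadFiles.SplitReadFn<>(
                   null,
                   null,
                   ParquetIO.GenericRecordPassthroughFn.create(),
                   new SerializableConfiguration(new Configuration()));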




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org