Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/28 09:12:53 UTC

[GitHub] [beam] anantdamle opened a new pull request #13619: [BEAM-11527] Allow user defined Hadoop ReadSupport flags for ParquetReader

anantdamle opened a new pull request #13619:
URL: https://github.com/apache/beam/pull/13619


   `ParquetIO.Sink` already supports user-provided Hadoop Configuration flags. With a growing number of options available on `AvroParquetReader`, making the same configuration flags accessible on the read side makes `ParquetIO` more usable.
   
   For example, this makes it possible to enable reading INT96 data, as in the sketch below.
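   A minimal sketch of the intended usage (the flag name `parquet.avro.readInt96AsFixed` comes from parquet-avro's `AvroReadSupport`; the pipeline, schema, and path are illustrative):

   ```java
   import java.util.Collections;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.beam.sdk.io.parquet.ParquetIO;
   import org.apache.beam.sdk.values.PCollection;

   // Read Parquet files, passing a Hadoop ReadSupport flag through to the
   // underlying AvroParquetReader (here: read INT96 values as Avro fixed).
   PCollection<GenericRecord> records =
       pipeline.apply(
           ParquetIO.read(schema)
               .from("/path/to/input/*.parquet")
               .withConfiguration(
                   Collections.singletonMap("parquet.avro.readInt96AsFixed", "true")));
   ```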
   
   R: @iemejia can you help review?
   ------------------------
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- | Java | Python | Go | Website | Whitespace | Typescript
   --- | --- | --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) <br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | --- | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for the trigger phrases, status, and links of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   





[GitHub] [beam] anantdamle commented on a change in pull request #13619: [BEAM-11527] Allow user defined Hadoop ReadSupport flags for ParquetReader

Posted by GitBox <gi...@apache.org>.
anantdamle commented on a change in pull request #13619:
URL: https://github.com/apache/beam/pull/13619#discussion_r549561833



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -1058,6 +1128,18 @@ public GenericRecord apply(GenericRecord input) {
     private GenericRecordPassthroughFn() {}
   }
 
+  /** Returns a new Hadoop {@link Configuration} instance with provided flags. */
+  private static SerializableConfiguration makeHadoopConfigurationUsingFlags(

Review comment:
       Done, added a single test for the new method.
   
   Initially I was contemplating this alternative but discarded it to avoid touching more files.







[GitHub] [beam] anantdamle commented on a change in pull request #13619: [BEAM-11527] Allow user defined Hadoop ReadSupport flags for ParquetReader

Posted by GitBox <gi...@apache.org>.
anantdamle commented on a change in pull request #13619:
URL: https://github.com/apache/beam/pull/13619#discussion_r549562209



##########
File path: sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
##########
@@ -147,7 +147,7 @@ public void testBlockTracker() throws Exception {
   public void testSplitBlockWithLimit() {
     ParquetIO.ReadFiles.SplitReadFn<GenericRecord> testFn =
         new ParquetIO.ReadFiles.SplitReadFn<>(
-            null, null, ParquetIO.GenericRecordPassthroughFn.create());
+            null, null, ParquetIO.GenericRecordPassthroughFn.create(), null);

Review comment:
       As I'm using `SerializableConfiguration#newConfiguration`, it can be null.
   Do you want me to add a test that exercises a non-null configuration?

##########
File path: sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
##########
@@ -416,6 +416,9 @@ public void testWriteAndReadwithSplitUsingReflectDataSchemaWithDataModel() {
     readPipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testConfigurationReadFile() {}

Review comment:
       Thanks, removed.







[GitHub] [beam] iemejia commented on a change in pull request #13619: [BEAM-11527] Allow user defined Hadoop ReadSupport flags for ParquetReader

Posted by GitBox <gi...@apache.org>.
iemejia commented on a change in pull request #13619:
URL: https://github.com/apache/beam/pull/13619#discussion_r549345130



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -311,6 +313,12 @@ public static ReadFiles readFiles(Schema schema) {
 
       abstract Builder setAvroDataModel(GenericData model);
 
+      abstract Builder setConfiguration(SerializableConfiguration configuration);
+
+      Builder setHadoopConfigurationFlags(Map<String, String> flags) {

Review comment:
       Can you please remove this method and replace its uses with `setConfiguration(makeHadoopConfiguration(...))`?
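       For example (a sketch; `makeHadoopConfigurationUsingFlags` is the helper this PR currently defines):

   ```java
   public Read withConfiguration(Map<String, String> configuration) {
     return toBuilder()
         .setConfiguration(makeHadoopConfigurationUsingFlags(configuration))
         .build();
   }
   ```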

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -388,6 +402,12 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
       abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
 
+      abstract Builder<T> setConfiguration(SerializableConfiguration configuration);
+
+      Builder<T> setHadoopConfigurationFlags(Map<String, String> flags) {

Review comment:
       Please remove all definitions of this method and replace its uses with `setConfiguration(makeHadoopConfiguration(...))` in all classes where it appears.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -564,14 +623,20 @@ public ReadFiles withSplit() {
       // Default initial splitting the file into blocks of 64MB. Unit of SPLIT_LIMIT is byte.
       private static final long SPLIT_LIMIT = 64000000;
 
+      private final SerializableConfiguration hadoopBaseConfig;

Review comment:
       rename to configuration

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -819,9 +884,15 @@ public Progress getProgress() {
 
       private final SerializableFunction<GenericRecord, T> parseFn;
 
-      ReadFn(GenericData model, SerializableFunction<GenericRecord, T> parseFn) {
+      private final SerializableConfiguration hadoopBaseConfig;

Review comment:
       rename to configuration

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -920,13 +996,7 @@ public Sink withCompressionCodec(CompressionCodecName compressionCodecName) {
 
     /** Specifies configuration to be passed into the sink's writer. */
     public Sink withConfiguration(Map<String, String> configuration) {
-      Configuration hadoopConfiguration = new Configuration();
-      for (Map.Entry<String, String> entry : configuration.entrySet()) {
-        hadoopConfiguration.set(entry.getKey(), entry.getValue());
-      }
-      return toBuilder()
-          .setConfiguration(new SerializableConfiguration(hadoopConfiguration))
-          .build();
+      return toBuilder().setConfiguration(makeHadoopConfigurationUsingFlags(configuration)).build();

Review comment:
       :+1: 

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -332,6 +340,10 @@ public Read withProjection(Schema projectionSchema, Schema encoderSchema) {
           .build();
     }
 
+    public Read withConfiguration(Map<String, String> flags) {

Review comment:
       Can you please name the argument of the `withConfiguration` methods consistently everywhere as `configuration` instead of `flags` or `hadoopConfigFlags`?

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -835,13 +906,18 @@ public void processElement(ProcessContext processContext) throws Exception {
 
         SeekableByteChannel seekableByteChannel = file.openSeekable();
 
-        AvroParquetReader.Builder builder =
-            AvroParquetReader.<GenericRecord>builder(new BeamParquetInputFile(seekableByteChannel));
+        AvroParquetReader.Builder<GenericRecord> builder =
+            AvroParquetReader.builder(new BeamParquetInputFile(seekableByteChannel));
         if (modelClass != null) {
           // all GenericData implementations have a static get method
           builder = builder.withDataModel((GenericData) modelClass.getMethod("get").invoke(null));
         }
 
+        if (hadoopBaseConfig != null) {

Review comment:
       We should probably define a default value for `.setConfiguration(...)` inside the builders (read, readFiles, parseGenericRecords, parseFilesGenericRecords); since we define a default value, we won't need this `if`.
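       Something like this in the static factory methods (a sketch; builder and field names assumed from this diff):

   ```java
   public static Read read(Schema schema) {
     return new AutoValue_ParquetIO_Read.Builder()
         .setSchema(schema)
         .setSplittable(false)
         // Default to an empty Hadoop configuration so the DoFns never see null.
         .setConfiguration(new SerializableConfiguration(new Configuration()))
         .build();
   }
   ```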

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -1058,6 +1128,18 @@ public GenericRecord apply(GenericRecord input) {
     private GenericRecordPassthroughFn() {}
   }
 
+  /** Returns a new Hadoop {@link Configuration} instance with provided flags. */

Review comment:
       s/Hadoop {@link Configuration}/{@link SerializableConfiguration}

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -682,7 +747,7 @@ public void processElement(
       }
 
       public Configuration getConfWithModelClass() throws Exception {
-        Configuration conf = new Configuration();
+        Configuration conf = SerializableConfiguration.newConfiguration(hadoopBaseConfig);

Review comment:
       :+1: 

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -1058,6 +1128,18 @@ public GenericRecord apply(GenericRecord input) {
     private GenericRecordPassthroughFn() {}
   }
 
+  /** Returns a new Hadoop {@link Configuration} instance with provided flags. */
+  private static SerializableConfiguration makeHadoopConfigurationUsingFlags(

Review comment:
       Can we move this method into the `SerializableConfiguration` class and make it `public static SerializableConfiguration fromMap(Map<String, String> entries) {`?
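       That is, something along these lines (a sketch; the body mirrors the loop this PR removes from `Sink#withConfiguration`):

   ```java
   /** Returns a {@link SerializableConfiguration} holding the provided entries. */
   public static SerializableConfiguration fromMap(Map<String, String> entries) {
     Configuration configuration = new Configuration();
     for (Map.Entry<String, String> entry : entries.entrySet()) {
       configuration.set(entry.getKey(), entry.getValue());
     }
     return new SerializableConfiguration(configuration);
   }
   ```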

##########
File path: sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
##########
@@ -416,6 +416,9 @@ public void testWriteAndReadwithSplitUsingReflectDataSchemaWithDataModel() {
     readPipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testConfigurationReadFile() {}

Review comment:
       test or remove

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -532,6 +581,12 @@ public ReadFiles withProjection(Schema projectionSchema, Schema encoderSchema) {
           .setSplittable(true)
           .build();
     }
+
+    /** Specify Hadoop configuration for ParquetReader. */
+    public ReadFiles withHadoopConfiguration(Map<String, String> configurationFlags) {

Review comment:
       Rename to `withConfiguration` to be consistent with the other methods + s/configurationFlags/configuration

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -564,14 +623,20 @@ public ReadFiles withSplit() {
       // Default initial splitting the file into blocks of 64MB. Unit of SPLIT_LIMIT is byte.
       private static final long SPLIT_LIMIT = 64000000;
 
+      private final SerializableConfiguration hadoopBaseConfig;
+
       private final SerializableFunction<GenericRecord, T> parseFn;
 
       SplitReadFn(
-          GenericData model, Schema requestSchema, SerializableFunction<GenericRecord, T> parseFn) {
+          GenericData model,
+          Schema requestSchema,
+          SerializableFunction<GenericRecord, T> parseFn,
+          SerializableConfiguration hadoopBaseConfig) {

Review comment:
       rename to configuration

##########
File path: sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
##########
@@ -147,7 +147,7 @@ public void testBlockTracker() throws Exception {
   public void testSplitBlockWithLimit() {
     ParquetIO.ReadFiles.SplitReadFn<GenericRecord> testFn =
         new ParquetIO.ReadFiles.SplitReadFn<>(
-            null, null, ParquetIO.GenericRecordPassthroughFn.create());
+            null, null, ParquetIO.GenericRecordPassthroughFn.create(), null);

Review comment:
       Test with `new Configuration()`; this should not be nullable.
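       That is (a sketch):

   ```java
   ParquetIO.ReadFiles.SplitReadFn<GenericRecord> testFn =
       new ParquetIO.ReadFiles.SplitReadFn<>(
           null,
           null,
           ParquetIO.GenericRecordPassthroughFn.create(),
           // A real (empty) configuration instead of null.
           new SerializableConfiguration(new Configuration()));
   ```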







[GitHub] [beam] iemejia commented on a change in pull request #13619: [BEAM-11527] Allow user defined Hadoop ReadSupport flags for ParquetReader

Posted by GitBox <gi...@apache.org>.
iemejia commented on a change in pull request #13619:
URL: https://github.com/apache/beam/pull/13619#discussion_r550488704



##########
File path: sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
##########
@@ -147,7 +147,7 @@ public void testBlockTracker() throws Exception {
   public void testSplitBlockWithLimit() {
     ParquetIO.ReadFiles.SplitReadFn<GenericRecord> testFn =
         new ParquetIO.ReadFiles.SplitReadFn<>(
-            null, null, ParquetIO.GenericRecordPassthroughFn.create());
+            null, null, ParquetIO.GenericRecordPassthroughFn.create(), null);

Review comment:
       I would prefer it not to be Nullable, but since this is internal I suppose we can adjust this later. On the other hand, if someday Parquet finally gets rid of its Hadoop dependencies, the null value would probably align better.







[GitHub] [beam] iemejia merged pull request #13619: [BEAM-11527] Allow user defined Hadoop ReadSupport flags for ParquetReader

Posted by GitBox <gi...@apache.org>.
iemejia merged pull request #13619:
URL: https://github.com/apache/beam/pull/13619


   





[GitHub] [beam] anantdamle commented on pull request #13619: [BEAM-11527] Allow user defined Hadoop ReadSupport flags for ParquetReader

Posted by GitBox <gi...@apache.org>.
anantdamle commented on pull request #13619:
URL: https://github.com/apache/beam/pull/13619#issuecomment-751941146


   INT96 would be one of the tests, though I'm curious to learn how to test for the Configuration values actually used by a ParquetReader.
   
   More generally, I want to learn how to test for the presence of values using `Mockito#verify`. I understand that, per Effective Java, it's not a recommended pattern, but it is important for some scenarios like this one.
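   For reference, the basic shape I have in mind (an illustrative sketch against a mocked Hadoop `Configuration`, not actual Beam test code):

   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;

   import org.apache.hadoop.conf.Configuration;

   // Mock the collaborator, exercise the code under test, then assert that
   // the expected interaction happened with the expected arguments.
   Configuration conf = mock(Configuration.class);
   conf.set("parquet.avro.readInt96AsFixed", "true"); // stand-in for the code under test
   verify(conf).set("parquet.avro.readInt96AsFixed", "true");
   ```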






[GitHub] [beam] anantdamle commented on a change in pull request #13619: [BEAM-11527] Allow user defined Hadoop ReadSupport flags for ParquetReader

Posted by GitBox <gi...@apache.org>.
anantdamle commented on a change in pull request #13619:
URL: https://github.com/apache/beam/pull/13619#discussion_r549561660



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -835,13 +906,18 @@ public void processElement(ProcessContext processContext) throws Exception {
 
         SeekableByteChannel seekableByteChannel = file.openSeekable();
 
-        AvroParquetReader.Builder builder =
-            AvroParquetReader.<GenericRecord>builder(new BeamParquetInputFile(seekableByteChannel));
+        AvroParquetReader.Builder<GenericRecord> builder =
+            AvroParquetReader.builder(new BeamParquetInputFile(seekableByteChannel));
         if (modelClass != null) {
           // all GenericData implementations have a static get method
           builder = builder.withDataModel((GenericData) modelClass.getMethod("get").invoke(null));
         }
 
+        if (hadoopBaseConfig != null) {

Review comment:
       Looking at [`SerializableConfiguration#L91`](https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java#L91), it seems `null` is the expected value for building a default configuration.
   
   I've also replaced all `configuration != null` checks with `SerializableConfiguration.newConfiguration(configuration)` for consistency and to avoid an `NPE`. What do you think?
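   That is, call sites reduce to (a sketch; `withConf` is `ParquetReader.Builder`'s standard method):

   ```java
   // newConfiguration returns a fresh, empty Configuration when given null,
   // so the explicit null check at the call site disappears.
   Configuration conf = SerializableConfiguration.newConfiguration(configuration);
   builder = builder.withConf(conf);
   ```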
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org