Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/08 18:51:51 UTC

[GitHub] [beam] danielxjd opened a new pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

danielxjd opened a new pull request #12786:
URL: https://github.com/apache/beam/pull/12786


   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- | Java | Python | Go | Website | Whitespace | Typescript
   --- | --- | --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | --- | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] danielxjd commented on a change in pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
danielxjd commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485262811



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -336,36 +388,41 @@ public void processElement(
                 + tracker.currentRestriction().getFrom()
                 + " to "
                 + tracker.currentRestriction().getTo());
-        ParquetReadOptions options = HadoopReadOptions.builder(getConfWithModelClass()).build();
-        ParquetFileReader reader =
-            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
+        Configuration conf = getConfWithModelClass();
         GenericData model = null;
         if (modelClass != null) {
           model = (GenericData) modelClass.getMethod("get").invoke(null);
         }
-        ReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
-
+        AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
+        if (requestSchemaString != null) {
+          AvroReadSupport.setRequestedProjection(
+              conf, new Schema.Parser().parse(requestSchemaString));
+        }
+        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
+        ParquetFileReader reader =
+            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
         Filter filter = checkNotNull(options.getRecordFilter(), "filter");
         Configuration hadoopConf = ((HadoopReadOptions) options).getConf();
+        for (String property : options.getPropertyNames()) {
+          hadoopConf.set(property, options.getProperty(property));
+        }
         FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
         MessageType fileSchema = parquetFileMetadata.getSchema();
         Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
-
         ReadSupport.ReadContext readContext =
             readSupport.init(
                 new InitContext(
                     hadoopConf, Maps.transformValues(fileMetadata, ImmutableSet::of), fileSchema));
         ColumnIOFactory columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
-        MessageType requestedSchema = readContext.getRequestedSchema();
+
         RecordMaterializer<GenericRecord> recordConverter =
             readSupport.prepareForRead(hadoopConf, fileMetadata, fileSchema, readContext);
-        reader.setRequestedSchema(requestedSchema);

Review comment:
       Maybe I should leave this line







[GitHub] [beam] danielxjd commented on a change in pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
danielxjd commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485260498



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -336,36 +388,41 @@ public void processElement(
                 + tracker.currentRestriction().getFrom()
                 + " to "
                 + tracker.currentRestriction().getTo());
-        ParquetReadOptions options = HadoopReadOptions.builder(getConfWithModelClass()).build();
-        ParquetFileReader reader =
-            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
+        Configuration conf = getConfWithModelClass();
         GenericData model = null;
         if (modelClass != null) {
           model = (GenericData) modelClass.getMethod("get").invoke(null);
         }
-        ReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
-
+        AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
+        if (requestSchemaString != null) {
+          AvroReadSupport.setRequestedProjection(
+              conf, new Schema.Parser().parse(requestSchemaString));
+        }
+        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
+        ParquetFileReader reader =
+            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
         Filter filter = checkNotNull(options.getRecordFilter(), "filter");
         Configuration hadoopConf = ((HadoopReadOptions) options).getConf();
+        for (String property : options.getPropertyNames()) {
+          hadoopConf.set(property, options.getProperty(property));
+        }
         FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
         MessageType fileSchema = parquetFileMetadata.getSchema();
         Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
-
         ReadSupport.ReadContext readContext =
             readSupport.init(
                 new InitContext(
                     hadoopConf, Maps.transformValues(fileMetadata, ImmutableSet::of), fileSchema));
         ColumnIOFactory columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
-        MessageType requestedSchema = readContext.getRequestedSchema();
+
         RecordMaterializer<GenericRecord> recordConverter =
             readSupport.prepareForRead(hadoopConf, fileMetadata, fileSchema, readContext);
-        reader.setRequestedSchema(requestedSchema);

Review comment:
       Yes, this is a duplicate; this line should be deleted.







[GitHub] [beam] danielxjd commented on a change in pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
danielxjd commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485264135



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,16 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as following. The
+ * projection_schema contains only the column that we would like to read and encoder_schema contains
+ * all field but with the unwanted columns changed to nullable.
+ *

Review comment:
       Added
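
   For readers of this thread, the user-facing call shape implied by the diffs (the `setProjection`/`setProjectionEncoder` builders) would presumably look like the sketch below. This is a hedged illustration: the `withProjection` method name, the schemas, and the input path are assumptions for this example, not quoted from the PR.

   ```java
   import org.apache.avro.Schema;
   import org.apache.avro.SchemaBuilder;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.parquet.ParquetIO;
   import org.apache.beam.sdk.values.PCollection;

   public class ProjectionReadSketch {
     public static PCollection<GenericRecord> readProjected(Pipeline pipeline) {
       // encoderSchema keeps every field, with the columns we skip made
       // nullable so records missing those values can still be encoded.
       Schema encoderSchema =
           SchemaBuilder.record("Event").fields()
               .requiredString("id")
               .optionalLong("timestamp") // unwanted column, made nullable
               .endRecord();
       // projectionSchema lists only the columns we actually want to read.
       Schema projectionSchema =
           SchemaBuilder.record("Event").fields().requiredString("id").endRecord();

       return pipeline.apply(
           ParquetIO.read(encoderSchema)
               .withProjection(projectionSchema, encoderSchema)
               .from("/path/to/input-*.parquet")); // hypothetical path
     }
   }
   ```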







[GitHub] [beam] danielxjd commented on a change in pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
danielxjd commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485779725



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,16 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as following. The
+ * projection_schema contains only the column that we would like to read and encoder_schema contains
+ * all field but with the unwanted columns changed to nullable.
+ *

Review comment:
       The improvement is not as significant: the processing time saved is only the time spent reading the unwanted columns, and the reader still goes over the entire data set, since the data for each column in a row is stored interleaved. The test therefore showed only a slight reduction in reading time. As for memory, it is hard for me to track it on the Dataflow server.
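
   The mechanism behind this can be sketched roughly as follows. This is a hedged, standalone illustration (not the PR's `SplitReadFn`) of how `AvroReadSupport.setRequestedProjection` records a projection in the Hadoop configuration so the reader materializes only the requested columns; the class and method names here are made up for the example.

   ```java
   import org.apache.avro.Schema;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.parquet.avro.AvroReadSupport;

   public class ProjectionConfSketch {
     public static Configuration withProjection(String projectionSchemaJson) {
       Configuration conf = new Configuration();
       // Storing the requested projection in the configuration is how
       // AvroReadSupport learns to materialize only the listed columns.
       // Row groups are still traversed end to end, which is why the
       // savings are limited to decoding the skipped column chunks.
       AvroReadSupport.setRequestedProjection(
           conf, new Schema.Parser().parse(projectionSchemaJson));
       return conf;
     }
   }
   ```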







[GitHub] [beam] danielxjd commented on a change in pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
danielxjd commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485260041



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -312,12 +362,14 @@ public ReadFiles withSplit() {
     static class SplitReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
       private Class<? extends GenericData> modelClass;
       private static final Logger LOG = LoggerFactory.getLogger(SplitReadFn.class);
+      private String requestSchemaString;

Review comment:
       Because the Schema class is not serializable, it can only be passed as a string.
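
   The round trip this implies can be sketched as follows. This is a minimal standalone example (not the PR's code, and the schema shown is hypothetical): since `org.apache.avro.Schema` does not implement `java.io.Serializable`, a `DoFn` field holds the schema's JSON string and re-parses it on the worker.

   ```java
   import org.apache.avro.Schema;
   import org.apache.avro.SchemaBuilder;

   public class SchemaStringRoundTrip {
     public static void main(String[] args) {
       // A hypothetical record schema for illustration.
       Schema schema =
           SchemaBuilder.record("Event").fields()
               .requiredString("id")
               .optionalLong("timestamp")
               .endRecord();

       // Schema is not Serializable, so a DoFn field must hold its JSON form...
       String schemaString = schema.toString();

       // ...and parse it back when the worker needs the Schema object.
       Schema parsed = new Schema.Parser().parse(schemaString);

       // The JSON round trip is lossless, so the schemas compare equal.
       assert parsed.equals(schema);
     }
   }
   ```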







[GitHub] [beam] danielxjd commented on a change in pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
danielxjd commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485891627



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,16 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as following. The
+ * projection_schema contains only the column that we would like to read and encoder_schema contains
+ * all field but with the unwanted columns changed to nullable.
+ *

Review comment:
       Added







[GitHub] [beam] danielxjd commented on a change in pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
danielxjd commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485264174



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -299,9 +344,14 @@ public ReadFiles withSplit() {
     public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input) {
       checkNotNull(getSchema(), "Schema can not be null");
       if (isSplittable()) {
+        if (getProjection() == null) {

Review comment:
       Changed.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -279,6 +312,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
       abstract Builder setAvroDataModel(GenericData model);
 
+      abstract Builder setProjectionEncoder(Schema schema);
+
+      abstract Builder setProjection(Schema schema);

Review comment:
       Changed

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -269,6 +298,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
     abstract @Nullable GenericData getAvroDataModel();
 
+    abstract @Nullable Schema getProjectionEncoder();
+
+    abstract @Nullable Schema getProjection();

Review comment:
       Changed.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -209,6 +223,10 @@ public static ReadFiles readFiles(Schema schema) {
 
       abstract Builder setSchema(Schema schema);
 
+      abstract Builder setProjectionEncoder(Schema schema);
+
+      abstract Builder setProjection(Schema schema);

Review comment:
       Changed.










[GitHub] [beam] ihji commented on pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
ihji commented on pull request #12786:
URL: https://github.com/apache/beam/pull/12786#issuecomment-689787232


   @chamikaramj Do you have any comments?





[GitHub] [beam] danielxjd commented on a change in pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
danielxjd commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485264309



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -194,6 +204,10 @@ public static ReadFiles readFiles(Schema schema) {
 
     abstract @Nullable Schema getSchema();
 
+    abstract @Nullable Schema getProjection();

Review comment:
       Changed.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,16 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as following. The
+ * projection_schema contains only the column that we would like to read and encoder_schema contains
+ * all field but with the unwanted columns changed to nullable.

Review comment:
       Added.







[GitHub] [beam] ihji merged pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
ihji merged pull request #12786:
URL: https://github.com/apache/beam/pull/12786


   





[GitHub] [beam] ihji commented on a change in pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
ihji commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485884651



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,20 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as following. Splittable
+ * reading is enabled when reading with projection. The projection_schema contains only the column
+ * that we would like to read and encoder_schema contains the schema to encode the output with the
+ * unwanted columns changed to nullable. Partial reading provide increase of reading time due to

Review comment:
       increase or decrease?

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,16 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as following. The
+ * projection_schema contains only the column that we would like to read and encoder_schema contains
+ * all field but with the unwanted columns changed to nullable.
+ *

Review comment:
       Please also add
   ```
   Note that the improvement is not as significant, though: the processing
   time saved is only the time to read the unwanted columns, and the reader will still
   go over the entire data set, since data for each column in a row is stored interleaved.
   ```
   to the comments. It will help users understand how the column projection works.







[GitHub] [beam] danielxjd commented on pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
danielxjd commented on pull request #12786:
URL: https://github.com/apache/beam/pull/12786#issuecomment-689812448


   Run Java PreCommit





[GitHub] [beam] danielxjd commented on pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
danielxjd commented on pull request #12786:
URL: https://github.com/apache/beam/pull/12786#issuecomment-689089962


   Run Java PreCommit





[GitHub] [beam] danielxjd commented on a change in pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
danielxjd commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485260306



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -336,36 +388,41 @@ public void processElement(
                 + tracker.currentRestriction().getFrom()
                 + " to "
                 + tracker.currentRestriction().getTo());
-        ParquetReadOptions options = HadoopReadOptions.builder(getConfWithModelClass()).build();
-        ParquetFileReader reader =
-            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
+        Configuration conf = getConfWithModelClass();
         GenericData model = null;
         if (modelClass != null) {
           model = (GenericData) modelClass.getMethod("get").invoke(null);
         }
-        ReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
-
+        AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
+        if (requestSchemaString != null) {
+          AvroReadSupport.setRequestedProjection(
+              conf, new Schema.Parser().parse(requestSchemaString));
+        }
+        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
+        ParquetFileReader reader =
+            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
         Filter filter = checkNotNull(options.getRecordFilter(), "filter");
         Configuration hadoopConf = ((HadoopReadOptions) options).getConf();
+        for (String property : options.getPropertyNames()) {
+          hadoopConf.set(property, options.getProperty(property));

Review comment:
       You are right, I should delete this part.







[GitHub] [beam] danielxjd commented on pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
danielxjd commented on pull request #12786:
URL: https://github.com/apache/beam/pull/12786#issuecomment-689118928


   R:@ihji R:@chamikaramj 





[GitHub] [beam] ihji commented on a change in pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
ihji commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485383858



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,16 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as following. The
+ * projection_schema contains only the column that we would like to read and encoder_schema contains
+ * all field but with the unwanted columns changed to nullable.
+ *

Review comment:
       The description is still a bit vague. Can you please elaborate more on how much improvement users could expect from the column projection? For example, if the transform with the column projection only reads a half of the columns, does it also use only half of the memory and finish two times faster than the transform without the column projection? (If not, why is that?)







[GitHub] [beam] ihji commented on a change in pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
ihji commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485238785



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,16 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as following. The
+ * projection_schema contains only the column that we would like to read and encoder_schema contains
+ * all field but with the unwanted columns changed to nullable.

Review comment:
       Please also mention that `withSplit()` will be enabled automatically.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -194,6 +204,10 @@ public static ReadFiles readFiles(Schema schema) {
 
     abstract @Nullable Schema getSchema();
 
+    abstract @Nullable Schema getProjection();

Review comment:
       I think `getProjectionSchema` represents the field better.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -312,12 +362,14 @@ public ReadFiles withSplit() {
     static class SplitReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
       private Class<? extends GenericData> modelClass;
       private static final Logger LOG = LoggerFactory.getLogger(SplitReadFn.class);
+      private String requestSchemaString;

Review comment:
       Is there any reason to use `String` instead of `Schema`? Looks like this is referred only once.
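       One plausible reason (an assumption about the author's intent, not confirmed in this thread): in the Avro versions Beam depended on before Avro 1.9, `org.apache.avro.Schema` did not implement `java.io.Serializable`, so a `DoFn` field could not hold a `Schema` directly. The common workaround is to store the schema's JSON form and re-parse it on the worker:

   ```java
   // Sketch of the workaround: keep the JSON string in the serializable DoFn,
   // re-parse it at execution time.
   private final String requestSchemaString; // e.g. set via schema.toString() in the constructor

   // Inside processElement / setup, on the worker:
   // Schema requestSchema = new Schema.Parser().parse(requestSchemaString);
   ```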

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -336,36 +388,41 @@ public void processElement(
                 + tracker.currentRestriction().getFrom()
                 + " to "
                 + tracker.currentRestriction().getTo());
-        ParquetReadOptions options = HadoopReadOptions.builder(getConfWithModelClass()).build();
-        ParquetFileReader reader =
-            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
+        Configuration conf = getConfWithModelClass();
         GenericData model = null;
         if (modelClass != null) {
           model = (GenericData) modelClass.getMethod("get").invoke(null);
         }
-        ReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
-
+        AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
+        if (requestSchemaString != null) {
+          AvroReadSupport.setRequestedProjection(
+              conf, new Schema.Parser().parse(requestSchemaString));
+        }
+        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
+        ParquetFileReader reader =
+            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
         Filter filter = checkNotNull(options.getRecordFilter(), "filter");
         Configuration hadoopConf = ((HadoopReadOptions) options).getConf();
+        for (String property : options.getPropertyNames()) {
+          hadoopConf.set(property, options.getProperty(property));
+        }
         FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
         MessageType fileSchema = parquetFileMetadata.getSchema();
         Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
-
         ReadSupport.ReadContext readContext =
             readSupport.init(
                 new InitContext(
                     hadoopConf, Maps.transformValues(fileMetadata, ImmutableSet::of), fileSchema));
         ColumnIOFactory columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
-        MessageType requestedSchema = readContext.getRequestedSchema();
+
         RecordMaterializer<GenericRecord> recordConverter =
             readSupport.prepareForRead(hadoopConf, fileMetadata, fileSchema, readContext);
-        reader.setRequestedSchema(requestedSchema);

Review comment:
       Is this okay to be skipped?
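       For context, the removed call is the parquet-mr hook that restricts which column chunks the low-level reader actually loads; a sketch of what was dropped (assuming parquet-mr's `ParquetFileReader` API):

   ```java
   // The removed lines told the file reader to fetch only the projected columns.
   // Without an equivalent (e.g. the projection set via AvroReadSupport on the
   // Configuration), the reader may still read every column chunk.
   MessageType requestedSchema = readContext.getRequestedSchema();
   reader.setRequestedSchema(requestedSchema);
   ```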

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -209,6 +223,10 @@ public static ReadFiles readFiles(Schema schema) {
 
       abstract Builder setSchema(Schema schema);
 
+      abstract Builder setProjectionEncoder(Schema schema);
+
+      abstract Builder setProjection(Schema schema);

Review comment:
       ditto.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -279,6 +312,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
       abstract Builder setAvroDataModel(GenericData model);
 
+      abstract Builder setProjectionEncoder(Schema schema);
+
+      abstract Builder setProjection(Schema schema);

Review comment:
       ditto.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -269,6 +298,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
     abstract @Nullable GenericData getAvroDataModel();
 
+    abstract @Nullable Schema getProjectionEncoder();
+
+    abstract @Nullable Schema getProjection();

Review comment:
       ditto.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -299,9 +344,14 @@ public ReadFiles withSplit() {
     public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input) {
       checkNotNull(getSchema(), "Schema can not be null");
       if (isSplittable()) {
+        if (getProjection() == null) {

Review comment:
       To minimize duplication:
   
   ```
   Schema coderSchema = getProjection() == null ? getSchema() : getProjectionEncoder(); 
   return input
                 .apply(ParDo.of(new SplitReadFn(getAvroDataModel(), getProjection())))
                 .setCoder(AvroCoder.of(coderSchema));
   ```

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -336,36 +388,41 @@ public void processElement(
                 + tracker.currentRestriction().getFrom()
                 + " to "
                 + tracker.currentRestriction().getTo());
-        ParquetReadOptions options = HadoopReadOptions.builder(getConfWithModelClass()).build();
-        ParquetFileReader reader =
-            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
+        Configuration conf = getConfWithModelClass();
         GenericData model = null;
         if (modelClass != null) {
           model = (GenericData) modelClass.getMethod("get").invoke(null);
         }
-        ReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
-
+        AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
+        if (requestSchemaString != null) {
+          AvroReadSupport.setRequestedProjection(
+              conf, new Schema.Parser().parse(requestSchemaString));
+        }
+        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
+        ParquetFileReader reader =
+            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
         Filter filter = checkNotNull(options.getRecordFilter(), "filter");
         Configuration hadoopConf = ((HadoopReadOptions) options).getConf();
+        for (String property : options.getPropertyNames()) {
+          hadoopConf.set(property, options.getProperty(property));

Review comment:
       Just out of curiosity: `hadoopConf` is from `options`, so is it necessary to set the properties from `options` again?

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,16 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as following. The
+ * projection_schema contains only the column that we would like to read and encoder_schema contains
+ * all field but with the unwanted columns changed to nullable.
+ *

Review comment:
       It would be also great if we could mention what is the expected improvement by projecting columns such as better memory usage or faster reading time.







[GitHub] [beam] danielxjd commented on a change in pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

Posted by GitBox <gi...@apache.org>.
danielxjd commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485887994



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,20 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as following. Splittable
+ * reading is enabled when reading with projection. The projection_schema contains only the column
+ * that we would like to read and encoder_schema contains the schema to encode the output with the
+ * unwanted columns changed to nullable. Partial reading provide increase of reading time due to

Review comment:
       should be decrease



