Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/08 23:59:41 UTC

[GitHub] [beam] ihji commented on a change in pull request #12786: [BEAM-7925]Add Column Projection to ParquetIO

ihji commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485238785



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,16 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as following. The
+ * projection_schema contains only the column that we would like to read and encoder_schema contains
+ * all field but with the unwanted columns changed to nullable.

Review comment:
       Please also mention that `withSplit()` will be enabled automatically.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -194,6 +204,10 @@ public static ReadFiles readFiles(Schema schema) {
 
     abstract @Nullable Schema getSchema();
 
+    abstract @Nullable Schema getProjection();

Review comment:
       I think `getProjectionSchema` represents the field better.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -312,12 +362,14 @@ public ReadFiles withSplit() {
     static class SplitReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
       private Class<? extends GenericData> modelClass;
       private static final Logger LOG = LoggerFactory.getLogger(SplitReadFn.class);
+      private String requestSchemaString;

Review comment:
       Is there any reason to use `String` instead of `Schema`? It looks like this field is referenced only once.
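
One possible answer, offered as an assumption rather than something confirmed in this thread: a `DoFn` is `Serializable` and is shipped to workers in serialized form, so its fields need to be serializable too, and older Avro releases (before 1.9, as far as I know) did not make `Schema` implement `java.io.Serializable`. The sketch below uses a hypothetical `FakeSchema` stand-in, not the real Avro class, to show why keeping only the string form sidesteps that problem.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SchemaFieldSketch {

  /** Hypothetical stand-in for a schema class that does NOT implement Serializable. */
  static final class FakeSchema {
    private final String json;

    FakeSchema(String json) {
      this.json = json;
    }

    static FakeSchema parse(String json) {
      return new FakeSchema(json);
    }

    @Override
    public String toString() {
      return json;
    }
  }

  /** Holds the schema object directly, so Java serialization of this class fails. */
  static final class FnWithSchemaField implements Serializable {
    private static final long serialVersionUID = 1L;
    private final FakeSchema requestSchema;

    FnWithSchemaField(FakeSchema requestSchema) {
      this.requestSchema = requestSchema;
    }
  }

  /** Holds only the schema's string form and re-parses it on demand. */
  static final class FnWithStringField implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String requestSchemaString;

    FnWithStringField(FakeSchema requestSchema) {
      this.requestSchemaString = requestSchema == null ? null : requestSchema.toString();
    }

    FakeSchema requestSchema() {
      return requestSchemaString == null ? null : FakeSchema.parse(requestSchemaString);
    }
  }

  /** Returns true if Java serialization of the given object succeeds. */
  static boolean canSerialize(Object value) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(value);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    FakeSchema schema = FakeSchema.parse("{\"type\":\"string\"}");
    System.out.println(canSerialize(new FnWithSchemaField(schema)));
    System.out.println(canSerialize(new FnWithStringField(schema)));
  }
}
```

If the Avro version in use already provides a serializable `Schema`, the field could hold the `Schema` directly and the re-parse in `processElement` would go away.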

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -336,36 +388,41 @@ public void processElement(
                 + tracker.currentRestriction().getFrom()
                 + " to "
                 + tracker.currentRestriction().getTo());
-        ParquetReadOptions options = HadoopReadOptions.builder(getConfWithModelClass()).build();
-        ParquetFileReader reader =
-            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
+        Configuration conf = getConfWithModelClass();
         GenericData model = null;
         if (modelClass != null) {
           model = (GenericData) modelClass.getMethod("get").invoke(null);
         }
-        ReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
-
+        AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
+        if (requestSchemaString != null) {
+          AvroReadSupport.setRequestedProjection(
+              conf, new Schema.Parser().parse(requestSchemaString));
+        }
+        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
+        ParquetFileReader reader =
+            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
         Filter filter = checkNotNull(options.getRecordFilter(), "filter");
         Configuration hadoopConf = ((HadoopReadOptions) options).getConf();
+        for (String property : options.getPropertyNames()) {
+          hadoopConf.set(property, options.getProperty(property));
+        }
         FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
         MessageType fileSchema = parquetFileMetadata.getSchema();
         Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
-
         ReadSupport.ReadContext readContext =
             readSupport.init(
                 new InitContext(
                     hadoopConf, Maps.transformValues(fileMetadata, ImmutableSet::of), fileSchema));
         ColumnIOFactory columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
-        MessageType requestedSchema = readContext.getRequestedSchema();
+
         RecordMaterializer<GenericRecord> recordConverter =
             readSupport.prepareForRead(hadoopConf, fileMetadata, fileSchema, readContext);
-        reader.setRequestedSchema(requestedSchema);

Review comment:
       Is it okay to drop this `reader.setRequestedSchema(requestedSchema)` call?

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -209,6 +223,10 @@ public static ReadFiles readFiles(Schema schema) {
 
       abstract Builder setSchema(Schema schema);
 
+      abstract Builder setProjectionEncoder(Schema schema);
+
+      abstract Builder setProjection(Schema schema);

Review comment:
       Ditto (see the `getProjectionSchema` naming comment above).

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -279,6 +312,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
       abstract Builder setAvroDataModel(GenericData model);
 
+      abstract Builder setProjectionEncoder(Schema schema);
+
+      abstract Builder setProjection(Schema schema);

Review comment:
       Ditto (see the `getProjectionSchema` naming comment above).

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -269,6 +298,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
     abstract @Nullable GenericData getAvroDataModel();
 
+    abstract @Nullable Schema getProjectionEncoder();
+
+    abstract @Nullable Schema getProjection();

Review comment:
       Ditto (see the `getProjectionSchema` naming comment above).

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -299,9 +344,14 @@ public ReadFiles withSplit() {
     public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input) {
       checkNotNull(getSchema(), "Schema can not be null");
       if (isSplittable()) {
+        if (getProjection() == null) {

Review comment:
       To minimize duplication:
   
   ```
   Schema coderSchema = getProjection() == null ? getSchema() : getProjectionEncoder();
   return input
       .apply(ParDo.of(new SplitReadFn(getAvroDataModel(), getProjection())))
       .setCoder(AvroCoder.of(coderSchema));
   ```

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -336,36 +388,41 @@ public void processElement(
                 + tracker.currentRestriction().getFrom()
                 + " to "
                 + tracker.currentRestriction().getTo());
-        ParquetReadOptions options = HadoopReadOptions.builder(getConfWithModelClass()).build();
-        ParquetFileReader reader =
-            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
+        Configuration conf = getConfWithModelClass();
         GenericData model = null;
         if (modelClass != null) {
           model = (GenericData) modelClass.getMethod("get").invoke(null);
         }
-        ReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
-
+        AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
+        if (requestSchemaString != null) {
+          AvroReadSupport.setRequestedProjection(
+              conf, new Schema.Parser().parse(requestSchemaString));
+        }
+        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
+        ParquetFileReader reader =
+            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
         Filter filter = checkNotNull(options.getRecordFilter(), "filter");
         Configuration hadoopConf = ((HadoopReadOptions) options).getConf();
+        for (String property : options.getPropertyNames()) {
+          hadoopConf.set(property, options.getProperty(property));

Review comment:
       Just out of curiosity: `hadoopConf` comes from `options`, so is it necessary to set the properties from `options` on it again?

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,16 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as following. The
+ * projection_schema contains only the column that we would like to read and encoder_schema contains
+ * all field but with the unwanted columns changed to nullable.
+ *

Review comment:
       It would also be great to mention the expected improvement from projecting columns, such as lower memory usage or faster reads.



