Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/28 11:46:23 UTC

[GitHub] [beam] iemejia opened a new pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

iemejia opened a new pull request #14117:
URL: https://github.com/apache/beam/pull/14117


   Third part of the Parquet SQL improvements saga (finally; sorry for the delay).
   
   R: @TheNeuralBit 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] iemejia commented on a change in pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

Posted by GitBox <gi...@apache.org>.
iemejia commented on a change in pull request #14117:
URL: https://github.com/apache/beam/pull/14117#discussion_r584283424



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
+import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+@Internal
+@SuppressWarnings({"nullness"})
+class ParquetTable extends SchemaBaseBeamTable implements Serializable {
+  private final Table table;
+
+  ParquetTable(Table table) {
+    super(table.getSchema());
+    this.table = table;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(
+      PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    if (!fieldNames.isEmpty()) {
+      Schema projectionSchema = projectSchema(schema, fieldNames);
+      read = read.withProjection(projectionSchema, projectionSchema);

Review comment:
       Requiring two schemas is a bit odd, but I will fix this in a future PR on the ParquetIO side.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #14117:
URL: https://github.com/apache/beam/pull/14117#discussion_r585142707



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
+import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+@Internal
+@SuppressWarnings({"nullness"})
+class ParquetTable extends SchemaBaseBeamTable implements Serializable {
+  private final Table table;
+
+  ParquetTable(Table table) {
+    super(table.getSchema());
+    this.table = table;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(
+      PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    if (!fieldNames.isEmpty()) {
+      Schema projectionSchema = projectSchema(schema, fieldNames);
+      read = read.withProjection(projectionSchema, projectionSchema);

Review comment:
       Yeah, this is strange. Unfortunately it looks like it's been part of the public API for a few releases. It shouldn't be too difficult to deprecate it and keep a backwards-compatible version, though.
   
   I feel like this API should just accept a collection of Strings representing column names; we should already have the type information, right?
   
   Based on these docs I'm not sure what you have will work; it seems like the two schemas should be different: https://github.com/apache/beam/blob/07e1e02125082d9ec804428f139eb849d79a8ec8/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L150-L152
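   
   To illustrate the "collection of column names" idea, here is a minimal sketch, not the ParquetIO API. It assumes a hypothetical stand-in where a schema is just an ordered map from field name to type name; `ProjectionSketch` and `project` are made-up names. The point is only that the names alone are enough to derive the projected schema, and that the order of the requested names is preserved.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in: a "schema" is an ordered map of field name to type name. */
public final class ProjectionSketch {
  private ProjectionSketch() {}

  /**
   * Returns a new schema holding only the requested fields, in the order they were
   * requested. The type of each field is looked up in the full schema.
   */
  public static Map<String, String> project(
      Map<String, String> schema, List<String> fieldNames) {
    Map<String, String> projected = new LinkedHashMap<>();
    for (String name : fieldNames) {
      String type = schema.get(name);
      if (type == null) {
        // Fail early on a column that the table does not declare.
        throw new IllegalArgumentException("Unknown field: " + name);
      }
      projected.put(name, type);
    }
    return projected;
  }
}
```

   With a real Avro schema the lookup would go through the record's fields instead of a map, but the shape of the call would be the same: full schema in, list of names in, projected schema out.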










[GitHub] [beam] iemejia commented on pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

Posted by GitBox <gi...@apache.org>.
iemejia commented on pull request #14117:
URL: https://github.com/apache/beam/pull/14117#issuecomment-791271960


   Thanks @TheNeuralBit, expect filter predicates 'soon'.





[GitHub] [beam] iemejia closed pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

Posted by GitBox <gi...@apache.org>.
iemejia closed pull request #14117:
URL: https://github.com/apache/beam/pull/14117


   





[GitHub] [beam] iemejia commented on a change in pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

Posted by GitBox <gi...@apache.org>.
iemejia commented on a change in pull request #14117:
URL: https://github.com/apache/beam/pull/14117#discussion_r585448320



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
+import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+@Internal
+@SuppressWarnings({"nullness"})
+class ParquetTable extends SchemaBaseBeamTable implements Serializable {
+  private final Table table;
+
+  ParquetTable(Table table) {
+    super(table.getSchema());
+    this.table = table;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(
+      PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    if (!fieldNames.isEmpty()) {
+      Schema projectionSchema = projectSchema(schema, fieldNames);
+      read = read.withProjection(projectionSchema, projectionSchema);

Review comment:
       Done. I won't do the TODO since this is probably my next PR: [BEAM-11908](https://issues.apache.org/jira/browse/BEAM-11908) Deprecate .withProjection from ParquetIO.
   However, I have a doubt. I agree with you that the list of Strings (order matters) is the nicest possible API, but historically on Beam we try to avoid redoing the APIs of the data sources, so I am wondering if we should rather just let users do that in the HadoopConfiguration object as they do in upstream Parquet. More details in the ticket.
   Tell me what you think and I will take care of (1) future removal or (2) an extra API withProjection(conf, fields).

##########
File path: sdks/java/extensions/sql/build.gradle
##########
@@ -79,6 +79,7 @@ dependencies {
   provided project(":sdks:java:io:kafka")
   provided project(":sdks:java:io:google-cloud-platform")
   compile project(":sdks:java:io:mongodb")
+  compile library.java.avro

Review comment:
       I was obliged to do this because the strict dependency analysis that is now enabled forces every module to declare the dependencies it uses explicitly. Avro in particular should not matter much: remember that we leak Avro from `sdks/java/core`, so this should not cause any issue.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
+import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+@Internal
+@SuppressWarnings({"nullness"})
+class ParquetTable extends SchemaBaseBeamTable implements Serializable {
+  private final Table table;
+
+  ParquetTable(Table table) {
+    super(table.getSchema());
+    this.table = table;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(
+      PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    if (!fieldNames.isEmpty()) {
+      Schema projectionSchema = projectSchema(schema, fieldNames);
+      read = read.withProjection(projectionSchema, projectionSchema);

Review comment:
       Done. I won't do the TODO since this is probably my next PR: [BEAM-11908 Deprecate .withProjection from ParquetIO](https://issues.apache.org/jira/browse/BEAM-11908).
   I agree with you that the list of Strings (order matters) is the nicest possible API, but historically on Beam we try to avoid redoing the APIs of the data sources for things that users can do externally to the IO, so I am wondering if we should rather just let users do that in the HadoopConfiguration object as they do in upstream Parquet/Spark RDD. More details in the ticket.
   
   Tell me what you think and I will take care of (1) future removal + extra javadoc or (2) an extra API withProjection(conf, fields).

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
##########
@@ -39,19 +36,18 @@
  *   favorite_numbers ARRAY<INTEGER>
  * )
  * TYPE 'parquet'
- * LOCATION '/home/admin/users.parquet'
+ * LOCATION '/home/admin/orders/'

Review comment:
       Agreed, this should be up to the user to specify exactly, but there is an issue: you need a 'directory' for the write, so maybe we should specify this via some property like [they discuss here for Hive](https://stackoverflow.com/questions/11269203/when-creating-an-external-table-in-hive-can-i-point-the-location-to-specific-fil)? Or do you have other idea(s)?
   







[GitHub] [beam] iemejia commented on pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

Posted by GitBox <gi...@apache.org>.
iemejia commented on pull request #14117:
URL: https://github.com/apache/beam/pull/14117#issuecomment-788901968


   > I'm sad to see a SchemaIOProvider implementation go, but I think it's the right thing to do for now. We need to figure out how it can support projection/predicate pushdown. (Not that hard to add the interfaces, but it's hard to figure out how they'd be useful in core Beam).
   
   I thought SchemaIOProvider's main goal was just to easily wrap IOs for SQL, and it does a great job at that. I took the other route because more advanced use cases were using the full Table hierarchy.
    
   > My only ask for now is that you add some tests. I think the test case in https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java will actually exercise this code since it's doing a projection. I'm not sure if there's an easy way to verify that in the test though.
   
   I am not sure how to test this in a more detailed way. The current test is indeed exercising the projection, and it covers both the correct schema of the 'projected' collection as well as the results. What we can do maybe is to augment the coverage. I can add a test that does not project and only does `SELECT * FROM...` to cover the previously existing functionality.
   





[GitHub] [beam] iemejia commented on a change in pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

Posted by GitBox <gi...@apache.org>.
iemejia commented on a change in pull request #14117:
URL: https://github.com/apache/beam/pull/14117#discussion_r588129232



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
+import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@SuppressWarnings({"nullness"})
+class ParquetTable extends SchemaBaseBeamTable implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetTable.class);
+
+  private final Table table;
+
+  ParquetTable(Table table) {
+    super(table.getSchema());
+    this.table = table;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(
+      PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    if (!fieldNames.isEmpty()) {
+      Schema projectionSchema = projectSchema(schema, fieldNames);
+      LOG.info("Projecting fields schema : " + projectionSchema.toString());
+      read = read.withProjection(projectionSchema, projectionSchema);
+    }
+    return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
+  }
+
+  /** Returns a copy of the {@link Schema} without the fieldNames fields. */

Review comment:
       Thanks, I will fix it on merge.







[GitHub] [beam] TheNeuralBit commented on pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #14117:
URL: https://github.com/apache/beam/pull/14117#issuecomment-791042902


   > I thought SchemaIOProvider's main goal was just to easily wrap IOs for SQL, and it does a great job at that. I took the other route because more advanced use cases were using the full Table hierarchy.
   
   The main goal with SchemaIOProvider was to avoid code getting duplicated between ExternalTransform wrappers and TableProvider implementations (e.g. for JDBC). I was also hopeful that it would help move logic for "schema-aware" IOs out of the SQL extension and into the IOs themselves.





[GitHub] [beam] TheNeuralBit commented on a change in pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #14117:
URL: https://github.com/apache/beam/pull/14117#discussion_r585146136



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
##########
@@ -39,19 +36,18 @@
  *   favorite_numbers ARRAY<INTEGER>
  * )
  * TYPE 'parquet'
- * LOCATION '/home/admin/users.parquet'
+ * LOCATION '/home/admin/orders/'

Review comment:
       Thanks for updating this. I guess the original version won't work since we always add a `/*` to the end for reads. This should probably just pass the location through directly instead, so the user can specify a glob if they want. Another follow-on Jira, I suppose.
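
As a rough illustration of the pass-through idea in the comment above, the sketch below appends `*` only when the location ends with `/` and otherwise leaves it untouched, so a single file or a user-supplied glob would reach the reader as written. `ParquetLocations` and `toFilePattern` are hypothetical names that do not exist in Beam, and the trailing-slash rule is an assumption made for illustration, not what this PR implements.

```java
/** Hypothetical helper, not part of Beam: turns a table LOCATION into a file pattern. */
final class ParquetLocations {
  private ParquetLocations() {}

  /**
   * Assumption for this sketch: a trailing '/' marks a directory, so every file in it is matched.
   * Anything else (a single file or an explicit glob) is passed through unchanged.
   */
  static String toFilePattern(String location) {
    if (location.endsWith("/")) {
      return location + "*";
    }
    return location;
  }
}
```

Under these assumptions the result would be handed to `ParquetIO.read(schema).from(...)` in place of `table.getLocation() + "/*"`. A directory given without a trailing slash would not be expanded by this rule, which is one reason the behaviour would need to be settled in the follow-on issue.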

##########
File path: sdks/java/extensions/sql/build.gradle
##########
@@ -79,6 +79,7 @@ dependencies {
   provided project(":sdks:java:io:kafka")
   provided project(":sdks:java:io:google-cloud-platform")
   compile project(":sdks:java:io:mongodb")
+  compile library.java.avro

Review comment:
       I suppose this is necessary because of the direct references to `org.apache.avro`? I think if we can push all that complexity into ParquetIO it won't be necessary, right?







[GitHub] [beam] iemejia edited a comment on pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

Posted by GitBox <gi...@apache.org>.
iemejia edited a comment on pull request #14117:
URL: https://github.com/apache/beam/pull/14117#issuecomment-791271960


   Thanks @TheNeuralBit for the review. Prepare yourself for the filter predicate review 'soon'.





[GitHub] [beam] TheNeuralBit commented on a change in pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #14117:
URL: https://github.com/apache/beam/pull/14117#discussion_r587928424



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
+import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@SuppressWarnings({"nullness"})
+class ParquetTable extends SchemaBaseBeamTable implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetTable.class);
+
+  private final Table table;
+
+  ParquetTable(Table table) {
+    super(table.getSchema());
+    this.table = table;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(
+      PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    if (!fieldNames.isEmpty()) {
+      Schema projectionSchema = projectSchema(schema, fieldNames);
+      LOG.info("Projecting fields schema : " + projectionSchema.toString());
+      read = read.withProjection(projectionSchema, projectionSchema);
+    }
+    return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
+  }
+
+  /** Returns a copy of the {@link Schema} without the fieldNames fields. */

Review comment:
       ```suggestion
     /** Returns a copy of the {@link Schema} with only the fieldNames fields. */
   ```
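
To make the "with only" wording in the suggestion concrete, here is a library-free sketch of projection semantics. It uses a `LinkedHashMap` of field name to type name as a stand-in for an Avro record schema. `SchemaProjection` and `project` are hypothetical names, and both the output ordering (the order of the requested names) and the exception on unknown names are assumptions of this sketch, not confirmed behaviour of the `projectSchema` method in the PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for schema projection; a map models the record's fields. */
final class SchemaProjection {
  private SchemaProjection() {}

  /** Returns a new map containing only the requested fields, in the order they were requested. */
  static Map<String, String> project(Map<String, String> fields, List<String> fieldNames) {
    Map<String, String> projected = new LinkedHashMap<>();
    for (String name : fieldNames) {
      String type = fields.get(name);
      if (type == null) {
        // Assumption: asking for a field the schema does not have is an error.
        throw new IllegalArgumentException("Unknown field: " + name);
      }
      projected.put(name, type);
    }
    return projected;
  }
}
```

The input map is left untouched, which matches the "returns a copy" part of the Javadoc. A version that dropped the listed fields instead would correspond to the original "without" wording.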







[GitHub] [beam] TheNeuralBit commented on a change in pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #14117:
URL: https://github.com/apache/beam/pull/14117#discussion_r585143351



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
+import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+@Internal
+@SuppressWarnings({"nullness"})
+class ParquetTable extends SchemaBaseBeamTable implements Serializable {
+  private final Table table;
+
+  ParquetTable(Table table) {
+    super(table.getSchema());
+    this.table = table;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(
+      PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    if (!fieldNames.isEmpty()) {
+      Schema projectionSchema = projectSchema(schema, fieldNames);
+      read = read.withProjection(projectionSchema, projectionSchema);

Review comment:
       Can you file a jira and add a TODO?







[GitHub] [beam] iemejia edited a comment on pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

Posted by GitBox <gi...@apache.org>.
iemejia edited a comment on pull request #14117:
URL: https://github.com/apache/beam/pull/14117#issuecomment-788901968


   > I'm sad to see a SchemaIOProvider implementation go, but I think it's the right thing to do for now. We need to figure out how it can support projection/predicate pushdown. (Not that hard to add the interfaces, but it's hard to figure out how they'd be useful in core Beam).
   
   I thought SchemaIOProvider's main goal was just to easily wrap IOs for SQL, and it does a great job of that. I took the other route because more advanced use cases were using the full Table hierarchy.
    
   > My only ask for now is that you add some tests. I think the test case in https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java will actually exercise this code since it's doing a projection. I'm not sure if there's an easy way to verify that in the test though.
   
   I will add tests in a second commit that cover all the possible query cases. This should cover all runtime execution paths.
   





[GitHub] [beam] iemejia commented on pull request #14117: [BEAM-7929] Support column projection for Parquet Tables

Posted by GitBox <gi...@apache.org>.
iemejia commented on pull request #14117:
URL: https://github.com/apache/beam/pull/14117#issuecomment-788938034


   Extra assertions for the other execution paths added. PTAL @TheNeuralBit 

