Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/09 15:36:07 UTC

[GitHub] [beam] sclukas77 commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452308746



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+@Internal
+@Experimental
+public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+    this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-    super(beamSchema);
-    this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+    return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-        GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
 
-    return begin
-        .apply("ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-        .apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+    return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-    throw new UnsupportedOperationException("Writing to a Parquet file is not supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
+    return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+    PTransform<PCollection<Row>, POutput> writerTransform = schemaIO.buildWriter();
+    return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.BOUNDED_UNKNOWN;
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
       This relies on BeamTableStatistics, which is not in the core directory. I could use schemaIO.isBounded() and instantiate the appropriate BeamTableStatistics within SchemaIOTableWrapper based on that value, which would work for pubsub/avro/parquet; a sketch of that approach is below. But looking ahead, the getTableStatistics(..) methods of other IOs such as seqgen and kafka rely on other methods within BeamTableStatistics. Do you think I should make a conversion class to BeamTableStatistics within core in anticipation of these issues, or go ahead with using schemaIO.isBounded() regardless?
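       A minimal sketch of the isBounded() route, assuming SchemaIO gains a PCollection.IsBounded isBounded() accessor (it does not expose one today, so this is hypothetical):

           @Override
           public PCollection.IsBounded isBounded() {
             // Hypothetical: delegate boundedness to the wrapped SchemaIO
             // instead of hard-coding UNBOUNDED in the wrapper.
             return schemaIO.isBounded();
           }

           @Override
           public BeamTableStatistics getTableStatistics(PipelineOptions options) {
             // Pick the matching unknown-row-count statistics for the source.
             return isBounded() == PCollection.IsBounded.BOUNDED
                 ? BeamTableStatistics.BOUNDED_UNKNOWN
                 : BeamTableStatistics.UNBOUNDED_UNKNOWN;
           }

       That would cover pubsub/avro/parquet, but would still leave seqgen/kafka needing the richer BeamTableStatistics factory methods.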



