Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/08 21:38:41 UTC

[GitHub] [beam] sclukas77 opened a new pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

sclukas77 opened a new pull request #12202:
URL: https://github.com/apache/beam/pull/12202


   Implemented `SchemaIO` and `SchemaCapableIOProvider` for Avro and Parquet, moving that logic into core Beam. Created generalized table and table provider wrappers in Beam SQL, with implementations for Pubsub, Avro, and Parquet.
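To make the layering described above concrete, here is a deliberately simplified model. All names (`MiniSchemaIO`, `MiniSchemaIOProvider`, `MiniTableWrapper`, `InMemoryProvider`) are hypothetical. The real Beam interfaces are `SchemaIOProvider` and `SchemaIO`, whose reader and writer are `PTransform`s over `Row`s; plain string lists stand in for them here so the delegation is easy to follow. The provider turns a location plus a data schema into an IO, and one generic table delegates schema, read, and write to that IO. That is the point of a generalized wrapper: each IO supplies a provider rather than its own table class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SchemaIO: knows its schema and how to read/write one location.
interface MiniSchemaIO {
  List<String> schema();

  List<String> read();

  void write(List<String> rows);
}

// Hypothetical stand-in for SchemaIOProvider: an id plus a factory for MiniSchemaIO.
interface MiniSchemaIOProvider {
  String identifier();

  MiniSchemaIO from(String location, List<String> dataSchema);
}

/** One generic table that delegates everything to whichever IO it wraps. */
final class MiniTableWrapper {
  private final MiniSchemaIO io;

  MiniTableWrapper(MiniSchemaIO io) {
    this.io = io;
  }

  List<String> getSchema() {
    return io.schema();
  }

  List<String> buildIOReader() {
    return io.read();
  }

  void buildIOWriter(List<String> rows) {
    io.write(rows);
  }
}

/** Toy in-memory "file system" provider, used only to exercise the wrapper. */
final class InMemoryProvider implements MiniSchemaIOProvider {
  final Map<String, List<String>> files = new HashMap<>();

  @Override
  public String identifier() {
    return "memory";
  }

  @Override
  public MiniSchemaIO from(String location, List<String> dataSchema) {
    return new MiniSchemaIO() {
      @Override
      public List<String> schema() {
        return dataSchema;
      }

      @Override
      public List<String> read() {
        // Copy so callers cannot mutate the stored "file".
        return new ArrayList<>(files.getOrDefault(location, List.of()));
      }

      @Override
      public void write(List<String> rows) {
        files.computeIfAbsent(location, k -> new ArrayList<>()).addAll(rows);
      }
    };
  }
}
```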
   
   R:@TheNeuralBit 
   R:@robinyqiu
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robinyqiu commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r456726616



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSchemaCapableIOProvider.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing Avro files with {@link
+ * AvroIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class AvroSchemaCapableIOProvider implements SchemaIOProvider {
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "avro";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself. No configuration expected for Avro.
+   */
+  @Override
+  public Schema configurationSchema() {
+    return Schema.builder().build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */
+  @Override
+  public AvroSchemaIO from(String location, Row configuration, Schema dataSchema) {
+    return new AvroSchemaIO(location, dataSchema);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+    return true;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** An abstraction to create schema aware IOs. */
+  @Internal
+  private static class AvroSchemaIO implements SchemaIO, Serializable {
+    protected final Schema dataSchema;
+    protected final String location;
+
+    private AvroSchemaIO(String location, Schema dataSchema) {
+      this.dataSchema = dataSchema;
+      this.location = location;
+    }
+
+    @Override
+    public Schema schema() {
+      return dataSchema;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildReader() {
+      return new PTransform<PBegin, PCollection<Row>>() {
+        @Override
+        public PCollection<Row> expand(PBegin begin) {
+          return begin
+              .apply(
+                  "AvroIORead",
+                  AvroIO.readGenericRecords(AvroUtils.toAvroSchema(dataSchema, null, null))

Review comment:
      This PR changes the behavior of this line: the second parameter used to be set from the table name and is now `null`. How will this affect the behavior of AvroIO?
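For context on this question: the second argument of `AvroUtils.toAvroSchema(beamSchema, name, namespace)` is the name given to the generated top-level Avro record. As far as I can tell from the Beam source, a null or empty name falls back to a fixed default, `topLevelRecord`; treat that constant as an assumption to verify against the Beam version in use. The sketch below models only that naming rule with a hypothetical helper class (`AvroRecordNaming`); it is not the AvroUtils implementation.

```java
// Hypothetical model of how the top-level Avro record name is chosen.
// Assumption: this mirrors the fallback in Beam's AvroUtils.toAvroSchema,
// which substitutes a default when the caller passes a null or empty name.
final class AvroRecordNaming {
  static final String DEFAULT_RECORD_NAME = "topLevelRecord"; // assumed default

  private AvroRecordNaming() {}

  /** Returns the record name the generated Avro schema would carry. */
  static String recordName(String requestedName) {
    return (requestedName == null || requestedName.isEmpty())
        ? DEFAULT_RECORD_NAME
        : requestedName;
  }

  /** Full name: "namespace.name" when a namespace is given, else just the name. */
  static String fullName(String requestedName, String namespace) {
    String name = recordName(requestedName);
    return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
  }
}
```

Under this model, a table named `orders` previously produced a schema whose record was named `orders`, while after this change every table produces `topLevelRecord`. Whether that affects AvroIO depends on whether anything downstream relies on the record name.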







[GitHub] [beam] sclukas77 commented on pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on pull request #12202:
URL: https://github.com/apache/beam/pull/12202#issuecomment-659687511


   Ack. Will create a separate PR for the refactoring.





[GitHub] [beam] sclukas77 commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r457684674



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #getSchemaIOProvider()} that is specific to

Review comment:
       Done
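The javadoc under review describes the extension point: subclasses supply an IO-specific provider through `getSchemaIOProvider()`, and the base class derives everything else from it (in the revision quoted later in this thread, `getTableType()` simply returns the provider's `identifier()`, which is the `TYPE` named in a `CREATE EXTERNAL TABLE` statement). A minimal model of that pattern, with hypothetical names (`IoProvider`, `ProviderBackedTableProvider`, `AvroLikeTableProvider`) standing in for `SchemaIOProvider`, `SchemaCapableIOTableProviderWrapper`, and a concrete table provider:

```java
// Hypothetical, simplified model of the wrapper pattern: the abstract base owns
// the generic logic, and each IO-specific subclass only supplies its provider.
interface IoProvider {
  String identifier();

  boolean isBounded();
}

abstract class ProviderBackedTableProvider {
  /** Each concrete subclass returns the provider for its IO. */
  public abstract IoProvider getSchemaIOProvider();

  /** The SQL table type is just the provider's identifier. */
  public String getTableType() {
    return getSchemaIOProvider().identifier();
  }
}

/** Example subclass: the provider lookup is the only IO-specific code. */
final class AvroLikeTableProvider extends ProviderBackedTableProvider {
  private static final IoProvider AVRO =
      new IoProvider() {
        @Override
        public String identifier() {
          return "avro";
        }

        @Override
        public boolean isBounded() {
          return true;
        }
      };

  @Override
  public IoProvider getSchemaIOProvider() {
    return AVRO;
  }
}
```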







[GitHub] [beam] robinyqiu commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r456721628



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaCapableIOProvider.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing parquet files with {@link
+ * ParquetIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class ParquetSchemaCapableIOProvider implements SchemaIOProvider {
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "parquet";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself. No configuration expected for parquet.
+   */
+  @Override
+  public Schema configurationSchema() {
+    return Schema.builder().build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */
+  @Override
+  public ParquetSchemaIO from(String location, Row configuration, Schema dataSchema) {
+    return new ParquetSchemaIO(location, dataSchema);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+    return true;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** An abstraction to create schema aware IOs. */
+  @Internal

Review comment:
      `@Internal` is not necessary here. Please check other places as well.
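In the Avro counterpart quoted earlier in this thread, the annotated class is declared `private static class AvroSchemaIO`. Presumably the point is that a private nested class is already unreachable by name outside its enclosing class, so an "internal API" marker adds nothing. The sketch below, with hypothetical names (`SomeProvider`, `SomeSchemaIO`), shows that the only route to the nested class is through the enclosing class, and checks its modifier reflectively.

```java
import java.lang.reflect.Modifier;

// Hypothetical illustration: a private nested class is hidden from every caller
// outside its enclosing class, so marking it @Internal conveys nothing extra.
final class SomeProvider {
  /** Private: other top-level classes cannot even name this type. */
  private static final class SomeSchemaIO {
    private final String location;

    private SomeSchemaIO(String location) {
      this.location = location;
    }
  }

  /** The only way to use SomeSchemaIO is through the enclosing class's own API. */
  String describe(String location) {
    return "SchemaIO@" + new SomeSchemaIO(location).location;
  }

  /** Looks up the nested class reflectively and reports whether it is private. */
  static boolean nestedClassIsPrivate() {
    for (Class<?> c : SomeProvider.class.getDeclaredClasses()) {
      if (c.getSimpleName().equals("SomeSchemaIO")) {
        return Modifier.isPrivate(c.getModifiers());
      }
    }
    throw new IllegalStateException("nested class not found");
  }
}
```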







[GitHub] [beam] TheNeuralBit commented on pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12202:
URL: https://github.com/apache/beam/pull/12202#issuecomment-662631025


   Run SQL PreCommit





[GitHub] [beam] kennknowles commented on pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #12202:
URL: https://github.com/apache/beam/pull/12202#issuecomment-658357325


   Didn't have time to look too closely, but once `TableProvider` moved to a service loader model, SQL should not have a `provided` dependency on the specific `TableProvider` implementations. A user should depend on an artifact that performs the registration and SQL does not need any knowledge of it at all.
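What the service-loader model implies can be sketched with `java.util.ServiceLoader` and hypothetical names (`DiscoverableTableProvider`, `TableProviderRegistry`). SQL compiles only against the interface; an artifact that wants to register a table type ships a `META-INF/services` entry for that interface (which `@AutoService` generates), and the set of available types is whatever is on the runtime classpath. The duplicate-type check is an illustrative choice, not something this thread specifies.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.ServiceLoader;

// Hypothetical service interface: SQL depends only on this type, never on
// the concrete implementations.
interface DiscoverableTableProvider {
  String getTableType();
}

final class TableProviderRegistry {
  private TableProviderRegistry() {}

  /** Indexes providers by table type, rejecting two providers that claim one type. */
  static Map<String, DiscoverableTableProvider> index(
      Iterable<? extends DiscoverableTableProvider> providers) {
    Map<String, DiscoverableTableProvider> byType = new LinkedHashMap<>();
    for (DiscoverableTableProvider p : providers) {
      DiscoverableTableProvider previous = byType.putIfAbsent(p.getTableType(), p);
      if (previous != null) {
        throw new IllegalStateException("Duplicate table type: " + p.getTableType());
      }
    }
    return byType;
  }

  /** Runtime discovery: whatever implementations are registered on the classpath. */
  static Map<String, DiscoverableTableProvider> discover() {
    return index(ServiceLoader.load(DiscoverableTableProvider.class));
  }
}
```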





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452329517



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+    this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-    super(beamSchema);
-    this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+    return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-        GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
 
-    return begin
-        .apply("ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-        .apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+    return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-    throw new UnsupportedOperationException("Writing to a Parquet file is not supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
+    return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+    PTransform<PCollection<Row>, POutput> writerTransform = schemaIO.buildWriter();
+    return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.BOUNDED_UNKNOWN;
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
      This is a little tricky, though, since the method is on `BeamSqlTable`, not on `TableProvider`. One possible solution: make `SchemaIOTableWrapper` an inner (non-static) class of `SchemaIOTableProviderWrapper`. Then the method with the default logic can live on `SchemaIOTableProviderWrapper`, and its subclasses can override it if they need to.
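The hunk above hardcodes `UNBOUNDED` in the table wrapper; the suggestion moves that decision to the provider. A sketch of the suggested shape, using hypothetical stand-in types (`Boundedness`, `TableStats`, `StatsTableProviderWrapper`, `SchemaIOTable`) rather than Beam's: because the table is an inner class, each instance holds a reference to the provider that built it and can call the provider's overridable method.

```java
// Hypothetical stand-ins for PCollection.IsBounded and BeamTableStatistics.
enum Boundedness { BOUNDED, UNBOUNDED }

enum TableStats { BOUNDED_UNKNOWN, UNBOUNDED_UNKNOWN }

abstract class StatsTableProviderWrapper {
  /** Supplied by each IO-specific subclass (in Beam, via its SchemaIOProvider). */
  abstract Boundedness providerBoundedness();

  /** Default statistics logic lives on the provider; subclasses may override it. */
  TableStats getTableStatistics() {
    return providerBoundedness() == Boundedness.BOUNDED
        ? TableStats.BOUNDED_UNKNOWN
        : TableStats.UNBOUNDED_UNKNOWN;
  }

  SchemaIOTable buildTable() {
    return new SchemaIOTable();
  }

  /** Inner (non-static) class: implicitly bound to the enclosing provider instance. */
  class SchemaIOTable {
    Boundedness isBounded() {
      return providerBoundedness();
    }

    TableStats getTableStatistics() {
      // Delegates to the enclosing provider, so an override there takes effect here.
      return StatsTableProviderWrapper.this.getTableStatistics();
    }
  }
}
```

One consequence of this design: an inner-class instance carries a reference to its provider, so if the table is serialized, the provider must be serializable too. The later revision of `SchemaCapableIOTableProviderWrapper` quoted in this thread does implement `Serializable`.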







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452432390



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #schemaCapableIOProvider} that is specific to
+ * the IO.
+ */
+@Internal
+@Experimental
+@AutoService(TableProvider.class)
+public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider {

Review comment:
       Ah whoops! Yeah I guess the best we can do is mark it `@Internal` then.
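One thing worth noting about this hunk: it registers the abstract `SchemaCapableIOTableProviderWrapper` with `@AutoService(TableProvider.class)`, and the later revision of the file quoted in this thread no longer carries that annotation. `ServiceLoader` creates each registered provider reflectively through its public no-arg constructor, and `Constructor.newInstance` throws `InstantiationException` for an abstract class, so such a registration should fail (as a `ServiceConfigurationError`) when the providers are iterated. The sketch below reproduces only the reflective step, with hypothetical class names.

```java
// Hypothetical classes showing why an abstract class cannot be a service provider:
// ServiceLoader instantiates providers reflectively via a public no-arg constructor.
abstract class AbstractWrapperProvider {
  public AbstractWrapperProvider() {}
}

final class ConcreteWrapperProvider extends AbstractWrapperProvider {
  public ConcreteWrapperProvider() {}
}

final class ProviderInstantiation {
  private ProviderInstantiation() {}

  /** Attempts the same kind of reflective construction ServiceLoader performs. */
  static String tryInstantiate(Class<?> providerClass) {
    try {
      providerClass.getConstructor().newInstance();
      return "ok";
    } catch (InstantiationException e) {
      // Thrown by Constructor.newInstance when the declaring class is abstract.
      return "abstract";
    } catch (ReflectiveOperationException e) {
      return "other: " + e.getClass().getSimpleName();
    }
  }
}
```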







[GitHub] [beam] sclukas77 commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r457684162



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #getSchemaIOProvider()} that is specific to
+ * the IO.
+ */
+@Internal
+@Experimental
+public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider
+    implements Serializable {
+  public abstract SchemaIOProvider getSchemaIOProvider();
+
+  @Override
+  public String getTableType() {
+    return getSchemaIOProvider().identifier();
+  }
+
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
+    JSONObject tableProperties = tableDefinition.getProperties();
+
+    try {
+      RowJson.RowJsonDeserializer deserializer =
+          RowJson.RowJsonDeserializer.forSchema(getSchemaIOProvider().configurationSchema())
+              .withNullBehavior(RowJson.RowJsonDeserializer.NullBehavior.ACCEPT_MISSING_OR_NULL);
+
+      Row configurationRow =
+          newObjectMapperWith(deserializer).readValue(tableProperties.toString(), Row.class);
+
+      SchemaIO schemaIO =
+          getSchemaIOProvider()
+              .from(tableDefinition.getLocation(), configurationRow, tableDefinition.getSchema());
+
+      return new SchemaIOTableWrapper(schemaIO);
+    } catch (InvalidConfigurationException | InvalidSchemaException e) {
+      throw new InvalidTableException(e.getMessage());
+    } catch (JsonProcessingException e) {
+      throw new AssertionError(
+          "Failed to re-parse TBLPROPERTIES JSON " + tableProperties.toString());
+    }
+  }
+
+  private BeamTableStatistics getTableStatistics(PipelineOptions options) {
+    if (isBounded().equals(PCollection.IsBounded.BOUNDED)) {
+      return BeamTableStatistics.BOUNDED_UNKNOWN;
+    }
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;
+  }
+
+  private PCollection.IsBounded isBounded() {
+    return getSchemaIOProvider().isBounded();
+  }
+
+  /** A generalized {@link Table} for IOs to create IO readers and writers. */

Review comment:
       Done
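The wrapper above uses a template-method pattern. The abstract base class owns the shared `getTableType()`/`buildBeamSqlTable(...)` logic, and each IO-specific subclass only supplies its provider. Below is a minimal, self-contained sketch of that shape. `IOProvider`, `TableProviderWrapper`, and `PubsubLikeTableProvider` are hypothetical stand-ins for `SchemaIOProvider`, `SchemaCapableIOTableProviderWrapper`, and `PubsubJsonTableProvider`, reduced to the one method needed to show the delegation.

```java
/** Hypothetical stand-in for SchemaIOProvider, reduced to identifier(). */
interface IOProvider {
  String identifier();
}

/** Simplified analogue of SchemaCapableIOTableProviderWrapper. */
abstract class TableProviderWrapper {
  /** Each IO-specific subclass supplies its provider. */
  abstract IOProvider getSchemaIOProvider();

  /** The SQL table type is derived from the provider's identifier, not hard-coded per IO. */
  String getTableType() {
    return getSchemaIOProvider().identifier();
  }
}

/** Analogue of PubsubJsonTableProvider: the only IO-specific code is the provider it returns. */
class PubsubLikeTableProvider extends TableProviderWrapper {
  @Override
  IOProvider getSchemaIOProvider() {
    return () -> "pubsub";
  }
}

public class Main {
  public static void main(String[] args) {
    TableProviderWrapper provider = new PubsubLikeTableProvider();
    System.out.println(provider.getTableType()); // prints "pubsub"
  }
}
```

With this shape, adding another IO means writing one small subclass instead of duplicating the TBLPROPERTIES parsing.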




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kennknowles commented on pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #12202:
URL: https://github.com/apache/beam/pull/12202#issuecomment-661432146


   Drive-by comment on commit history: clearly some of these commits should be squashed together, though I am not sure all of them should be. After code review is done, it will help to have separate commits for (1) the abstraction and then (2, 3, 4, ...) each implementation of the abstraction. That enables selective cherry-picks, rollbacks, etc.





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452432390



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #schemaCapableIOProvider} that is specific to
+ * the IO.
+ */
+@Internal
+@Experimental
+@AutoService(TableProvider.class)
+public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider {

Review comment:
       Ah whoops! Yeah I guess the best we can do is mark it @Internal then.







[GitHub] [beam] robinyqiu commented on pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on pull request #12202:
URL: https://github.com/apache/beam/pull/12202#issuecomment-659577530


   To further clarify the previous problem:
   
   It is not a bug introduced by this PR. Instead, this PR unveils an existing problem that Kenn and Luke explained above (incorrect usage of the `TableProvider` SPI).
   
   The workaround I suggested restores the previous code path, so the unavailable module is not needed at run time (only the constructor and `getTableType()` are called). That unblocks your current work but does not solve the underlying problem.
   
   @sclukas77 Could you please create a JIRA issue to track the `TableProvider` problem and assign it to yourself or @TheNeuralBit? I think fixing it will require quite a bit of refactoring (you will need to move `TableProvider` and every class it depends on, directly or indirectly, to sdk:java:core). It makes sense for that to be a separate PR that does not block this one.
   





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r451872831



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #schemaCapableIOProvider} that is specific to
+ * the IO.
+ */
+@Internal
+@Experimental
+@AutoService(TableProvider.class)
+public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider {

Review comment:
       ```suggestion
   abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider {
   ```
   I think this AutoService annotation is what's causing the Java PreCommit to fail. The `AutoService` annotation registers this class as a `TableProvider`, so a call to `ServiceLoader.load(TableProvider.class)` will try to instantiate it whenever it is on the classpath, and that fails because the class is abstract.
   
   Specifically, this is the ServiceLoader call that's biting you:
   https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java#L85-L86
   
   I think we should also make this package-private.
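The failure comes from how providers are constructed. `ServiceLoader` instantiates each provider listed under `META-INF/services` reflectively through its public no-argument constructor and reports any failure as a `ServiceConfigurationError`. The sketch below does not call `ServiceLoader` itself, because that would need a services file on the classpath. It only reproduces the reflective construction step. `AbstractProvider` and `ConcreteProvider` are hypothetical classes.

```java
import java.lang.reflect.InvocationTargetException;

/** Hypothetical provider base class: abstract, like the wrapper annotated with @AutoService. */
abstract class AbstractProvider {
  public AbstractProvider() {}
}

/** Hypothetical concrete provider, like PubsubJsonTableProvider. */
class ConcreteProvider extends AbstractProvider {
  public ConcreteProvider() {}
}

public class Main {
  /** Reflectively invokes the no-arg constructor, as a service loader would; true on success. */
  static boolean canInstantiate(Class<?> clazz) {
    try {
      clazz.getDeclaredConstructor().newInstance();
      return true;
    } catch (InstantiationException e) {
      // Raised when the declaring class is abstract: there is nothing concrete to construct.
      return false;
    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(canInstantiate(AbstractProvider.class)); // false
    System.out.println(canInstantiate(ConcreteProvider.class)); // true
  }
}
```

This is why only the concrete per-IO subclasses should carry `@AutoService(TableProvider.class)`.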







[GitHub] [beam] sclukas77 commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452308746



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+    this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-    super(beamSchema);
-    this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+    return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-        GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
 
-    return begin
-        .apply("ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-        .apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+    return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-    throw new UnsupportedOperationException("Writing to a Parquet file is not supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
+    return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+    PTransform<PCollection<Row>, POutput> writerTransform = schemaIO.buildWriter();
+    return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.BOUNDED_UNKNOWN;
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
   This relies on `BeamTableStatistics`, which is not in core. I could use `schemaIO.isBounded()` and instantiate the appropriate `BeamTableStatistics` within `SchemaIOTableWrapper` based on that value, which would work for Pubsub, Avro, and Parquet. Looking ahead, though, the `getTableStatistics(..)` methods of other IOs such as seqgen and Kafka rely on other methods of `BeamTableStatistics`. Should I add a conversion class for `BeamTableStatistics` in core in anticipation of these issues, or go ahead with `schemaIO.isBounded()` regardless?
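The first option in the comment above derives the statistics from boundedness instead of hard-coding them. A rough, self-contained sketch follows. `IsBounded`, `TableStatistics`, and `BoundedSource` are hypothetical stand-ins for `PCollection.IsBounded`, `BeamTableStatistics`, and a `SchemaIO` that is assumed to expose `isBounded()`. In this PR, `isBounded()` actually lives on the `SchemaIOProvider`.

```java
/** Stand-in for PCollection.IsBounded. */
enum IsBounded { BOUNDED, UNBOUNDED }

/** Stand-in for BeamTableStatistics; only the two "unknown" constants matter here. */
final class TableStatistics {
  static final TableStatistics BOUNDED_UNKNOWN = new TableStatistics("BOUNDED_UNKNOWN");
  static final TableStatistics UNBOUNDED_UNKNOWN = new TableStatistics("UNBOUNDED_UNKNOWN");
  private final String name;

  private TableStatistics(String name) {
    this.name = name;
  }

  @Override
  public String toString() {
    return name;
  }
}

/** Stand-in for a SchemaIO assumed to report its own boundedness. */
interface BoundedSource {
  IsBounded isBounded();
}

/** Simplified analogue of SchemaIOTableWrapper. */
class TableWrapper {
  private final BoundedSource schemaIO;

  TableWrapper(BoundedSource schemaIO) {
    this.schemaIO = schemaIO;
  }

  // Delegate instead of hard-coding UNBOUNDED, so bounded IOs report correctly.
  IsBounded isBounded() {
    return schemaIO.isBounded();
  }

  TableStatistics getTableStatistics() {
    return isBounded() == IsBounded.BOUNDED
        ? TableStatistics.BOUNDED_UNKNOWN
        : TableStatistics.UNBOUNDED_UNKNOWN;
  }
}

public class Main {
  public static void main(String[] args) {
    System.out.println(new TableWrapper(() -> IsBounded.BOUNDED).getTableStatistics()); // BOUNDED_UNKNOWN
    System.out.println(new TableWrapper(() -> IsBounded.UNBOUNDED).getTableStatistics()); // UNBOUNDED_UNKNOWN
  }
}
```

Delegating `isBounded()` also keeps the wrapper from reporting every table as unbounded. The hard-coded `UNBOUNDED` in the diff would otherwise do exactly that for Avro and Parquet. IOs that need richer statistics, such as Kafka, would still need the conversion approach.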







[GitHub] [beam] sclukas77 commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r457684897



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaCapableIOProvider.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing parquet files with {@link
+ * ParquetIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class ParquetSchemaCapableIOProvider implements SchemaIOProvider {
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "parquet";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself. No configuration expected for parquet.
+   */
+  @Override
+  public Schema configurationSchema() {
+    return Schema.builder().build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */
+  @Override
+  public ParquetSchemaIO from(String location, Row configuration, Schema dataSchema) {
+    return new ParquetSchemaIO(location, dataSchema);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+    return true;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** An abstraction to create schema aware IOs. */
+  @Internal

Review comment:
       Done







[GitHub] [beam] sclukas77 commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r457651700



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSchemaCapableIOProvider.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing Avro files with {@link
+ * AvroIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class AvroSchemaCapableIOProvider implements SchemaIOProvider {
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "avro";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself. No configuration expected for Avro.
+   */
+  @Override
+  public Schema configurationSchema() {
+    return Schema.builder().build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */
+  @Override
+  public AvroSchemaIO from(String location, Row configuration, Schema dataSchema) {
+    return new AvroSchemaIO(location, dataSchema);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+    return true;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** An abstraction to create schema aware IOs. */
+  @Internal
+  private static class AvroSchemaIO implements SchemaIO, Serializable {
+    protected final Schema dataSchema;
+    protected final String location;
+
+    private AvroSchemaIO(String location, Schema dataSchema) {
+      this.dataSchema = dataSchema;
+      this.location = location;
+    }
+
+    @Override
+    public Schema schema() {
+      return dataSchema;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildReader() {
+      return new PTransform<PBegin, PCollection<Row>>() {
+        @Override
+        public PCollection<Row> expand(PBegin begin) {
+          return begin
+              .apply(
+                  "AvroIORead",
+                  AvroIO.readGenericRecords(AvroUtils.toAvroSchema(dataSchema, null, null))

Review comment:
   We changed the table name to null here because the general `SchemaCapableIOTableProviderWrapper` does not pass the `table.getName()` value to `AvroSchemaCapableIOProvider`. Since the name parameter of `toAvroSchema(..)` is nullable and is only used when creating the record, it didn't seem essential for maintaining functionality. @TheNeuralBit What do you think?







[GitHub] [beam] sclukas77 commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452314893



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #schemaCapableIOProvider} that is specific to
+ * the IO.
+ */
+@Internal
+@Experimental
+@AutoService(TableProvider.class)
+public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider {

Review comment:
   The `TableProvider` stub classes for each IO live in IO-specific packages. Java gives subpackages no special access: a package-private class is visible only within its own package, and the wrapper and the IO stubs are in different packages. Making the table provider wrapper package-private would therefore hide it from the IO table provider stubs that need to extend it.







[GitHub] [beam] sclukas77 commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r457684377



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -17,63 +17,32 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
 
-import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
-
-import com.alibaba.fastjson.JSONObject;
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.auto.service.AutoService;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
-import org.apache.beam.sdk.extensions.sql.meta.provider.InvalidTableException;
+import org.apache.beam.sdk.extensions.sql.meta.provider.SchemaCapableIOTableProviderWrapper;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubSchemaCapableIOProvider;
-import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
-import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer;
-import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
 
 /**
- * {@link TableProvider} for {@link PubsubIOJsonTable} which wraps {@link PubsubIO} for consumption
- * by Beam SQL.
+ * {@link TableProvider} for {@link PubsubIO} for consumption by Beam SQL.
+ *
+ * <p>Passes the {@link PubsubSchemaCapableIOProvider} to the generalized table provider wrapper,
+ * {@link SchemaCapableIOTableProviderWrapper}, for Pubsub specific behavior.
  */
 @Internal
 @Experimental
 @AutoService(TableProvider.class)
-public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
-
+public class PubsubJsonTableProvider extends SchemaCapableIOTableProviderWrapper {
   @Override
-  public String getTableType() {
-    return "pubsub";
+  public SchemaIOProvider getSchemaIOProvider() {
+    return new PubsubSchemaCapableIOProvider();
   }
 
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
-    JSONObject tableProperties = tableDefinition.getProperties();
-    PubsubSchemaCapableIOProvider ioProvider = new PubsubSchemaCapableIOProvider();
-
-    try {
-      RowJsonDeserializer deserializer =
-          RowJsonDeserializer.forSchema(ioProvider.configurationSchema())
-              .withNullBehavior(RowJsonDeserializer.NullBehavior.ACCEPT_MISSING_OR_NULL);
-
-      Row configurationRow =
-          newObjectMapperWith(deserializer).readValue(tableProperties.toString(), Row.class);
-
-      SchemaIO pubsubSchemaIO =
-          ioProvider.from(
-              tableDefinition.getLocation(), configurationRow, tableDefinition.getSchema());
-
-      return PubsubIOJsonTable.fromSchemaIO(pubsubSchemaIO);
-    } catch (InvalidConfigurationException | InvalidSchemaException e) {
-      throw new InvalidTableException(e.getMessage());
-    } catch (JsonProcessingException e) {
-      throw new AssertionError(
-          "Failed to re-parse TBLPROPERTIES JSON " + tableProperties.toString());
-    }
+  public String getTableType() {

Review comment:
       Done







[GitHub] [beam] TheNeuralBit commented on pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12202:
URL: https://github.com/apache/beam/pull/12202#issuecomment-662631227


   Whoops, the commits are already squashed :)
   
   I'll merge once CI is green.





[GitHub] [beam] TheNeuralBit commented on pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12202:
URL: https://github.com/apache/beam/pull/12202#issuecomment-661163456


   Thanks Scott, Robin, Luke and Kenn for navigating the `ServiceLoader`/`TableProvider` issue while I was out. The short-term fix to maintain the status quo in this PR LGTM.
   
   In the long term, IOs will be able to register implementations of the SchemaIO interfaces instead of TableProvider. Those interfaces live in core Beam, so each implementation can live in the same package as its IO.
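   For illustration, here is a minimal sketch of that registration model. It assumes a hypothetical core interface `IoProvider` with a single `identifier()` method; this is a stand-in, not the real `SchemaIOProvider` API. Implementations would be registered under `META-INF/services` (for example with `@AutoService`), and the SQL side would index whatever `java.util.ServiceLoader` finds by identifier:

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.ServiceLoader;

   public class ProviderRegistrySketch {

     /** Hypothetical stand-in for a core-Beam provider interface (not the real SchemaIOProvider). */
     public interface IoProvider {
       /** Short name a SQL user would refer to, e.g. "avro" or "parquet". */
       String identifier();
     }

     /** Indexes providers by identifier, failing fast if two share a name. */
     public static Map<String, IoProvider> index(Iterable<IoProvider> providers) {
       Map<String, IoProvider> byId = new HashMap<>();
       for (IoProvider p : providers) {
         IoProvider previous = byId.putIfAbsent(p.identifier(), p);
         if (previous != null) {
           throw new IllegalStateException("Duplicate provider identifier: " + p.identifier());
         }
       }
       return Collections.unmodifiableMap(byId);
     }

     /** Discovers every IoProvider registered under META-INF/services on the classpath. */
     public static Map<String, IoProvider> discover() {
       return index(ServiceLoader.load(IoProvider.class));
     }

     public static void main(String[] args) {
       // Register two hypothetical providers by hand instead of via META-INF/services.
       IoProvider avro = () -> "avro";
       IoProvider parquet = () -> "parquet";
       Map<String, IoProvider> registry = index(Arrays.asList(avro, parquet));
       System.out.println(registry.containsKey("parquet"));
     }
   }
   ```

   Keying on the identifier and failing fast on duplicates keeps lookups unambiguous when two jars register the same name; whether Beam SQL should fail or prefer one provider in that case is a separate design decision.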
   
   We may need to figure out something else for the medium term, though; +1 for filing a JIRA and assigning it to me.





[GitHub] [beam] TheNeuralBit commented on pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12202:
URL: https://github.com/apache/beam/pull/12202#issuecomment-662167402


   Run SQL_Java11 PreCommit





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452438068



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+    this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-    super(beamSchema);
-    this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+    return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-        GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
 
-    return begin
-        .apply("ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-        .apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+    return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-    throw new UnsupportedOperationException("Writing to a Parquet file is not supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
+    return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+    PTransform<PCollection<Row>, POutput> writerTransform = schemaIO.buildWriter();
+    return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.BOUNDED_UNKNOWN;
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
       Yeah, we could just make it a property on SchemaCapableIOProvider for now.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452329517



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+    this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-    super(beamSchema);
-    this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+    return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-        GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
 
-    return begin
-        .apply("ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-        .apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+    return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-    throw new UnsupportedOperationException("Writing to a Parquet file is not supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
+    return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+    PTransform<PCollection<Row>, POutput> writerTransform = schemaIO.buildWriter();
+    return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.BOUNDED_UNKNOWN;
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
       This is a little tricky though since the method is on BeamSqlTable, and not on TableProvider. One possible solution: make SchemaIOTableWrapper an inner (non-static) class of SchemaIOTableProviderWrapper. Then the method with the default logic can be on SchemaIOTableProviderWrapper, and the sub-classes will be able to override it.
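   A minimal sketch of that structure, using hypothetical stand-ins: `Stats` for `BeamTableStatistics`, `Boundedness` for `PCollection.IsBounded`, and a no-argument `getTableStatistics()` instead of the real `getTableStatistics(PipelineOptions)`. The default logic sits on the provider and is derived from boundedness, and the inner table delegates to its enclosing provider, so a subclass override takes effect without subclassing the table:

   ```java
   public class InnerTableSketch {

     // Hypothetical stand-ins for BeamTableStatistics constants and PCollection.IsBounded.
     public enum Stats { BOUNDED_UNKNOWN, UNBOUNDED_UNKNOWN, CUSTOM }

     public enum Boundedness { BOUNDED, UNBOUNDED }

     /** Plays the role of SchemaIOTableProviderWrapper. */
     public abstract static class ProviderWrapper {
       /** Each IO-specific subclass reports whether its source is bounded. */
       public abstract Boundedness isBounded();

       /** Default statistics, derived from boundedness; subclasses may override. */
       public Stats getTableStatistics() {
         return isBounded() == Boundedness.BOUNDED ? Stats.BOUNDED_UNKNOWN : Stats.UNBOUNDED_UNKNOWN;
       }

       public Table buildTable() {
         return new Table();
       }

       /** Inner (non-static) class: every Table keeps a reference to the provider that built it. */
       public class Table {
         public Stats getTableStatistics() {
           // Delegates to the enclosing provider, so an override there takes effect here.
           return ProviderWrapper.this.getTableStatistics();
         }
       }
     }

     /** Relies on the default statistics logic. */
     public static class FileProvider extends ProviderWrapper {
       @Override
       public Boundedness isBounded() {
         return Boundedness.BOUNDED;
       }
     }

     /** Overrides the default, e.g. for a hypothetical IO that can estimate its own size. */
     public static class EstimatingProvider extends ProviderWrapper {
       @Override
       public Boundedness isBounded() {
         return Boundedness.UNBOUNDED;
       }

       @Override
       public Stats getTableStatistics() {
         return Stats.CUSTOM;
       }
     }

     public static void main(String[] args) {
       System.out.println(new FileProvider().buildTable().getTableStatistics()); // prints BOUNDED_UNKNOWN
       System.out.println(new EstimatingProvider().buildTable().getTableStatistics()); // prints CUSTOM
     }
   }
   ```

   One caveat with this design: an instance of an inner class holds a hidden reference to its enclosing instance, so if the table must be `Serializable` (as `SchemaIOTableWrapper` is), the provider wrapper has to be serializable too.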







[GitHub] [beam] TheNeuralBit commented on pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12202:
URL: https://github.com/apache/beam/pull/12202#issuecomment-656932229


   I looked into the test failure. I found that if I change the dependency configuration from `provided` to `compile` here it fixes the test:
   https://github.com/apache/beam/blob/65297802aaaddda66b3fda4bafb15640f0fc3530/sdks/java/extensions/sql/build.gradle#L61
   
   From the stacktrace:
   ```
   java.util.ServiceConfigurationError: org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider: Provider org.apache.beam.sdk.extensions.sql.meta.provider.parquet.ParquetTableProvider could not be instantiated
   	at java.util.ServiceLoader.fail(ServiceLoader.java:232)
   	at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
   	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
   	at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
   	at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
   	at org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchemaFactory$AllProviders.getTableProvider(BeamCalciteSchemaFactory.java:86)
   ...
   Caused by: java.lang.NoClassDefFoundError: org/apache/beam/sdk/io/parquet/ParquetSchemaCapableIOProvider
   	at org.apache.beam.sdk.extensions.sql.meta.provider.parquet.ParquetTableProvider.<init>(ParquetTableProvider.java:47)
   ```
   
   You can see the error is occurring when we try to instantiate a class from the parquet package at runtime, because the class can't be found. It looks like this may have been a problem before your PR, but it didn't come up because we just weren't exercising code that called the parquet package.
   
   TBH I don't have a great handle on the difference between these dependency configurations. My understanding of `compile` vs. `provided` is that `compile` will include the compiled Java in the artifact, but `provided` assumes that it will be provided by some other jar on the classpath (useful SO answer: https://stackoverflow.com/questions/30731084/provided-dependency-in-gradle). So it seems what's happening is that the parquet package is there when we compile, but nothing adds it to the classpath when we run JdbcJarTest.
   
   I'm not sure why these IO dependencies are `provided` in the first place. I think maybe the intention is that way users can just include the IOs that they intend to use, but this seems problematic when BeamCalciteSchemaFactory is loading every TableProvider implementation: https://github.com/apache/beam/blob/65297802aaaddda66b3fda4bafb15640f0fc3530/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java#L86
   
   My suggestion would be to just make parquet a `compile` dependency, like we've already done for mongo (cc @lukecwik and @kennknowles in case they think this is a bad idea).
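   The dependency change above is the fix under discussion. Separately, and purely as an illustration (this is not what this PR or `BeamCalciteSchemaFactory` does), the `ServiceConfigurationError` in the stack trace can be caught per provider, so one `TableProvider` with a missing runtime dependency doesn't prevent the others from loading. A minimal sketch with a hypothetical `Plugin` service interface:

   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.ServiceConfigurationError;
   import java.util.ServiceLoader;

   public class TolerantServiceLoading {

     /**
      * Loads every registered implementation of {@code type}, recording (rather than propagating)
      * providers that fail, e.g. because a class they need is missing at runtime.
      */
     public static <T> List<T> loadAvailable(Class<T> type, List<ServiceConfigurationError> failures) {
       List<T> loaded = new ArrayList<>();
       Iterator<T> it = ServiceLoader.load(type).iterator();
       // After an error the iterator only makes a best-effort attempt to move on to the
       // next provider, so cap the total number of errors to avoid spinning forever.
       int maxErrors = 100;
       while (true) {
         try {
           if (!it.hasNext()) {
             break;
           }
           loaded.add(it.next());
         } catch (ServiceConfigurationError e) {
           failures.add(e);
           if (failures.size() >= maxErrors) {
             break;
           }
         }
       }
       return loaded;
     }

     /** Hypothetical service interface; nothing registers an implementation of it. */
     public interface Plugin {}

     public static void main(String[] args) {
       List<ServiceConfigurationError> failures = new ArrayList<>();
       List<Plugin> plugins = loadAvailable(Plugin.class, failures);
       System.out.println(plugins.size() + " loaded, " + failures.size() + " failed");
     }
   }
   ```

   The trade-off is that a misconfigured provider fails silently unless the collected errors are logged, so tolerating failures this way would hide exactly the kind of classpath problem this test surfaced.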





[GitHub] [beam] sclukas77 commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r458293107



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaCapableIOProvider.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing parquet files with {@link
+ * ParquetIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class ParquetSchemaCapableIOProvider implements SchemaIOProvider {

Review comment:
       Done.







[GitHub] [beam] lukecwik commented on pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12202:
URL: https://github.com/apache/beam/pull/12202#issuecomment-657646138


   It seems like we are using ServiceLoader incorrectly. Because ServiceLoader loads providers dynamically, `ParquetTableProvider.java` could live in a project that contains `ParquetSchemaCapableIOProvider.java` (or directly depends on the project containing it), outside of the `sdks/java/extensions/sql` project. The interface that needs to be shared should be placed in a package that both `sdks/java/extensions/sql` and the IO packages can depend on.





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r458202365



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaCapableIOProvider.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing parquet files with {@link
+ * ParquetIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class ParquetSchemaCapableIOProvider implements SchemaIOProvider {

Review comment:
       Could you remove the "Capable" from all of these class names now that we've switched to `SchemaIOProvider`?







[GitHub] [beam] robinyqiu commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r457549098



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenericRecordWriteConverter.java
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.avro;
+package org.apache.beam.sdk.io;

Review comment:
       I see. I wasn't aware that `AvroIO` is in core. Then moving it here makes sense.







[GitHub] [beam] sclukas77 commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r457685188



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
##########
@@ -18,13 +18,20 @@
 package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
 
 import com.google.auto.service.AutoService;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.SchemaCapableIOTableProviderWrapper;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.io.parquet.ParquetSchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
 
 /**
- * {@link TableProvider} for {@link ParquetTable}.
+ * {@link TableProvider} for {@link ParquetIO} for consumption by Beam SQL.
+ *
+ * <p>Passes the {@link ParquetSchemaCapableIOProvider} to the generalized table provider wrapper,
+ * {@link SchemaCapableIOTableProviderWrapper}, for Parquet specific behavior.
+ *
+ * <p>Passes the {@link ParquetSchemaCapableIOProvider} to the generalized table provider wrapper,

Review comment:
       Done







[GitHub] [beam] sclukas77 commented on pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on pull request #12202:
URL: https://github.com/apache/beam/pull/12202#issuecomment-662015474


   > Drive-by comment on commit history: clearly some of them should be squashed into one commit, but I am not sure they all should be. After code review is done, it will be helpful to have a separate commit for (1) the abstraction, then (2, 3, 4, ...) the implementations of the abstraction. This enables selective cherry-picks, rollbacks, etc.
   
   ACK, I'll split the commit history into the commits Brian recommended once all the changes are approved.





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452326180



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+    this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-    super(beamSchema);
-    this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+    return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-        GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
 
-    return begin
-        .apply("ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-        .apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+    return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-    throw new UnsupportedOperationException("Writing to a Parquet file is not supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
+    return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+    PTransform<PCollection<Row>, POutput> writerTransform = schemaIO.buildWriter();
+    return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.BOUNDED_UNKNOWN;
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
       Great question. We should definitely keep the `BeamTableStatistics` in the SQL extensions for now, since it only has meaning for SQL.
   
   The reason this exists is so we can do better when optimizing SQL queries - it's for "cost-based optimization" - here's a [design doc](https://docs.google.com/document/d/1DM_bcfFbIoc_vEoqQxhC7AvHBUDVCAwToC8TYGukkII/edit) about our implementation of it. The idea is that we can be smarter about how we perform a SQL query if we know how "big" each data source is (or how "fast" in the case of an unbounded source). In theory this could be useful information in core Beam as well, but we're a long way from doing that kind of optimization there.
   
   I think what we should do is have a default implementation that returns either UNBOUNDED_UNKNOWN or BOUNDED_UNKNOWN based on what SchemaIO tells us it is (which will be sufficient for avro/parquet/pubsub), and then other table providers can override it if they need to.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r451889886



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {

Review comment:
       ```suggestion
   class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {
   ```
   
   I think this can be package-private. It might also make sense to make it an inner class of `SchemaIOTableProviderWrapper`, but I'll leave that up to you.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #schemaCapableIOProvider} that is specific to
+ * the IO.
+ */
+@Internal
+@Experimental
+@AutoService(TableProvider.class)
+public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider {

Review comment:
       ```suggestion
   abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider {
   ```
   I think this `AutoService` annotation is what's causing the Java PreCommit to fail. The `AutoService` annotation makes it so that a call to `ServiceLoader.load(TableProvider.class)` will try to instantiate this class if it's on the classpath, which is not possible because the class is abstract.
   
   Specifically this is the ServiceLoader call that's biting you:
   https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java#L85-L86
   
   I think we should also make this package-private.
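A minimal, self-contained sketch (plain Java, not Beam code) of the failure mode described above: `ServiceLoader` creates each registered provider reflectively through its public no-argument constructor, and reflection cannot instantiate an abstract class (`ServiceLoader` typically surfaces this as a `ServiceConfigurationError`). `TableProviderLike`, `AbstractWrapper`, and `ConcreteProvider` are hypothetical stand-ins for `TableProvider`, `SchemaCapableIOTableProviderWrapper`, and a concrete subclass; the sketch calls the constructor directly instead of going through `ServiceLoader`.

```java
import java.lang.reflect.InvocationTargetException;

class AbstractProviderDemo {
  // Hypothetical stand-in for TableProvider.
  interface TableProviderLike {
    String getTableType();
  }

  // Hypothetical stand-in for SchemaCapableIOTableProviderWrapper: abstract, so not instantiable.
  abstract static class AbstractWrapper implements TableProviderLike {
    public AbstractWrapper() {}
  }

  // Hypothetical stand-in for a concrete subclass such as AvroTableProvider.
  static class ConcreteProvider extends AbstractWrapper {
    public ConcreteProvider() {}

    @Override
    public String getTableType() {
      return "avro";
    }
  }

  /** Instantiates reflectively via the no-arg constructor, roughly as ServiceLoader does. */
  static String tryInstantiate(Class<? extends TableProviderLike> cls) {
    try {
      TableProviderLike provider = cls.getDeclaredConstructor().newInstance();
      return provider.getTableType();
    } catch (InstantiationException e) {
      // Thrown because the declaring class is abstract.
      return "InstantiationException";
    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
      return e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    System.out.println(tryInstantiate(ConcreteProvider.class)); // avro
    System.out.println(tryInstantiate(AbstractWrapper.class)); // InstantiationException
  }
}
```

This is why the annotation belongs on the concrete subclasses only: each of them has a usable public no-arg constructor, while the abstract base does not.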

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSchemaCapableIOProvider.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaCapableIOProvider} for reading and writing JSON payloads with
+ * {@link AvroIO}.

Review comment:
       ```suggestion
    * An implementation of {@link SchemaCapableIOProvider} for reading and writing avro files with
    * {@link AvroIO}.
   ```

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+    this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-    super(beamSchema);
-    this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+    return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-        GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }

Review comment:
       Hmm, this is actually something that will need to be different for each IO. Parquet and Avro are both bounded data sources, while Pubsub is unbounded.
   
   Can you add this to the SchemaIO interface and plumb it through here?
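A rough sketch of what "plumbing it through" could look like, assuming boundedness becomes a method on the IO abstraction and the table wrapper simply delegates to it. `SchemaIOLike`, `IsBounded`, `AvroLikeIO`, `PubsubLikeIO`, and `TableWrapper` are hypothetical stand-ins so the example compiles without Beam; they are not Beam's actual `SchemaIO` API.

```java
class BoundednessPlumbingDemo {
  // Mirrors the two values of Beam's PCollection.IsBounded.
  enum IsBounded {
    BOUNDED,
    UNBOUNDED
  }

  // Hypothetical SchemaIO-like interface: each IO reports its own boundedness.
  interface SchemaIOLike {
    IsBounded isBounded();
  }

  // Stand-in for a file-based IO such as Avro or Parquet.
  static class AvroLikeIO implements SchemaIOLike {
    @Override
    public IsBounded isBounded() {
      return IsBounded.BOUNDED;
    }
  }

  // Stand-in for a streaming IO such as Pubsub.
  static class PubsubLikeIO implements SchemaIOLike {
    @Override
    public IsBounded isBounded() {
      return IsBounded.UNBOUNDED;
    }
  }

  // The table wrapper delegates instead of hard-coding UNBOUNDED.
  static class TableWrapper {
    private final SchemaIOLike schemaIO;

    TableWrapper(SchemaIOLike schemaIO) {
      this.schemaIO = schemaIO;
    }

    IsBounded isBounded() {
      return schemaIO.isBounded();
    }
  }
}
```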

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+    this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-    super(beamSchema);
-    this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+    return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-        GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
 
-    return begin
-        .apply("ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-        .apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+    return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-    throw new UnsupportedOperationException("Writing to a Parquet file is not supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
+    return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+    PTransform<PCollection<Row>, POutput> writerTransform = schemaIO.buildWriter();
+    return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.BOUNDED_UNKNOWN;
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
       This will also need to be different for Avro/Parquet vs. Pubsub. It could just be determined from the same method on `SchemaIO`.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/GenericRecordReadConverter.java
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.io.parquet;
 
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;

Review comment:
       Can you make this class package-private? Users may be more tempted to use it now that it's outside of the SQL extensions.
   
   This could be a generally useful transform, we may want to move it outside of the parquet package and make it public... but let's not do that now.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProvider.java
##########
@@ -38,15 +42,10 @@
  * LOCATION '/tmp/persons.avro'
  * }</pre>
  */
-@AutoService(TableProvider.class)
-public class AvroTableProvider extends InMemoryMetaTableProvider {
-  @Override
-  public String getTableType() {
-    return "avro";
-  }
-
-  @Override
-  public BeamSqlTable buildBeamSqlTable(Table table) {
-    return new AvroTable(table.getName(), table.getSchema(), table.getLocation());
+@Internal
+@Experimental
+public class AvroTableProvider extends SchemaCapableIOTableProviderWrapper {

Review comment:
       ```suggestion
   @AutoService(TableProvider.class)
   public class AvroTableProvider extends SchemaCapableIOTableProviderWrapper {
   ```
   
   We can keep these annotations the same as they were, since this class should work exactly the same as it used to from the user perspective.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
##########
@@ -38,15 +42,10 @@
  * LOCATION '/home/admin/users.parquet'
  * }</pre>
  */
-@AutoService(TableProvider.class)
-public class ParquetTableProvider extends InMemoryMetaTableProvider {
-  @Override
-  public String getTableType() {
-    return "parquet";
-  }
-
-  @Override
-  public BeamSqlTable buildBeamSqlTable(Table table) {
-    return new ParquetTable(table.getSchema(), table.getLocation());
+@Internal
+@Experimental
+public class ParquetTableProvider extends SchemaCapableIOTableProviderWrapper {

Review comment:
       ```suggestion
   @AutoService(TableProvider.class)
   public class ParquetTableProvider extends SchemaCapableIOTableProviderWrapper {
   ```
   Here as well

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaCapableIOProvider.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaCapableIOProvider} for reading and writing JSON payloads with
+ * {@link ParquetIO}.

Review comment:
       ```suggestion
    * An implementation of {@link SchemaCapableIOProvider} for reading and writing parquet files with
    * {@link ParquetIO}.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robinyqiu commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r456714938



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -17,63 +17,32 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
 
-import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
-
-import com.alibaba.fastjson.JSONObject;
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.auto.service.AutoService;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
-import org.apache.beam.sdk.extensions.sql.meta.provider.InvalidTableException;
+import org.apache.beam.sdk.extensions.sql.meta.provider.SchemaCapableIOTableProviderWrapper;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubSchemaCapableIOProvider;
-import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
-import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer;
-import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
 
 /**
- * {@link TableProvider} for {@link PubsubIOJsonTable} which wraps {@link PubsubIO} for consumption
- * by Beam SQL.
+ * {@link TableProvider} for {@link PubsubIO} for consumption by Beam SQL.
+ *
+ * <p>Passes the {@link PubsubSchemaCapableIOProvider} to the generalized table provider wrapper,
+ * {@link SchemaCapableIOTableProviderWrapper}, for Pubsub specific behavior.
  */
 @Internal
 @Experimental
 @AutoService(TableProvider.class)
-public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
-
+public class PubsubJsonTableProvider extends SchemaCapableIOTableProviderWrapper {
   @Override
-  public String getTableType() {
-    return "pubsub";
+  public SchemaIOProvider getSchemaIOProvider() {
+    return new PubsubSchemaCapableIOProvider();
   }
 
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
-    JSONObject tableProperties = tableDefinition.getProperties();
-    PubsubSchemaCapableIOProvider ioProvider = new PubsubSchemaCapableIOProvider();
-
-    try {
-      RowJsonDeserializer deserializer =
-          RowJsonDeserializer.forSchema(ioProvider.configurationSchema())
-              .withNullBehavior(RowJsonDeserializer.NullBehavior.ACCEPT_MISSING_OR_NULL);
-
-      Row configurationRow =
-          newObjectMapperWith(deserializer).readValue(tableProperties.toString(), Row.class);
-
-      SchemaIO pubsubSchemaIO =
-          ioProvider.from(
-              tableDefinition.getLocation(), configurationRow, tableDefinition.getSchema());
-
-      return PubsubIOJsonTable.fromSchemaIO(pubsubSchemaIO);
-    } catch (InvalidConfigurationException | InvalidSchemaException e) {
-      throw new InvalidTableException(e.getMessage());
-    } catch (JsonProcessingException e) {
-      throw new AssertionError(
-          "Failed to re-parse TBLPROPERTIES JSON " + tableProperties.toString());
-    }
+  public String getTableType() {

Review comment:
       Please add a TODO with a JIRA link here so that people know this override is only a temporary solution that can be removed once the issue tracked by that JIRA is fixed. For example:
   
   ```
   // TODO[BEAM-?????]: remove this override after TableProvider problem is fixed
   ```
   
   And we should do the same for all classes that extend `SchemaCapableIOTableProviderWrapper`.
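A small sketch of the requested shape, assuming the base wrapper derives the table type from the IO provider's identifier and a subclass keeps a temporary override marked with the TODO. `IOProviderLike`, `ProviderWrapper`, and `PubsubLikeTableProvider` are hypothetical stand-ins for the classes in this PR, and the JIRA id is left as the placeholder from the comment above.

```java
class TableTypeOverrideDemo {
  // Hypothetical stand-in for SchemaIOProvider's identifier().
  interface IOProviderLike {
    String identifier();
  }

  // Hypothetical stand-in for SchemaCapableIOTableProviderWrapper.
  abstract static class ProviderWrapper {
    abstract IOProviderLike getSchemaIOProvider();

    // Default behavior: the SQL table type is the IO provider's identifier.
    String getTableType() {
      return getSchemaIOProvider().identifier();
    }
  }

  // Hypothetical stand-in for PubsubJsonTableProvider.
  static class PubsubLikeTableProvider extends ProviderWrapper {
    @Override
    IOProviderLike getSchemaIOProvider() {
      return () -> "pubsub";
    }

    // TODO[BEAM-?????]: remove this override after TableProvider problem is fixed
    @Override
    String getTableType() {
      return "pubsub";
    }
  }
}
```

Keeping the override's value identical to the provider identifier means deleting it later should not change behavior.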

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #getSchemaIOProvider()} that is specific to
+ * the IO.
+ */
+@Internal
+@Experimental
+public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider
+    implements Serializable {
+  public abstract SchemaIOProvider getSchemaIOProvider();
+
+  @Override
+  public String getTableType() {
+    return getSchemaIOProvider().identifier();
+  }
+
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
+    JSONObject tableProperties = tableDefinition.getProperties();
+
+    try {
+      RowJson.RowJsonDeserializer deserializer =
+          RowJson.RowJsonDeserializer.forSchema(getSchemaIOProvider().configurationSchema())
+              .withNullBehavior(RowJson.RowJsonDeserializer.NullBehavior.ACCEPT_MISSING_OR_NULL);
+
+      Row configurationRow =
+          newObjectMapperWith(deserializer).readValue(tableProperties.toString(), Row.class);
+
+      SchemaIO schemaIO =
+          getSchemaIOProvider()
+              .from(tableDefinition.getLocation(), configurationRow, tableDefinition.getSchema());
+
+      return new SchemaIOTableWrapper(schemaIO);
+    } catch (InvalidConfigurationException | InvalidSchemaException e) {
+      throw new InvalidTableException(e.getMessage());
+    } catch (JsonProcessingException e) {
+      throw new AssertionError(
+          "Failed to re-parse TBLPROPERTIES JSON " + tableProperties.toString());
+    }
+  }
+
+  private BeamTableStatistics getTableStatistics(PipelineOptions options) {
+    if (isBounded().equals(PCollection.IsBounded.BOUNDED)) {
+      return BeamTableStatistics.BOUNDED_UNKNOWN;
+    }
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;
+  }
+
+  private PCollection.IsBounded isBounded() {
+    return getSchemaIOProvider().isBounded();
+  }
+
+  /** A generalized {@link Table} for IOs to create IO readers and writers. */

Review comment:
       `Table` should be `BeamSqlTable` here.
   
   `Table` is actually a config class; I think we should rename it later.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #getSchemaIOProvider()} that is specific to
+ * the IO.
+ */
+@Internal
+@Experimental
+public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider
+    implements Serializable {
+  public abstract SchemaIOProvider getSchemaIOProvider();
+
+  @Override
+  public String getTableType() {
+    return getSchemaIOProvider().identifier();
+  }
+
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
+    JSONObject tableProperties = tableDefinition.getProperties();
+
+    try {
+      RowJson.RowJsonDeserializer deserializer =
+          RowJson.RowJsonDeserializer.forSchema(getSchemaIOProvider().configurationSchema())
+              .withNullBehavior(RowJson.RowJsonDeserializer.NullBehavior.ACCEPT_MISSING_OR_NULL);
+
+      Row configurationRow =
+          newObjectMapperWith(deserializer).readValue(tableProperties.toString(), Row.class);
+
+      SchemaIO schemaIO =
+          getSchemaIOProvider()
+              .from(tableDefinition.getLocation(), configurationRow, tableDefinition.getSchema());
+
+      return new SchemaIOTableWrapper(schemaIO);
+    } catch (InvalidConfigurationException | InvalidSchemaException e) {
+      throw new InvalidTableException(e.getMessage());
+    } catch (JsonProcessingException e) {
+      throw new AssertionError(
+          "Failed to re-parse TBLPROPERTIES JSON " + tableProperties.toString());
+    }
+  }
+
+  private BeamTableStatistics getTableStatistics(PipelineOptions options) {
+    if (isBounded().equals(PCollection.IsBounded.BOUNDED)) {
+      return BeamTableStatistics.BOUNDED_UNKNOWN;
+    }
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;
+  }
+
+  private PCollection.IsBounded isBounded() {
+    return getSchemaIOProvider().isBounded();
+  }
+
+  /** A generalized {@link Table} for IOs to create IO readers and writers. */
+  private class SchemaIOTableWrapper extends BaseBeamTable {
+    protected final SchemaIO schemaIO;
+
+    private SchemaIOTableWrapper(SchemaIO schemaIO) {

Review comment:
       We can make this class a static nested class by passing another parameter, `PCollection.IsBounded isBounded`, to the constructor and saving it. This way we can also remove the two helper functions above (`getTableStatistics` and `isBounded`).
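A hedged sketch of the suggested refactor: the provider reads boundedness once and hands it to a static nested table class, which then answers `isBounded()` and `getTableStatistics()` itself. All types below (`IsBounded`, `TableStatistics`, `SchemaIOLike`, `SchemaIOProviderLike`, `ProviderWrapper`) are hypothetical stand-ins for the Beam classes named in the review, so the example compiles on its own.

```java
class StaticNestedTableDemo {
  enum IsBounded {
    BOUNDED,
    UNBOUNDED
  }

  // Stand-ins for BeamTableStatistics.BOUNDED_UNKNOWN / UNBOUNDED_UNKNOWN.
  enum TableStatistics {
    BOUNDED_UNKNOWN,
    UNBOUNDED_UNKNOWN
  }

  // Hypothetical stand-in for SchemaIO.
  interface SchemaIOLike {
    String location();
  }

  // Hypothetical stand-in for SchemaIOProvider.
  interface SchemaIOProviderLike {
    IsBounded isBounded();

    SchemaIOLike from(String location);
  }

  // Stand-in for SchemaCapableIOTableProviderWrapper.
  static class ProviderWrapper {
    private final SchemaIOProviderLike provider;

    ProviderWrapper(SchemaIOProviderLike provider) {
      this.provider = provider;
    }

    // Boundedness is read once from the provider and handed to the table.
    SchemaIOTableWrapper buildBeamSqlTable(String location) {
      return new SchemaIOTableWrapper(provider.from(location), provider.isBounded());
    }
  }

  // Static nested class: holds no implicit reference to an enclosing ProviderWrapper.
  static class SchemaIOTableWrapper {
    private final SchemaIOLike schemaIO;
    private final IsBounded isBounded;

    private SchemaIOTableWrapper(SchemaIOLike schemaIO, IsBounded isBounded) {
      this.schemaIO = schemaIO;
      this.isBounded = isBounded;
    }

    String location() {
      return schemaIO.location();
    }

    IsBounded isBounded() {
      return isBounded;
    }

    // Replaces the outer class's private getTableStatistics helper.
    TableStatistics getTableStatistics() {
      return isBounded == IsBounded.BOUNDED
          ? TableStatistics.BOUNDED_UNKNOWN
          : TableStatistics.UNBOUNDED_UNKNOWN;
    }
  }
}
```

One general Java point in favor of this design: an instance of a non-static inner class keeps a reference to its enclosing instance, so serializing the table would also serialize the provider. A static nested class avoids that capture.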

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #getSchemaIOProvider()} that is specific to

Review comment:
       This comment line would fit better as the `getSchemaIOProvider()` method comment, e.g.:
   
   ```
   // Subclasses should provide the SchemaIOProvider that is specific to their IO.
   public abstract SchemaIOProvider getSchemaIOProvider();
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenericRecordWriteConverter.java
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.avro;
+package org.apache.beam.sdk.io;

Review comment:
       This file is being moved to the core module; I believe you meant to move it to the io module?

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaCapableIOProvider.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing parquet files with {@link
+ * ParquetIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class ParquetSchemaCapableIOProvider implements SchemaIOProvider {
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "parquet";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself. No configuration expected for parquet.
+   */
+  @Override
+  public Schema configurationSchema() {
+    return Schema.builder().build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */
+  @Override
+  public ParquetSchemaIO from(String location, Row configuration, Schema dataSchema) {
+    return new ParquetSchemaIO(location, dataSchema);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+    return true;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** An abstraction to create schema aware IOs. */
+  @Internal

Review comment:
       `@Internal` is not necessary here. Please check other places as well.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSchemaCapableIOProvider.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing Avro files with {@link
+ * AvroIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class AvroSchemaCapableIOProvider implements SchemaIOProvider {
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "avro";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself. No configuration expected for Avro.
+   */
+  @Override
+  public Schema configurationSchema() {
+    return Schema.builder().build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */
+  @Override
+  public AvroSchemaIO from(String location, Row configuration, Schema dataSchema) {
+    return new AvroSchemaIO(location, dataSchema);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+    return true;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** An abstraction to create schema aware IOs. */
+  @Internal
+  private static class AvroSchemaIO implements SchemaIO, Serializable {
+    protected final Schema dataSchema;
+    protected final String location;
+
+    private AvroSchemaIO(String location, Schema dataSchema) {
+      this.dataSchema = dataSchema;
+      this.location = location;
+    }
+
+    @Override
+    public Schema schema() {
+      return dataSchema;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildReader() {
+      return new PTransform<PBegin, PCollection<Row>>() {
+        @Override
+        public PCollection<Row> expand(PBegin begin) {
+          return begin
+              .apply(
+                  "AvroIORead",
+                  AvroIO.readGenericRecords(AvroUtils.toAvroSchema(dataSchema, null, null))

Review comment:
       This PR changes the behavior of this line: the second parameter is now `null` instead of the table name. How will this affect the behavior of AvroIO?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
##########
@@ -18,13 +18,20 @@
 package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
 
 import com.google.auto.service.AutoService;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.SchemaCapableIOTableProviderWrapper;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.io.parquet.ParquetSchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
 
 /**
- * {@link TableProvider} for {@link ParquetTable}.
+ * {@link TableProvider} for {@link ParquetIO} for consumption by Beam SQL.
+ *
+ * <p>Passes the {@link ParquetSchemaCapableIOProvider} to the generalized table provider wrapper,
+ * {@link SchemaCapableIOTableProviderWrapper}, for Parquet specific behavior.
+ *
+ * <p>Passes the {@link ParquetSchemaCapableIOProvider} to the generalized table provider wrapper,

Review comment:
       Duplicate comment here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] sclukas77 commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r452346520



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableWrapper.java
##########
@@ -15,54 +15,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+@Internal
+@Experimental
+/**
+ * A generalized {@link Table} for IOs to create IO readers and writers.
+ */
+public class SchemaIOTableWrapper extends BaseBeamTable implements Serializable {
+  protected final SchemaIO schemaIO;
 
-/** {@link ParquetTable} is a {@link BeamSqlTable}. */
-public class ParquetTable extends SchemaBaseBeamTable implements Serializable {
-  private final String filePattern;
+  private SchemaIOTableWrapper(SchemaIO schemaIO) {
+    this.schemaIO = schemaIO;
+  }
 
-  public ParquetTable(Schema beamSchema, String filePattern) {
-    super(beamSchema);
-    this.filePattern = filePattern;
+  static SchemaIOTableWrapper fromSchemaIO(SchemaIO schemaIO) {
+    return new SchemaIOTableWrapper(schemaIO);
   }
 
   @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    PTransform<PCollection<GenericRecord>, PCollection<Row>> readConverter =
-        GenericRecordReadConverter.builder().beamSchema(schema).build();
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
 
-    return begin
-        .apply("ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern))
-        .apply("GenericRecordToRow", readConverter);
+  @Override
+  public Schema getSchema() {
+    return schemaIO.schema();
   }
 
   @Override
-  public PDone buildIOWriter(PCollection<Row> input) {
-    throw new UnsupportedOperationException("Writing to a Parquet file is not supported");
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
+    return begin.apply(readerTransform);
   }
 
   @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
+  public POutput buildIOWriter(PCollection<Row> input) {
+    PTransform<PCollection<Row>, POutput> writerTransform = schemaIO.buildWriter();
+    return input.apply(writerTransform);
   }
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.BOUNDED_UNKNOWN;
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;

Review comment:
       If the default logic lives in the TableProviderWrapper rather than the TableWrapper, wouldn't it make sense to move the `SchemaIO.isBounded()` method to the `SchemaCapableIOProvider` interface instead? Otherwise, the default logic in the TableProviderWrapper would have to rely on `SchemaIOTableWrapper.isBounded()`.
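   A minimal sketch of the proposal, assuming boundedness is a property of the provider rather than of each SchemaIO. `IsBounded` and `TableStatistics` are hypothetical stand-ins for `PCollection.IsBounded` and the `BeamTableStatistics` constants; the statistics choice mirrors the `getTableStatistics` logic in the wrapper diff:

   ```java
   /** Hypothetical stand-in for PCollection.IsBounded. */
   enum IsBounded { BOUNDED, UNBOUNDED }

   /** Hypothetical stand-in for the BeamTableStatistics constants. */
   enum TableStatistics { BOUNDED_UNKNOWN, UNBOUNDED_UNKNOWN }

   /** Provider-level boundedness: every SchemaIO built by this provider shares it. */
   interface BoundedProviderSketch {
     IsBounded isBounded();
   }

   /** Table wrapper that asks the provider, not the SchemaIO, for boundedness. */
   class ProviderBackedTableSketch {
     private final BoundedProviderSketch provider;

     ProviderBackedTableSketch(BoundedProviderSketch provider) {
       this.provider = provider;
     }

     IsBounded isBounded() {
       return provider.isBounded();
     }

     /** Default statistics follow directly from the provider's boundedness. */
     TableStatistics getTableStatistics() {
       return isBounded() == IsBounded.BOUNDED
           ? TableStatistics.BOUNDED_UNKNOWN
           : TableStatistics.UNBOUNDED_UNKNOWN;
     }
   }
   ```

   A file-based provider such as Avro or Parquet would return `BOUNDED` and get `BOUNDED_UNKNOWN` statistics, while a streaming provider would return `UNBOUNDED`, without the table wrapper hard-coding either.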





[GitHub] [beam] sclukas77 commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r457666825



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #getSchemaIOProvider()} that is specific to
+ * the IO.
+ */
+@Internal
+@Experimental
+public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider
+    implements Serializable {
+  public abstract SchemaIOProvider getSchemaIOProvider();
+
+  @Override
+  public String getTableType() {
+    return getSchemaIOProvider().identifier();
+  }
+
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
+    JSONObject tableProperties = tableDefinition.getProperties();
+
+    try {
+      RowJson.RowJsonDeserializer deserializer =
+          RowJson.RowJsonDeserializer.forSchema(getSchemaIOProvider().configurationSchema())
+              .withNullBehavior(RowJson.RowJsonDeserializer.NullBehavior.ACCEPT_MISSING_OR_NULL);
+
+      Row configurationRow =
+          newObjectMapperWith(deserializer).readValue(tableProperties.toString(), Row.class);
+
+      SchemaIO schemaIO =
+          getSchemaIOProvider()
+              .from(tableDefinition.getLocation(), configurationRow, tableDefinition.getSchema());
+
+      return new SchemaIOTableWrapper(schemaIO);
+    } catch (InvalidConfigurationException | InvalidSchemaException e) {
+      throw new InvalidTableException(e.getMessage());
+    } catch (JsonProcessingException e) {
+      throw new AssertionError(
+          "Failed to re-parse TBLPROPERTIES JSON " + tableProperties.toString());
+    }
+  }
+
+  private BeamTableStatistics getTableStatistics(PipelineOptions options) {
+    if (isBounded().equals(PCollection.IsBounded.BOUNDED)) {
+      return BeamTableStatistics.BOUNDED_UNKNOWN;
+    }
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;
+  }
+
+  private PCollection.IsBounded isBounded() {
+    return getSchemaIOProvider().isBounded();
+  }
+
+  /** A generalized {@link Table} for IOs to create IO readers and writers. */
+  private class SchemaIOTableWrapper extends BaseBeamTable {
+    protected final SchemaIO schemaIO;
+
+    private SchemaIOTableWrapper(SchemaIO schemaIO) {

Review comment:
       The two helper functions are there because some IO table providers have logic that is more complex than the general form (e.g. [TextTable](https://github.com/apache/beam/blob/9b0941945545e71a949649309e05e405ca73aea2/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java#L73-L96)), and we were planning on overriding those methods within the TableProvider of that IO.
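   A minimal sketch of the override pattern described above, using hypothetical classes that work on plain strings instead of Beam rows. The header-skipping override is a made-up example of IO-specific logic, not what TextTable actually does:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Generic wrapper: the default reader path can be overridden by IO-specific providers. */
   class GenericTableWrapperSketch {
     /** Default reader logic: pass records through unchanged. */
     protected List<String> buildReader(List<String> rawRecords) {
       return new ArrayList<>(rawRecords);
     }

     /** Entry point used by the SQL layer; always delegates to the overridable hook. */
     public final List<String> read(List<String> rawRecords) {
       return buildReader(rawRecords);
     }
   }

   /** Hypothetical provider with more complex logic than the general form. */
   class TextTableWrapperSketch extends GenericTableWrapperSketch {
     @Override
     protected List<String> buildReader(List<String> rawRecords) {
       // Drop the first (header) line, then trim each remaining record.
       List<String> out = new ArrayList<>();
       for (int i = 1; i < rawRecords.size(); i++) {
         out.add(rawRecords.get(i).trim());
       }
       return out;
     }
   }
   ```

   Keeping `read` final and the hook `protected` lets most IOs inherit the general behavior while a few replace only the piece that differs.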





[GitHub] [beam] TheNeuralBit commented on pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12202:
URL: https://github.com/apache/beam/pull/12202#issuecomment-661275305


   BEAM-10516 is the JIRA issue about the ServiceLoader problem.



[GitHub] [beam] TheNeuralBit merged pull request #12202: [BEAM-10407,10408] SchemaIOTableProviderWrapper and implementations for Avro and Parquet

Posted by GitBox <gi...@apache.org>.
TheNeuralBit merged pull request #12202:
URL: https://github.com/apache/beam/pull/12202


   



[GitHub] [beam] TheNeuralBit commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r457777076



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSchemaCapableIOProvider.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing Avro files with {@link
+ * AvroIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class AvroSchemaCapableIOProvider implements SchemaIOProvider {
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "avro";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself. No configuration expected for Avro.
+   */
+  @Override
+  public Schema configurationSchema() {
+    return Schema.builder().build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */
+  @Override
+  public AvroSchemaIO from(String location, Row configuration, Schema dataSchema) {
+    return new AvroSchemaIO(location, dataSchema);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+    return true;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** An abstraction to create schema aware IOs. */
+  @Internal
+  private static class AvroSchemaIO implements SchemaIO, Serializable {
+    protected final Schema dataSchema;
+    protected final String location;
+
+    private AvroSchemaIO(String location, Schema dataSchema) {
+      this.dataSchema = dataSchema;
+      this.location = location;
+    }
+
+    @Override
+    public Schema schema() {
+      return dataSchema;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildReader() {
+      return new PTransform<PBegin, PCollection<Row>>() {
+        @Override
+        public PCollection<Row> expand(PBegin begin) {
+          return begin
+              .apply(
+                  "AvroIORead",
+                  AvroIO.readGenericRecords(AvroUtils.toAvroSchema(dataSchema, null, null))

Review comment:
       I think it is fine: when the name of the schema isn't specified, we just call it "topLevelRecord": https://github.com/apache/beam/blob/f36250f2816da2a0d8350e92f0010615f8491ee0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L327
   
   I don't think the name has any effect on the way we encode/decode the Avro records. (CC @iemejia in case I'm wrong)
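   A minimal sketch of the defaulting behavior described above, written as a hypothetical standalone helper. The real logic lives inside `AvroUtils.toAvroSchema` and may differ in detail:

   ```java
   /** Hypothetical helper mirroring how a null record name falls back to a default. */
   final class AvroRecordNames {
     /** Name used when no record name is supplied (per the AvroUtils link above). */
     static final String DEFAULT_RECORD_NAME = "topLevelRecord";

     private AvroRecordNames() {}

     /** Returns the given name, or the default when it is null. */
     static String recordNameOrDefault(String name) {
       return name != null ? name : DEFAULT_RECORD_NAME;
     }
   }
   ```

   So passing `null` instead of the table name only changes the record's name label, which is the part whose effect on encoding and decoding is in question here.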





[GitHub] [beam] kennknowles commented on pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #12202:
URL: https://github.com/apache/beam/pull/12202#issuecomment-658358581


   I think Luke said the same thing. You put the `TableProvider` interface somewhere everyone can depend on. I believe in your proposal it might end up in Java core. I don't know if that is necessary or not. I know there were revisions planned to make that more appropriate. Then the implementations depend on core, and the place that wants to load them also depends on core.
   
   BTW it would be better as a `TableProviderRegistrar`, so that it decouples the provider interface from the loading/registration logic. Then SQL depends on that.
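   A minimal sketch of the registrar idea, assuming a registrar interface that IO modules implement and a single registry class that owns discovery through `java.util.ServiceLoader`. All type names here are hypothetical stand-ins, not existing Beam APIs:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.ServiceLoader;

   /** Hypothetical minimal provider interface (stand-in for TableProvider). */
   interface TableProviderSketch {
     String getTableType();
   }

   /** Hypothetical registrar: IO modules implement this; loading logic stays elsewhere. */
   interface TableProviderRegistrarSketch {
     List<TableProviderSketch> getTableProviders();
   }

   /** Collects providers from registrars; only this class knows how registrars are found. */
   final class TableProviderRegistry {
     private TableProviderRegistry() {}

     /** Production path: discover registrars via META-INF/services entries. */
     static List<TableProviderSketch> loadAll() {
       return collect(ServiceLoader.load(TableProviderRegistrarSketch.class));
     }

     /** Aggregates providers from any source of registrars (handy for tests). */
     static List<TableProviderSketch> collect(Iterable<TableProviderRegistrarSketch> registrars) {
       List<TableProviderSketch> providers = new ArrayList<>();
       for (TableProviderRegistrarSketch registrar : registrars) {
         providers.addAll(registrar.getTableProviders());
       }
       return providers;
     }
   }
   ```

   The provider interface carries no loading concerns, so SQL would depend only on the registrar contract, and tests can pass registrars directly to `collect` instead of relying on classpath service files.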



[GitHub] [beam] sclukas77 commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r457529328



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenericRecordWriteConverter.java
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.avro;
+package org.apache.beam.sdk.io;

Review comment:
       I moved it here so that it would be in the same directory as AvroIO and AvroSchemaCapableIOProvider, which happen to be in core. Do you think I should move it to the io module regardless?





[GitHub] [beam] iemejia commented on a change in pull request #12202: [BEAM-10407,10408] SchemaIOTableProviderWrapper and implementations for Avro and Parquet

Posted by GitBox <gi...@apache.org>.
iemejia commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r471994485



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSchemaCapableIOProvider.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing Avro files with {@link
+ * AvroIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class AvroSchemaCapableIOProvider implements SchemaIOProvider {
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "avro";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself. No configuration expected for Avro.
+   */
+  @Override
+  public Schema configurationSchema() {
+    return Schema.builder().build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */
+  @Override
+  public AvroSchemaIO from(String location, Row configuration, Schema dataSchema) {
+    return new AvroSchemaIO(location, dataSchema);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+    return true;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** An abstraction to create schema aware IOs. */
+  @Internal
+  private static class AvroSchemaIO implements SchemaIO, Serializable {
+    protected final Schema dataSchema;
+    protected final String location;
+
+    private AvroSchemaIO(String location, Schema dataSchema) {
+      this.dataSchema = dataSchema;
+      this.location = location;
+    }
+
+    @Override
+    public Schema schema() {
+      return dataSchema;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildReader() {
+      return new PTransform<PBegin, PCollection<Row>>() {
+        @Override
+        public PCollection<Row> expand(PBegin begin) {
+          return begin
+              .apply(
+                  "AvroIORead",
+                  AvroIO.readGenericRecords(AvroUtils.toAvroSchema(dataSchema, null, null))

Review comment:
       Oops, catching up on old stuff. AFAIR, in theory it does, but in practice it does not: this part of the spec is specified but nobody enforces it, especially the Java implementation, and since the Java implementation does not, the others don't either, for compatibility reasons.





[GitHub] [beam] robinyqiu commented on a change in pull request #12202: [BEAM-10407,10408] Schema Capable IO Table Provider Wrappers

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r456711570



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaCapableIOTableProviderWrapper.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A general {@link TableProvider} for IOs for consumption by Beam SQL.
+ *
+ * <p>Can create child classes for IOs to pass {@link #getSchemaIOProvider()} that is specific to
+ * the IO.
+ */
+@Internal
+@Experimental
+public abstract class SchemaCapableIOTableProviderWrapper extends InMemoryMetaTableProvider
+    implements Serializable {
+  public abstract SchemaIOProvider getSchemaIOProvider();
+
+  @Override
+  public String getTableType() {
+    return getSchemaIOProvider().identifier();
+  }
+
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
+    JSONObject tableProperties = tableDefinition.getProperties();
+
+    try {
+      RowJson.RowJsonDeserializer deserializer =
+          RowJson.RowJsonDeserializer.forSchema(getSchemaIOProvider().configurationSchema())
+              .withNullBehavior(RowJson.RowJsonDeserializer.NullBehavior.ACCEPT_MISSING_OR_NULL);
+
+      Row configurationRow =
+          newObjectMapperWith(deserializer).readValue(tableProperties.toString(), Row.class);
+
+      SchemaIO schemaIO =
+          getSchemaIOProvider()
+              .from(tableDefinition.getLocation(), configurationRow, tableDefinition.getSchema());
+
+      return new SchemaIOTableWrapper(schemaIO);
+    } catch (InvalidConfigurationException | InvalidSchemaException e) {
+      throw new InvalidTableException(e.getMessage());
+    } catch (JsonProcessingException e) {
+      throw new AssertionError(
+          "Failed to re-parse TBLPROPERTIES JSON " + tableProperties.toString());
+    }
+  }
+
+  private BeamTableStatistics getTableStatistics(PipelineOptions options) {
+    if (isBounded().equals(PCollection.IsBounded.BOUNDED)) {
+      return BeamTableStatistics.BOUNDED_UNKNOWN;
+    }
+    return BeamTableStatistics.UNBOUNDED_UNKNOWN;
+  }
+
+  private PCollection.IsBounded isBounded() {
+    return getSchemaIOProvider().isBounded();
+  }
+
+  /** A generalized {@link Table} for IOs to create IO readers and writers. */

Review comment:
       `Table` should be `BeamSqlTable` here.
   
   `Table` is actually a configuration class. I think we should rename it to something like `TableDefinition` later; the current name is very confusing to me.
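   To illustrate the distinction, here is a minimal, self-contained sketch using hypothetical stand-in types. `TableDefinition`, `RuntimeTable`, `build` and `statisticsFor` are illustrative names only, not Beam APIs. The definition carries only configuration (name, type, location, properties), while the runtime table, the role `BeamSqlTable` plays, exposes behavior such as boundedness. With the suggested fix, the Javadoc would read `A generalized {@link BeamSqlTable} for IOs to create IO readers and writers.`

```java
import java.util.Map;

/** Hypothetical stand-ins for the two roles discussed above; NOT Beam's classes. */
class TableDefinitionSketch {

  /** Pure configuration, the role Beam SQL's {@code Table} currently plays. */
  static final class TableDefinition {
    final String name;
    final String type;
    final String location;
    final Map<String, String> properties;

    TableDefinition(String name, String type, String location, Map<String, String> properties) {
      this.name = name;
      this.type = type;
      this.location = location;
      this.properties = properties;
    }
  }

  /** Runtime table, the role {@code BeamSqlTable} plays: it has behavior, not just config. */
  interface RuntimeTable {
    String describeReader();

    boolean isBounded();
  }

  /** A provider turns a definition into a runtime table (hypothetical, simplified). */
  static RuntimeTable build(TableDefinition def, boolean bounded) {
    return new RuntimeTable() {
      @Override
      public String describeReader() {
        return def.type + " reader at " + def.location;
      }

      @Override
      public boolean isBounded() {
        return bounded;
      }
    };
  }

  /** Mirrors the wrapper's choice of an "unknown" statistics estimate based on boundedness. */
  static String statisticsFor(RuntimeTable table) {
    return table.isBounded() ? "BOUNDED_UNKNOWN" : "UNBOUNDED_UNKNOWN";
  }
}
```

   Keeping the two types separate means a configuration object can be parsed and validated without constructing any IO, which is the confusion the rename to something like `TableDefinition` would help avoid.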



