Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/22 20:48:30 UTC

[GitHub] [beam] sclukas77 opened a new pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

sclukas77 opened a new pull request #12341:
URL: https://github.com/apache/beam/pull/12341


   Implemented SchemaIO and SchemaIOProvider for DataStoreV1, shifting logic to core Beam. Rewrote DataStoreV1TableProvider unit tests for DataStoreV1SchemaIOProvider.
   
   R:@TheNeuralBit
   R:@robinyqiu
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] sclukas77 commented on a change in pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r470661198



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRow.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Value;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@code PTransform} to perform a conversion of {@link Entity} to {@link Row}. */
+public class EntityToRow extends PTransform<PCollection<Entity>, PCollection<Row>> {
+  private final Schema schema;
+  private final String keyField;
+  private static final Logger LOG = LoggerFactory.getLogger(EntityToRow.class);
+
+  private EntityToRow(Schema schema, String keyField) {
+    this.schema = schema;
+    this.keyField = keyField;
+
+    if (schema.getFieldNames().contains(keyField)) {
+      if (!schema.getField(keyField).getType().getTypeName().equals(Schema.TypeName.BYTES)) {
+        throw new IllegalStateException(
+            "Field `"
+                + keyField
+                + "` should be of type `VARBINARY`. Please change the type or specify a field to"

Review comment:
       Done.
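
For readers following the hunk above, the key-field check in the constructor can be sketched without any Beam dependency. This is only an illustration under stated assumptions: `KeyFieldCheck`, its `TypeName` enum and the `Map`-based schema are hypothetical stand-ins for Beam's `Schema` and `Schema.TypeName`, and the hunk is truncated, so the behaviour when the key field is absent from the schema is assumed here to be "nothing to validate".

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class KeyFieldCheck {
  /** Hypothetical stand-in for Schema.TypeName; only a few members are modelled. */
  enum TypeName { BYTES, STRING, INT32 }

  private KeyFieldCheck() {}

  /**
   * Rejects a schema whose key column exists but is not BYTES.
   * Assumption: a schema without the key column is accepted as-is.
   */
  static void validate(Map<String, TypeName> schema, String keyField) {
    TypeName type = schema.get(keyField);
    if (type != null && type != TypeName.BYTES) {
      throw new IllegalStateException(
          "Field `" + keyField + "` should be of type `VARBINARY`.");
    }
  }

  public static void main(String[] args) {
    Map<String, TypeName> schema = new LinkedHashMap<>();
    schema.put("__key__", TypeName.BYTES);
    schema.put("name", TypeName.STRING);
    validate(schema, "__key__"); // accepted: the key column is BYTES
    validate(schema, "missing"); // accepted: no such column, nothing to check
    System.out.println("schema accepted");
  }
}
```

Failing in the constructor means a mis-typed key column is reported when the transform is built rather than when the first element is processed.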







[GitHub] [beam] sclukas77 commented on a change in pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r466460044



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRow.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Value;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@code PTransform} to perform a conversion of {@code PCollection<Entity>} to {@code
+ * PCollection<Row>}.

Review comment:
       Done







[GitHub] [beam] TheNeuralBit commented on pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12341:
URL: https://github.com/apache/beam/pull/12341#issuecomment-674310525


   Run SQL PostCommit





[GitHub] [beam] TheNeuralBit commented on pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12341:
URL: https://github.com/apache/beam/pull/12341#issuecomment-673032763


   Run Java PreCommit





[GitHub] [beam] sclukas77 commented on a change in pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r469973877



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
##########
@@ -80,13 +80,17 @@ public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
     }
   }
 
-  private BeamTableStatistics getTableStatistics(PipelineOptions options) {
+  public BeamTableStatistics getTableStatistics(PipelineOptions options) {
     if (isBounded().equals(PCollection.IsBounded.BOUNDED)) {
       return BeamTableStatistics.BOUNDED_UNKNOWN;
     }
     return BeamTableStatistics.UNBOUNDED_UNKNOWN;
   }
 
+  public BeamTableStatistics getTableStatistics(PipelineOptions options, SchemaIO schemaIO) {
+    return getTableStatistics(options);
+  }
+

Review comment:
       I added this second getTableStatistics() overload because DataStoreV1 is the first IO whose getTableStatistics() relies on the schemaIO data; the overload lets DataStoreV1TableProvider override SchemaIOTableProviderWrapper#getTableStatistics. None of the other IOs so far have needed the schemaIO. Do you think we should drop the single-argument getTableStatistics() and always require a schemaIO, even when it isn't strictly necessary, or support both cases?
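
The "support both cases" option discussed here can be sketched as a delegating overload that subclasses are free to override. This is a minimal illustration, not the code in the PR: `Stats`, `FakeSchemaIO`, `WrapperSketch` and `DataStoreWrapperSketch` are hypothetical stand-ins for `BeamTableStatistics`, `SchemaIO`, `SchemaIOTableProviderWrapper` and the DataStoreV1 provider, the `PipelineOptions` parameter is left out for brevity, and `FROM_SCHEMA_IO` is an invented value that only marks the overridden path.

```java
/** Hypothetical stand-in for BeamTableStatistics. */
enum Stats { BOUNDED_UNKNOWN, UNBOUNDED_UNKNOWN, FROM_SCHEMA_IO }

/** Hypothetical stand-in for SchemaIO. */
final class FakeSchemaIO {
  final String kind;

  FakeSchemaIO(String kind) {
    this.kind = kind;
  }
}

/** Mirrors the shape of the wrapper: one base method plus a delegating overload. */
class WrapperSketch {
  private final boolean bounded;

  WrapperSketch(boolean bounded) {
    this.bounded = bounded;
  }

  protected Stats getTableStatistics() {
    return bounded ? Stats.BOUNDED_UNKNOWN : Stats.UNBOUNDED_UNKNOWN;
  }

  // Default: ignore the schemaIO, so existing IOs keep their current behaviour.
  protected Stats getTableStatistics(FakeSchemaIO schemaIO) {
    return getTableStatistics();
  }
}

/** An IO that needs the schemaIO overrides only the overload that receives it. */
class DataStoreWrapperSketch extends WrapperSketch {
  DataStoreWrapperSketch() {
    super(true);
  }

  @Override
  protected Stats getTableStatistics(FakeSchemaIO schemaIO) {
    return schemaIO.kind.isEmpty() ? super.getTableStatistics(schemaIO) : Stats.FROM_SCHEMA_IO;
  }
}

final class OverloadDemo {
  public static void main(String[] args) {
    FakeSchemaIO io = new FakeSchemaIO("batch_kind");
    System.out.println(new WrapperSketch(false).getTableStatistics(io));
    System.out.println(new DataStoreWrapperSketch().getTableStatistics(io));
  }
}
```

If callers always go through the overload that takes the schemaIO, the base method becomes an implementation detail, which is one argument for keeping only a single overridable entry point.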







[GitHub] [beam] sclukas77 commented on a change in pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r466461917



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataStoreV1SchemaIOProviderTest {
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  public static final String KEY_FIELD_PROPERTY = "keyField";
+  private DataStoreV1SchemaIOProvider provider = new DataStoreV1SchemaIOProvider();
+
+  @Test
+  public void testGetTableType() {
+    assertEquals("datastoreV1", provider.identifier());
+  }
+
+  @Test
+  public void testBuildBeamSqlTable() {
+    final String location = "projectId/batch_kind";
+    Row configuration = Row.withSchema(generateRowSchema()).addValue(null).build();
+    SchemaIO schemaIO =
+        (new DataStoreV1SchemaIOProvider()).from(location, configuration, generateDataSchema());

Review comment:
       Done.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r470903947



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
##########
@@ -80,7 +80,7 @@ public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
     }
   }
 
-  private BeamTableStatistics getTableStatistics(PipelineOptions options) {
+  public BeamTableStatistics getTableStatistics(PipelineOptions options, SchemaIO schemaIO) {

Review comment:
       nit: I think this just needs to be protected so that implementors can override
   ```suggestion
     protected BeamTableStatistics getTableStatistics(PipelineOptions options, SchemaIO schemaIO) {
   ```







[GitHub] [beam] sclukas77 commented on a change in pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r466461747



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataStoreV1SchemaIOProviderTest {
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  public static final String KEY_FIELD_PROPERTY = "keyField";
+  private DataStoreV1SchemaIOProvider provider = new DataStoreV1SchemaIOProvider();
+
+  @Test
+  public void testGetTableType() {
+    assertEquals("datastoreV1", provider.identifier());
+  }
+
+  @Test
+  public void testBuildBeamSqlTable() {
+    final String location = "projectId/batch_kind";
+    Row configuration = Row.withSchema(generateRowSchema()).addValue(null).build();
+    SchemaIO schemaIO =
+        (new DataStoreV1SchemaIOProvider()).from(location, configuration, generateDataSchema());
+
+    assertNotNull(schemaIO);
+    assertTrue(schemaIO instanceof DataStoreV1SchemaIO);
+
+    DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO;
+    assertEquals("projectId", dataStoreV1SchemaIO.projectId);
+    assertEquals("batch_kind", dataStoreV1SchemaIO.kind);
+    assertEquals(DEFAULT_KEY_FIELD, dataStoreV1SchemaIO.keyField);
+  }
+
+  @Test
+  public void testTableProperty() {
+    final String location = "projectId/batch_kind";
+
+    Row configuration = Row.withSchema(generateRowSchema()).addValue("field_name").build();
+    SchemaIO schemaIO =
+        (new DataStoreV1SchemaIOProvider()).from(location, configuration, generateDataSchema());
+
+    assertNotNull(schemaIO);
+    assertTrue(schemaIO instanceof DataStoreV1SchemaIO);
+
+    DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO;
+    assertEquals("projectId", dataStoreV1SchemaIO.projectId);
+    assertEquals("batch_kind", dataStoreV1SchemaIO.kind);
+    assertEquals("field_name", dataStoreV1SchemaIO.keyField);
+  }
+
+  @Test
+  public void testTableProperty_nullValue_throwsException() {
+    final String location = "projectId/batch_kind";
+    Row configuration = Row.withSchema(generateRowSchema()).addValue("").build();
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            (new DataStoreV1SchemaIOProvider())
+                .from(location, configuration, generateDataSchema()));
+  }
+
+  private static Schema generateDataSchema() {
+    return Schema.builder()
+        .addNullableField("id", Schema.FieldType.INT32)
+        .addNullableField("name", Schema.FieldType.STRING)
+        .build();
+  }
+
+  private Schema generateRowSchema() {
+    return Schema.builder().addNullableField(KEY_FIELD_PROPERTY, Schema.FieldType.STRING).build();
+  }

Review comment:
       Done.
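
The three key-field tests in the hunk above imply a simple resolution rule: a null configuration value falls back to the default, an explicit value is used as given, and an empty string is rejected. A minimal sketch of that rule follows, inferred from the tests only. `KeyFieldResolver` and `resolve` are hypothetical names, and the real provider may structure this differently.

```java
final class KeyFieldResolver {
  // Same default as DEFAULT_KEY_FIELD in the test class.
  static final String DEFAULT_KEY_FIELD = "__key__";

  private KeyFieldResolver() {}

  /** Returns the configured key field, or the default when none is configured. */
  static String resolve(String configured) {
    if (configured == null) {
      return DEFAULT_KEY_FIELD;
    }
    if (configured.isEmpty()) {
      // The tests expect IllegalArgumentException for an empty value.
      throw new IllegalArgumentException("keyField must not be empty");
    }
    return configured;
  }

  public static void main(String[] args) {
    System.out.println(resolve(null));         // falls back to the default
    System.out.println(resolve("field_name")); // uses the configured name
  }
}
```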







[GitHub] [beam] TheNeuralBit commented on pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12341:
URL: https://github.com/apache/beam/pull/12341#issuecomment-673027784









[GitHub] [beam] sclukas77 commented on a change in pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r466460269



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RowToEntity.java
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Value;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@code PTransform} to perform a conversion of {@code PCollection<Row>} to {@code
+ * PCollection<Entity>}.

Review comment:
       Done







[GitHub] [beam] TheNeuralBit commented on pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12341:
URL: https://github.com/apache/beam/pull/12341#issuecomment-674322745


   Run Java PreCommit





[GitHub] [beam] sclukas77 commented on a change in pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r470660945



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProvider.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import com.google.auto.service.AutoService;
+import com.google.datastore.v1.Query;
+import java.io.Serializable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidLocationException;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing payloads with {@link
+ * DatastoreIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class DataStoreV1SchemaIOProvider implements SchemaIOProvider {
+  public static final String KEY_FIELD_PROPERTY = "keyField";
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  private static final Pattern locationPattern = Pattern.compile("(?<projectId>.+)/(?<kind>.+)");
+
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "datastoreV1";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself.
+   */

Review comment:
       Done.







[GitHub] [beam] sclukas77 commented on a change in pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r466462146



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataStoreV1SchemaIOProviderTest {
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  public static final String KEY_FIELD_PROPERTY = "keyField";
+  private DataStoreV1SchemaIOProvider provider = new DataStoreV1SchemaIOProvider();
+
+  @Test
+  public void testGetTableType() {
+    assertEquals("datastoreV1", provider.identifier());
+  }
+
+  @Test
+  public void testBuildBeamSqlTable() {
+    final String location = "projectId/batch_kind";
+    Row configuration = Row.withSchema(generateRowSchema()).addValue(null).build();
+    SchemaIO schemaIO =
+        (new DataStoreV1SchemaIOProvider()).from(location, configuration, generateDataSchema());
+
+    assertNotNull(schemaIO);
+    assertTrue(schemaIO instanceof DataStoreV1SchemaIO);
+
+    DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO;
+    assertEquals("projectId", dataStoreV1SchemaIO.projectId);
+    assertEquals("batch_kind", dataStoreV1SchemaIO.kind);
+    assertEquals(DEFAULT_KEY_FIELD, dataStoreV1SchemaIO.keyField);
+  }
+
+  @Test
+  public void testTableProperty() {
+    final String location = "projectId/batch_kind";
+
+    Row configuration = Row.withSchema(generateRowSchema()).addValue("field_name").build();

Review comment:
       Done.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r461252200



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataStoreV1SchemaIOProviderTest {
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  public static final String KEY_FIELD_PROPERTY = "keyField";
+  private DataStoreV1SchemaIOProvider provider = new DataStoreV1SchemaIOProvider();
+
+  @Test
+  public void testGetTableType() {
+    assertEquals("datastoreV1", provider.identifier());
+  }
+
+  @Test
+  public void testBuildBeamSqlTable() {
+    final String location = "projectId/batch_kind";
+    Row configuration = Row.withSchema(generateRowSchema()).addValue(null).build();
+    SchemaIO schemaIO =
+        (new DataStoreV1SchemaIOProvider()).from(location, configuration, generateDataSchema());
+
+    assertNotNull(schemaIO);
+    assertTrue(schemaIO instanceof DataStoreV1SchemaIO);
+
+    DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO;
+    assertEquals("projectId", dataStoreV1SchemaIO.projectId);
+    assertEquals("batch_kind", dataStoreV1SchemaIO.kind);
+    assertEquals(DEFAULT_KEY_FIELD, dataStoreV1SchemaIO.keyField);
+  }
+
+  @Test
+  public void testTableProperty() {
+    final String location = "projectId/batch_kind";
+
+    Row configuration = Row.withSchema(generateRowSchema()).addValue("field_name").build();
+    SchemaIO schemaIO =
+        (new DataStoreV1SchemaIOProvider()).from(location, configuration, generateDataSchema());
+
+    assertNotNull(schemaIO);
+    assertTrue(schemaIO instanceof DataStoreV1SchemaIO);
+
+    DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO;
+    assertEquals("projectId", dataStoreV1SchemaIO.projectId);
+    assertEquals("batch_kind", dataStoreV1SchemaIO.kind);
+    assertEquals("field_name", dataStoreV1SchemaIO.keyField);
+  }
+
+  @Test
+  public void testTableProperty_nullValue_throwsException() {
+    final String location = "projectId/batch_kind";
+    Row configuration = Row.withSchema(generateRowSchema()).addValue("").build();
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            (new DataStoreV1SchemaIOProvider())
+                .from(location, configuration, generateDataSchema()));
+  }
+
+  private static Schema generateDataSchema() {
+    return Schema.builder()
+        .addNullableField("id", Schema.FieldType.INT32)
+        .addNullableField("name", Schema.FieldType.STRING)
+        .build();
+  }
+
+  private Schema generateRowSchema() {
+    return Schema.builder().addNullableField(KEY_FIELD_PROPERTY, Schema.FieldType.STRING).build();
+  }

Review comment:
       I think it would be better to get rid of this function and call `provider.configurationSchema()` instead.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RowToEntity.java
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Value;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@code PTransform} to perform a conversion of {@code PCollection<Row>} to {@code
+ * PCollection<Entity>}.

Review comment:
       ```suggestion
    * A {@code PTransform} to perform a conversion of {@link Row} to {@link Entity}.
   ```

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataStoreV1SchemaIOProviderTest {
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  public static final String KEY_FIELD_PROPERTY = "keyField";
+  private DataStoreV1SchemaIOProvider provider = new DataStoreV1SchemaIOProvider();
+
+  @Test
+  public void testGetTableType() {
+    assertEquals("datastoreV1", provider.identifier());
+  }
+
+  @Test
+  public void testBuildBeamSqlTable() {
+    final String location = "projectId/batch_kind";
+    Row configuration = Row.withSchema(generateRowSchema()).addValue(null).build();
+    SchemaIO schemaIO =
+        (new DataStoreV1SchemaIOProvider()).from(location, configuration, generateDataSchema());
+
+    assertNotNull(schemaIO);
+    assertTrue(schemaIO instanceof DataStoreV1SchemaIO);
+
+    DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO;
+    assertEquals("projectId", dataStoreV1SchemaIO.projectId);
+    assertEquals("batch_kind", dataStoreV1SchemaIO.kind);
+    assertEquals(DEFAULT_KEY_FIELD, dataStoreV1SchemaIO.keyField);
+  }
+
+  @Test
+  public void testTableProperty() {
+    final String location = "projectId/batch_kind";
+
+    Row configuration = Row.withSchema(generateRowSchema()).addValue("field_name").build();

Review comment:
       I think we now prefer `withFieldValue` over `addValue` when constructing Row instances, e.g.:
   `Row.withSchema(provider.configurationSchema()).withFieldValue("keyField", "field_name")`
   
   See the examples in Row.java: https://github.com/apache/beam/blob/956e4eb39a7fedbae05985c759284557dcc3d9ec/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java#L64-L74
   

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRowRowToEntityTest.java
##########
@@ -74,7 +73,7 @@
           .addNullableField("rowArray", array(FieldType.row(NESTED_ROW_SCHEMA)))
           .addNullableField("double", DOUBLE)
           .addNullableField("bytes", BYTES)
-          .addNullableField("string", CalciteUtils.CHAR)
+          .addNullableField("string", CHAR)

Review comment:
       Will this work if we just use `STRING`? (Rather than re-creating `CalciteUtils.CHAR` here)

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataStoreV1SchemaIOProviderTest {
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  public static final String KEY_FIELD_PROPERTY = "keyField";
+  private DataStoreV1SchemaIOProvider provider = new DataStoreV1SchemaIOProvider();
+
+  @Test
+  public void testGetTableType() {
+    assertEquals("datastoreV1", provider.identifier());
+  }
+
+  @Test
+  public void testBuildBeamSqlTable() {
+    final String location = "projectId/batch_kind";
+    Row configuration = Row.withSchema(generateRowSchema()).addValue(null).build();
+    SchemaIO schemaIO =
+        (new DataStoreV1SchemaIOProvider()).from(location, configuration, generateDataSchema());

Review comment:
       I think you could just re-use `provider` here and in the other tests. If you're worried about re-using the same instance in all the tests you could consider initializing `provider` in an `@Before` method instead of initializing statically so that each test will get a fresh instance.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRow.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Value;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@code PTransform} to perform a conversion of {@code PCollection<Entity>} to {@code
+ * PCollection<Row>}.

Review comment:
       ```suggestion
    * A {@code PTransform} to perform a conversion of {@link Entity} to {@link Row}.
   ```

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java
##########
@@ -39,15 +43,14 @@
  * }</pre>
  */
 @AutoService(TableProvider.class)
-public class DataStoreV1TableProvider extends InMemoryMetaTableProvider {
-
+public class DataStoreV1TableProvider extends SchemaIOTableProviderWrapper {
   @Override
-  public String getTableType() {
-    return "datastoreV1";
+  public SchemaIOProvider getSchemaIOProvider() {
+    return new DataStoreV1SchemaIOProvider();
   }
 
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table table) {
-    return new DataStoreV1Table(table);
+  public String getTableType() {
+    return "datastoreV1";

Review comment:
       Should this also have an implementation for getTableStatistics? It looks like DataStoreV1Table had a non-standard implementation:
   ```
     @Override
     public BeamTableStatistics getTableStatistics(PipelineOptions options) {
       long count =
           DatastoreIO.v1().read().withProjectId(projectId).getNumEntities(options, kind, null);
   
       if (count < 0) {
         return BeamTableStatistics.BOUNDED_UNKNOWN;
       }
   
       return BeamTableStatistics.createBoundedTableStatistics((double) count);
     }
   ```







[GitHub] [beam] sclukas77 commented on a change in pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r470663519



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
##########
@@ -80,13 +80,17 @@ public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
     }
   }
 
-  private BeamTableStatistics getTableStatistics(PipelineOptions options) {
+  public BeamTableStatistics getTableStatistics(PipelineOptions options) {
     if (isBounded().equals(PCollection.IsBounded.BOUNDED)) {
       return BeamTableStatistics.BOUNDED_UNKNOWN;
     }
     return BeamTableStatistics.UNBOUNDED_UNKNOWN;
   }
 
+  public BeamTableStatistics getTableStatistics(PipelineOptions options, SchemaIO schemaIO) {
+    return getTableStatistics(options);
+  }
+

Review comment:
       I see what you mean now. I got rid of the old getTableStatistics() function and moved the default implementation to this one.







[GitHub] [beam] TheNeuralBit merged pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
TheNeuralBit merged pull request #12341:
URL: https://github.com/apache/beam/pull/12341


   





[GitHub] [beam] TheNeuralBit commented on pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12341:
URL: https://github.com/apache/beam/pull/12341#issuecomment-673027691


   Run JavaPortabilityApiJava11 PreCommit





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r469443705



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProvider.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import com.google.auto.service.AutoService;
+import com.google.datastore.v1.Query;
+import java.io.Serializable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidLocationException;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing payloads with {@link
+ * DatastoreIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class DataStoreV1SchemaIOProvider implements SchemaIOProvider {
+  public static final String KEY_FIELD_PROPERTY = "keyField";
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  private static final Pattern locationPattern = Pattern.compile("(?<projectId>.+)/(?<kind>.+)");
+
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "datastoreV1";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself.
+   */

Review comment:
       Let's describe the configuration parameters here. Something like this (spotless probably has an opinion on how this should be formatted):
   ```suggestion
     /**
      * Returns the expected schema of the configuration object. Note this is distinct from the schema
      * of the data source itself.
      * 
      * <p>Configuration Parameters:
      * <ul>
      *   <li>STRING keyField: The name of the Beam schema field to map the DataStore entity key. Defaults to {@code __key__} if not set or null.
      * </ul>
      */
   ```
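
The defaulting rule described in the suggested Javadoc can be illustrated with a minimal, self-contained sketch. `KeyFieldSketch` and `resolveKeyField` are hypothetical names and not part of Beam. Rejecting an empty value is an assumption drawn from the `testTableProperty_nullValue_throwsException` test in this PR, which expects an `IllegalArgumentException` when `keyField` is an empty string; the real provider may signal this differently.

```java
/** Hypothetical helper, not part of Beam: resolves the configured keyField. */
final class KeyFieldSketch {
  // Same default as DEFAULT_KEY_FIELD in the PR.
  static final String DEFAULT_KEY_FIELD = "__key__";

  private KeyFieldSketch() {}

  /**
   * Returns the configured key field, falling back to the default when the
   * configuration value is not set (null). An empty string is rejected.
   */
  static String resolveKeyField(String configured) {
    if (configured == null) {
      return DEFAULT_KEY_FIELD;
    }
    if (configured.isEmpty()) {
      // Assumption: an empty name is treated as a configuration error.
      throw new IllegalArgumentException("keyField must not be empty");
    }
    return configured;
  }
}
```

With this shape, a null configuration value and an omitted one behave the same way, which is what the "not set or null" wording in the Javadoc describes.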

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
##########
@@ -80,13 +80,17 @@ public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
     }
   }
 
-  private BeamTableStatistics getTableStatistics(PipelineOptions options) {
+  public BeamTableStatistics getTableStatistics(PipelineOptions options) {
     if (isBounded().equals(PCollection.IsBounded.BOUNDED)) {
       return BeamTableStatistics.BOUNDED_UNKNOWN;
     }
     return BeamTableStatistics.UNBOUNDED_UNKNOWN;
   }
 
+  public BeamTableStatistics getTableStatistics(PipelineOptions options, SchemaIO schemaIO) {
+    return getTableStatistics(options);
+  }
+

Review comment:
       I don't think this new method is actually called anywhere. Should it be called from `SchemaIOTableWrapper#getTableStatistics`?
   
    It might also be good to just get rid of the other `getTableStatistics` and move the default implementation to this one.
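
One way to picture the structure proposed here is a single overridable hook on the provider wrapper that the table wrapper delegates to. The sketch below uses plain-Java stand-ins only: `StatisticsHookSketch`, `TableStats`, `ProviderWrapper`, `DatastoreProvider`, and `TableWrapper` are hypothetical names, `Object` replaces `PipelineOptions` and `SchemaIO`, and the entity count is passed in as a constructor argument rather than fetched from Datastore. It is not the code that was merged.

```java
/** Hypothetical stand-ins for the Beam SQL classes; only the call structure matters. */
final class StatisticsHookSketch {

  private StatisticsHookSketch() {}

  /** Stand-in for BeamTableStatistics. A null rowCount means "unknown". */
  static final class TableStats {
    static final TableStats BOUNDED_UNKNOWN = new TableStats(null);
    static final TableStats UNBOUNDED_UNKNOWN = new TableStats(null);
    final Double rowCount;

    private TableStats(Double rowCount) {
      this.rowCount = rowCount;
    }

    static TableStats bounded(double rowCount) {
      return new TableStats(rowCount);
    }
  }

  /** Stand-in for the provider wrapper with a single overridable hook. */
  static class ProviderWrapper {
    private final boolean bounded;

    ProviderWrapper(boolean bounded) {
      this.bounded = bounded;
    }

    /** Default implementation: the result depends only on boundedness. */
    TableStats getTableStatistics(Object options, Object schemaIO) {
      return bounded ? TableStats.BOUNDED_UNKNOWN : TableStats.UNBOUNDED_UNKNOWN;
    }
  }

  /** Stand-in for a Datastore provider that overrides the hook with an entity count. */
  static final class DatastoreProvider extends ProviderWrapper {
    private final long entityCount;

    DatastoreProvider(long entityCount) {
      super(true);
      this.entityCount = entityCount;
    }

    @Override
    TableStats getTableStatistics(Object options, Object schemaIO) {
      // A negative count is treated as "unknown", as in the old DataStoreV1Table logic.
      return entityCount < 0
          ? TableStats.BOUNDED_UNKNOWN
          : TableStats.bounded((double) entityCount);
    }
  }

  /** Stand-in for the table wrapper: it delegates to the provider's hook. */
  static final class TableWrapper {
    private final ProviderWrapper provider;
    private final Object schemaIO;

    TableWrapper(ProviderWrapper provider, Object schemaIO) {
      this.provider = provider;
      this.schemaIO = schemaIO;
    }

    TableStats getTableStatistics(Object options) {
      return provider.getTableStatistics(options, schemaIO);
    }
  }
}
```

Keeping one method with a default body means providers without special statistics need no override, while the table wrapper always has exactly one place to call.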

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRow.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Value;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@code PTransform} to perform a conversion of {@link Entity} to {@link Row}. */
+public class EntityToRow extends PTransform<PCollection<Entity>, PCollection<Row>> {
+  private final Schema schema;
+  private final String keyField;
+  private static final Logger LOG = LoggerFactory.getLogger(DataStoreV1SchemaIOProvider.class);
+
+  private EntityToRow(Schema schema, String keyField) {
+    this.schema = schema;
+    this.keyField = keyField;
+
+    if (schema.getFieldNames().contains(keyField)) {
+      if (!schema.getField(keyField).getType().getTypeName().equals(Schema.TypeName.BYTES)) {
+        throw new IllegalStateException(
+            "Field `"
+                + keyField
+                + "` should of type `VARBINARY`. Please change the type or specify a field to"

Review comment:
       ```suggestion
                   + "` should be of type `BYTES`. Please change the type or specify a field to"
   ```
   Let's change this to the Beam schema type name







[GitHub] [beam] TheNeuralBit commented on pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12341:
URL: https://github.com/apache/beam/pull/12341#issuecomment-673014258


   Run SQL PreCommit





[GitHub] [beam] sclukas77 commented on a change in pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r466461253



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java
##########
@@ -39,15 +43,14 @@
  * }</pre>
  */
 @AutoService(TableProvider.class)
-public class DataStoreV1TableProvider extends InMemoryMetaTableProvider {
-
+public class DataStoreV1TableProvider extends SchemaIOTableProviderWrapper {
   @Override
-  public String getTableType() {
-    return "datastoreV1";
+  public SchemaIOProvider getSchemaIOProvider() {
+    return new DataStoreV1SchemaIOProvider();
   }
 
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table table) {
-    return new DataStoreV1Table(table);
+  public String getTableType() {
+    return "datastoreV1";

Review comment:
       Done. 
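
The shape of the refactoring in the diff above (a table provider that extends a wrapper and only supplies a provider plus a table type) can be sketched as follows. This is a minimal sketch, not Beam's API: `IoProvider`, `TableProviderWrapper`, `describeTable`, and the returned identifier are hypothetical names chosen for illustration only.

```java
public class WrapperSketch {
  /** Hypothetical stand-in for a schema-aware IO provider (not Beam's interface). */
  interface IoProvider {
    String identifier();
  }

  /** Hypothetical wrapper base class: the shared table-building logic lives here. */
  abstract static class TableProviderWrapper {
    abstract IoProvider getIoProvider();

    abstract String getTableType();

    // Shared logic that subclasses no longer need to implement themselves.
    String describeTable(String tableName) {
      return tableName + " [" + getTableType() + "] via " + getIoProvider().identifier();
    }
  }

  /** Subclass mirroring the two overrides visible in the diff. */
  static class DatastoreTableProvider extends TableProviderWrapper {
    @Override
    IoProvider getIoProvider() {
      return () -> "datastoreV1"; // illustrative identifier, assumed
    }

    @Override
    String getTableType() {
      return "datastoreV1";
    }
  }

  public static void main(String[] args) {
    System.out.println(new DatastoreTableProvider().describeTable("orders"));
  }
}
```

With this layout the subclass shrinks to two small overrides, while anything common to all providers stays in the wrapper.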







[GitHub] [beam] sclukas77 commented on a change in pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r466459599



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRowRowToEntityTest.java
##########
@@ -74,7 +73,7 @@
           .addNullableField("rowArray", array(FieldType.row(NESTED_ROW_SCHEMA)))
           .addNullableField("double", DOUBLE)
           .addNullableField("bytes", BYTES)
-          .addNullableField("string", CalciteUtils.CHAR)
+          .addNullableField("string", CHAR)

Review comment:
       Done







[GitHub] [beam] TheNeuralBit commented on pull request #12341: [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12341:
URL: https://github.com/apache/beam/pull/12341#issuecomment-674219709


   Run Java PreCommit

