Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/15 16:22:16 UTC

[GitHub] [beam] sclukas77 opened a new pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

sclukas77 opened a new pull request #12266:
URL: https://github.com/apache/beam/pull/12266


   Used schema inference to move the configuration values in PubsubSchemaCapableIOProvider out of a Row object and into an inferred object (an inner Config class). The SchemaIO logic now reads its configuration from that object rather than from the Row.
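   A minimal, Beam-free sketch of the idea, for illustration only: a `Map<String, Object>` stands in for Beam's `Row`, and a hand-written conversion stands in for the schema inference (`AutoValueSchema`) that the PR actually relies on. `ConfigSketch` and `fromRow` are hypothetical names, not part of the PR.

```java
import java.util.Map;

// Hypothetical stand-in for the inferred Config class: typed getters replace
// string-keyed lookups on a generic row.
final class ConfigSketch {
  private final String timestampAttributeKey; // may be null
  private final String deadLetterQueue; // may be null

  private ConfigSketch(String timestampAttributeKey, String deadLetterQueue) {
    this.timestampAttributeKey = timestampAttributeKey;
    this.deadLetterQueue = deadLetterQueue;
  }

  // Converts the untyped representation once, up front. Assumption: a
  // Map<String, Object> models the Row; a missing key yields null, and a
  // non-String value throws ClassCastException.
  static ConfigSketch fromRow(Map<String, Object> row) {
    return new ConfigSketch(
        (String) row.get("timestampAttributeKey"), (String) row.get("deadLetterQueue"));
  }

  String getTimestampAttributeKey() {
    return timestampAttributeKey;
  }

  String getDeadLetterQueue() {
    return deadLetterQueue;
  }
}
```

   Keeping the string-keyed lookups in a single conversion step means the rest of the IO code only works with typed getters.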
   
   R: @robinyqiu 
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robinyqiu commented on a change in pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #12266:
URL: https://github.com/apache/beam/pull/12266#discussion_r456089799



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RowToPubsubMessage.java
##########
@@ -40,19 +39,14 @@
  * whether config.getValue("timestampAttributeKey") is set.
  */
 class RowToPubsubMessage extends PTransform<PCollection<Row>, PCollection<PubsubMessage>> {
-  private final Row config;
   private final Boolean useTimestampAttribute;
 
-  private RowToPubsubMessage(Row config, Boolean useFlatSchema, Boolean useTimestampAttribute) {
-    checkArgument(useFlatSchema, "RowToPubsubMessage is only supported for flattened schemas.");
-
-    this.config = config;
+  private RowToPubsubMessage(Boolean useTimestampAttribute) {

Review comment:
       Java coding style discourages using boxed primitive types unless there is a reason to. I think it's better to just use `boolean` here. Same below.
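       A small illustrative sketch (hypothetical class, not from the PR) of one concrete reason for this guideline: a `Boolean` field or parameter can hold `null`, which compiles fine but throws `NullPointerException` when auto-unboxed, whereas a primitive `boolean` can only be `true` or `false`.

```java
// Hypothetical helper showing the null-unboxing pitfall of boxed Boolean.
final class BoxedVsPrimitive {
  private BoxedVsPrimitive() {}

  // Auto-unboxes the argument (flag.booleanValue()); throws
  // NullPointerException when flag is null.
  static boolean unbox(Boolean flag) {
    return flag;
  }

  // Returns true if unboxing the given value would throw.
  static boolean unboxingThrows(Boolean flag) {
    try {
      unbox(flag);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }
}
```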

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RowToPubsubMessage.java
##########
@@ -40,19 +39,14 @@
  * whether config.getValue("timestampAttributeKey") is set.
  */
 class RowToPubsubMessage extends PTransform<PCollection<Row>, PCollection<PubsubMessage>> {
-  private final Row config;
   private final Boolean useTimestampAttribute;
 
-  private RowToPubsubMessage(Row config, Boolean useFlatSchema, Boolean useTimestampAttribute) {
-    checkArgument(useFlatSchema, "RowToPubsubMessage is only supported for flattened schemas.");
-
-    this.config = config;
+  private RowToPubsubMessage(Boolean useTimestampAttribute) {
     this.useTimestampAttribute = useTimestampAttribute;
   }
 
-  public static RowToPubsubMessage fromConfig(
-      Row config, Boolean useFlatSchema, Boolean useTimestampAttribute) {
-    return new RowToPubsubMessage(config, useFlatSchema, useTimestampAttribute);
+  public static RowToPubsubMessage fromConfig(Boolean useTimestampAttribute) {

Review comment:
       Nit: Now that the `config` parameter is gone, this method could be given a more meaningful name, such as `withTimestampAttribute()`.
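       A Beam-free sketch of the shape suggested in this review, under the assumption that only the timestamp flag remains: the unused `config` and `useFlatSchema` parameters are dropped, the flag is a primitive `boolean`, and the static factory is named after what it configures. `RowToPubsubMessageSketch` is a hypothetical stand-in; the real class extends `PTransform`, which this sketch omits.

```java
// Hypothetical stand-in for RowToPubsubMessage after the suggested changes.
final class RowToPubsubMessageSketch {
  private final boolean useTimestampAttribute;

  private RowToPubsubMessageSketch(boolean useTimestampAttribute) {
    this.useTimestampAttribute = useTimestampAttribute;
  }

  // Replaces fromConfig(...): the name now describes the single argument.
  static RowToPubsubMessageSketch withTimestampAttribute(boolean useTimestampAttribute) {
    return new RowToPubsubMessageSketch(useTimestampAttribute);
  }

  boolean usesTimestampAttribute() {
    return useTimestampAttribute;
  }
}
```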







[GitHub] [beam] sclukas77 commented on a change in pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12266:
URL: https://github.com/apache/beam/pull/12266#discussion_r456065624



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaCapableIOProvider.java
##########
@@ -155,16 +159,17 @@ private void validateConfigurationSchema(Row configuration) {
   /** An abstraction to create schema aware IOs. */
   @Internal
   private static class PubsubSchemaIO implements SchemaIO, Serializable {

Review comment:
       Done







[GitHub] [beam] sclukas77 commented on a change in pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12266:
URL: https://github.com/apache/beam/pull/12266#discussion_r456549086



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RowToPubsubMessage.java
##########
@@ -40,19 +39,14 @@
  * whether config.getValue("timestampAttributeKey") is set.
  */
 class RowToPubsubMessage extends PTransform<PCollection<Row>, PCollection<PubsubMessage>> {
-  private final Row config;
   private final Boolean useTimestampAttribute;

Review comment:
       Done

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RowToPubsubMessage.java
##########
@@ -40,19 +39,14 @@
  * whether config.getValue("timestampAttributeKey") is set.
  */
 class RowToPubsubMessage extends PTransform<PCollection<Row>, PCollection<PubsubMessage>> {
-  private final Row config;
   private final Boolean useTimestampAttribute;
 
-  private RowToPubsubMessage(Row config, Boolean useFlatSchema, Boolean useTimestampAttribute) {
-    checkArgument(useFlatSchema, "RowToPubsubMessage is only supported for flattened schemas.");
-
-    this.config = config;
+  private RowToPubsubMessage(Boolean useTimestampAttribute) {

Review comment:
       Done







[GitHub] [beam] robinyqiu merged pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

Posted by GitBox <gi...@apache.org>.
robinyqiu merged pull request #12266:
URL: https://github.com/apache/beam/pull/12266


   





[GitHub] [beam] robinyqiu commented on a change in pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #12266:
URL: https://github.com/apache/beam/pull/12266#discussion_r456101925



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RowToPubsubMessage.java
##########
@@ -40,19 +39,14 @@
  * whether config.getValue("timestampAttributeKey") is set.
  */
 class RowToPubsubMessage extends PTransform<PCollection<Row>, PCollection<PubsubMessage>> {
-  private final Row config;
   private final Boolean useTimestampAttribute;
 
-  private RowToPubsubMessage(Row config, Boolean useFlatSchema, Boolean useTimestampAttribute) {
-    checkArgument(useFlatSchema, "RowToPubsubMessage is only supported for flattened schemas.");
-
-    this.config = config;
+  private RowToPubsubMessage(Boolean useTimestampAttribute) {

Review comment:
       And this.







[GitHub] [beam] sclukas77 commented on a change in pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12266:
URL: https://github.com/apache/beam/pull/12266#discussion_r456099852



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RowToPubsubMessage.java
##########
@@ -40,19 +39,14 @@
  * whether config.getValue("timestampAttributeKey") is set.
  */
 class RowToPubsubMessage extends PTransform<PCollection<Row>, PCollection<PubsubMessage>> {
-  private final Row config;
   private final Boolean useTimestampAttribute;
 
-  private RowToPubsubMessage(Row config, Boolean useFlatSchema, Boolean useTimestampAttribute) {
-    checkArgument(useFlatSchema, "RowToPubsubMessage is only supported for flattened schemas.");
-
-    this.config = config;
+  private RowToPubsubMessage(Boolean useTimestampAttribute) {
     this.useTimestampAttribute = useTimestampAttribute;
   }
 
-  public static RowToPubsubMessage fromConfig(
-      Row config, Boolean useFlatSchema, Boolean useTimestampAttribute) {
-    return new RowToPubsubMessage(config, useFlatSchema, useTimestampAttribute);
+  public static RowToPubsubMessage fromConfig(Boolean useTimestampAttribute) {

Review comment:
       Done







[GitHub] [beam] sclukas77 commented on a change in pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12266:
URL: https://github.com/apache/beam/pull/12266#discussion_r456065229



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaCapableIOProvider.java
##########
@@ -264,4 +265,22 @@ private static boolean fieldPresent(
               schema.getField(field).getType(), Schema.EquivalenceNullablePolicy.IGNORE);
     }
   }
+
+  @Internal
+  @AutoValue
+  public abstract static class Config implements Serializable {
+    @Nullable
+    public abstract String getTimestampAttributeKey();
+
+    @Nullable
+    public abstract String getDeadLetterQueue();
+
+    private boolean useDlqCheck() {

Review comment:
       Done







[GitHub] [beam] robinyqiu commented on a change in pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #12266:
URL: https://github.com/apache/beam/pull/12266#discussion_r455492426



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaCapableIOProvider.java
##########
@@ -264,4 +265,22 @@ private static boolean fieldPresent(
               schema.getField(field).getType(), Schema.EquivalenceNullablePolicy.IGNORE);
     }
   }
+
+  @Internal
+  @AutoValue
+  public abstract static class Config implements Serializable {

Review comment:
       Can we make this class private or package-private?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaCapableIOProvider.java
##########
@@ -155,16 +159,17 @@ private void validateConfigurationSchema(Row configuration) {
   /** An abstraction to create schema aware IOs. */
   @Internal
   private static class PubsubSchemaIO implements SchemaIO, Serializable {

Review comment:
       The `@Internal` annotation warns users not to depend on the annotated class. This class is not even visible to users because it is private, so the annotation is not needed here.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaCapableIOProvider.java
##########
@@ -264,4 +265,22 @@ private static boolean fieldPresent(
               schema.getField(field).getType(), Schema.EquivalenceNullablePolicy.IGNORE);
     }
   }
+
+  @Internal
+  @AutoValue
+  public abstract static class Config implements Serializable {
+    @Nullable
+    public abstract String getTimestampAttributeKey();
+
+    @Nullable
+    public abstract String getDeadLetterQueue();
+
+    private boolean useDlqCheck() {

Review comment:
       Rename to `useDeadLetterQueue` for consistency?
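       A hand-written sketch of what the Config class could look like with the rename applied, assuming (the method body is not shown in the diff) that `useDlqCheck()` returns whether a dead-letter queue is configured, and that the timestamp attribute is used whenever `timestampAttributeKey` is set, as the `RowToPubsubMessage` Javadoc states. AutoValue would normally generate the implementation; `PubsubConfigSketch` is a hypothetical name, and it is package-private in line with the visibility question raised above.

```java
// Hypothetical, hand-written stand-in for the AutoValue-generated Config.
final class PubsubConfigSketch {
  private final String timestampAttributeKey; // nullable
  private final String deadLetterQueue; // nullable

  PubsubConfigSketch(String timestampAttributeKey, String deadLetterQueue) {
    this.timestampAttributeKey = timestampAttributeKey;
    this.deadLetterQueue = deadLetterQueue;
  }

  String getTimestampAttributeKey() {
    return timestampAttributeKey;
  }

  String getDeadLetterQueue() {
    return deadLetterQueue;
  }

  // Assumed semantics: enabled exactly when the attribute key is set.
  boolean useTimestampAttribute() {
    return timestampAttributeKey != null;
  }

  // Renamed from useDlqCheck() to mirror useTimestampAttribute().
  boolean useDeadLetterQueue() {
    return deadLetterQueue != null;
  }
}
```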

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaCapableIOProvider.java
##########
@@ -211,7 +216,11 @@ public POutput expand(PCollection<Row> input) {
           return input
               .apply(
                   RowToPubsubMessage.fromConfig(
-                      config, useFlatSchema, useTimestampAttribute(config)))
+                      new AutoValueSchema()

Review comment:
       Actually, I looked at the `RowToPubsubMessage` class and found that 2 of the 3 parameters here (`config` and `useFlatSchema`) are not used at all. What was the reason for keeping them? Can we remove them?







[GitHub] [beam] robinyqiu commented on a change in pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #12266:
URL: https://github.com/apache/beam/pull/12266#discussion_r456100965



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RowToPubsubMessage.java
##########
@@ -40,19 +39,14 @@
  * whether config.getValue("timestampAttributeKey") is set.
  */
 class RowToPubsubMessage extends PTransform<PCollection<Row>, PCollection<PubsubMessage>> {
-  private final Row config;
   private final Boolean useTimestampAttribute;

Review comment:
       Here as well.







[GitHub] [beam] sclukas77 commented on a change in pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12266:
URL: https://github.com/apache/beam/pull/12266#discussion_r456066020



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaCapableIOProvider.java
##########
@@ -211,7 +216,11 @@ public POutput expand(PCollection<Row> input) {
           return input
               .apply(
                   RowToPubsubMessage.fromConfig(
-                      config, useFlatSchema, useTimestampAttribute(config)))
+                      new AutoValueSchema()

Review comment:
       There was no reason to keep them; they were left over from the old logic. I removed the unnecessary parameters.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robinyqiu commented on a change in pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #12266:
URL: https://github.com/apache/beam/pull/12266#discussion_r455494296



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaCapableIOProvider.java
##########
@@ -155,16 +159,17 @@ private void validateConfigurationSchema(Row configuration) {
   /** An abstraction to create schema aware IOs. */
   @Internal
   private static class PubsubSchemaIO implements SchemaIO, Serializable {

Review comment:
       The `@Internal` annotation warns users not to depend on the annotated class. This class is not even visible to users because it is private, so the annotation is not needed here. Same below.







[GitHub] [beam] sclukas77 commented on a change in pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12266:
URL: https://github.com/apache/beam/pull/12266#discussion_r456099969



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RowToPubsubMessage.java
##########
@@ -40,19 +39,14 @@
  * whether config.getValue("timestampAttributeKey") is set.
  */
 class RowToPubsubMessage extends PTransform<PCollection<Row>, PCollection<PubsubMessage>> {
-  private final Row config;
   private final Boolean useTimestampAttribute;
 
-  private RowToPubsubMessage(Row config, Boolean useFlatSchema, Boolean useTimestampAttribute) {
-    checkArgument(useFlatSchema, "RowToPubsubMessage is only supported for flattened schemas.");
-
-    this.config = config;
+  private RowToPubsubMessage(Boolean useTimestampAttribute) {

Review comment:
       Done







[GitHub] [beam] robinyqiu commented on pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on pull request #12266:
URL: https://github.com/apache/beam/pull/12266#issuecomment-659696481


   Will merge after all tests pass.





[GitHub] [beam] sclukas77 commented on a change in pull request #12266: [BEAM-10494] PubsubSchemaCapableIOProvider config inner class rather than row

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12266:
URL: https://github.com/apache/beam/pull/12266#discussion_r456064978



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaCapableIOProvider.java
##########
@@ -264,4 +265,22 @@ private static boolean fieldPresent(
               schema.getField(field).getType(), Schema.EquivalenceNullablePolicy.IGNORE);
     }
   }
+
+  @Internal
+  @AutoValue
+  public abstract static class Config implements Serializable {

Review comment:
       Done



