You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "manavgarg (via GitHub)" <gi...@apache.org> on 2023/07/05 20:04:22 UTC

[GitHub] [beam] manavgarg commented on a diff in pull request #27366: Moving Pubsub Transforms to beam

manavgarg commented on code in PR #27366:
URL: https://github.com/apache/beam/pull/27366#discussion_r1253589655


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java:
##########
@@ -32,179 +32,29 @@
 @DefaultSchema(AutoValueSchema.class)
 @AutoValue
 public abstract class PubsubWriteSchemaTransformConfiguration {
+  @SchemaFieldDescription(
+      "The encoding format for the data stored in Pubsub. Valid options are: "
+          + PubsubWriteSchemaTransformProvider.VALID_FORMATS_STR)
+  public abstract String getFormat();
 
-  public static final String DEFAULT_TIMESTAMP_ATTRIBUTE = "event_timestamp";
+  @SchemaFieldDescription(
+      "The name of the topic to write data to. " + "Format: projects/${PROJECT}/topics/${TOPIC}")
+  public abstract String getTopic();
 
   public static Builder builder() {
     return new AutoValue_PubsubWriteSchemaTransformConfiguration.Builder();
   }
 
-  public static TargetConfiguration.Builder targetConfigurationBuilder() {
-    return new AutoValue_PubsubWriteSchemaTransformConfiguration_TargetConfiguration.Builder()
-        .setTimestampAttributeKey(DEFAULT_TIMESTAMP_ATTRIBUTE);
-  }
-
-  public static SourceConfiguration.Builder sourceConfigurationBuilder() {
-    return new AutoValue_PubsubWriteSchemaTransformConfiguration_SourceConfiguration.Builder();
-  }
-
-  /**
-   * Configuration details of the source {@link org.apache.beam.sdk.values.Row} {@link
-   * org.apache.beam.sdk.schemas.Schema}.
-   */
-  @Nullable
-  public abstract SourceConfiguration getSource();
-
-  /** Configuration details of the target {@link PubsubMessage}. */
-  public abstract TargetConfiguration getTarget();
-
-  /**
-   * The topic to which to write Pub/Sub messages.
-   *
-   * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
-   * topic string.
-   */
-  public abstract String getTopic();
-
-  /**
-   * The expected format of the Pub/Sub message.
-   *
-   * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} from
-   * {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}. See list of supported
-   * values by invoking {@link org.apache.beam.sdk.schemas.io.Providers#loadProviders(Class)}.
-   *
-   * <pre>{@code Providers.loadProviders(PayloadSerializer.class).keySet()}</pre>
-   */
-  @Nullable
-  public abstract String getFormat();
-
-  /**
-   * When writing to Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub message
-   * attributes, specifies the name of the attribute containing the unique identifier.
-   */
-  @Nullable
-  public abstract String getIdAttribute();
-
-  /** Builder for {@link PubsubWriteSchemaTransformConfiguration}. */
   @AutoValue.Builder
   public abstract static class Builder {
+    public abstract Builder setFormat(String format);
 
-    /**
-     * Configuration details of the source {@link org.apache.beam.sdk.values.Row} {@link
-     * org.apache.beam.sdk.schemas.Schema}.
-     */
-    public abstract Builder setSource(SourceConfiguration value);
-
-    /** Configuration details of the target {@link PubsubMessage}. */
-    public abstract Builder setTarget(TargetConfiguration value);
-
-    /**
-     * The topic to which to write Pub/Sub messages.
-     *
-     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
-     * topic string.
-     */
-    public abstract Builder setTopic(String value);
-
-    /**
-     * The expected format of the Pub/Sub message.
-     *
-     * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer}
-     * from {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}. See list of
-     * supported values by invoking {@link
-     * org.apache.beam.sdk.schemas.io.Providers#loadProviders(Class)}.
-     *
-     * <pre>{@code Providers.loadProviders(PayloadSerializer.class).keySet()}</pre>
-     */
-    public abstract Builder setFormat(String value);
-
-    /**
-     * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub
-     * message attributes, specifies the name of the attribute containing the unique identifier.
-     */
-    public abstract Builder setIdAttribute(String value);
+    public abstract Builder setTopic(String topic);
 
     public abstract PubsubWriteSchemaTransformConfiguration build();
   }
 
-  @DefaultSchema(AutoValueSchema.class)
-  @AutoValue
-  public abstract static class SourceConfiguration {
-    /**
-     * The attributes field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-     * org.apache.beam.sdk.schemas.Schema.FieldType} must be a <code>Map&lt;String, String&gt;
-     * </code>
-     */
-    @Nullable
-    public abstract String getAttributesFieldName();
-
-    /**
-     * The timestamp field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-     * org.apache.beam.sdk.schemas.Schema.FieldType} must be a {@link
-     * org.apache.beam.sdk.schemas.Schema.FieldType#DATETIME}.
-     */
-    @Nullable
-    public abstract String getTimestampFieldName();
-
-    /**
-     * The payload field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-     * org.apache.beam.sdk.schemas.Schema.FieldType} must be either {@link
-     * org.apache.beam.sdk.schemas.Schema.FieldType#BYTES} or a {@link
-     * org.apache.beam.sdk.values.Row}. If null, payload serialized from user fields other than
-     * attributes. Not compatible with other payload intended fields.
-     */
-    @Nullable
-    public abstract String getPayloadFieldName();
-
-    @AutoValue.Builder
-    public abstract static class Builder {
-      /**
-       * The attributes field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-       * org.apache.beam.sdk.schemas.Schema.FieldType} must be a <code>Map&lt;String, String&gt;
-       * </code>
-       */
-      public abstract Builder setAttributesFieldName(String value);
-
-      /**
-       * The timestamp field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-       * org.apache.beam.sdk.schemas.Schema.FieldType} must be a {@link
-       * org.apache.beam.sdk.schemas.Schema.FieldType#DATETIME}.
-       */
-      public abstract Builder setTimestampFieldName(String value);
-
-      /**
-       * The payload field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-       * org.apache.beam.sdk.schemas.Schema.FieldType} must be either {@link
-       * org.apache.beam.sdk.schemas.Schema.FieldType#BYTES} or a {@link
-       * org.apache.beam.sdk.values.Row}. If null, payload serialized from user fields other than
-       * attributes. Not compatible with other payload intended fields.
-       */
-      public abstract Builder setPayloadFieldName(String value);
-
-      public abstract SourceConfiguration build();
-    }
-  }
-
-  @DefaultSchema(AutoValueSchema.class)
-  @AutoValue
-  public abstract static class TargetConfiguration {
-
-    /**
-     * The attribute key to assign the {@link PubsubMessage} stringified timestamp value. {@link
-     * #builder()} method defaults value to {@link #DEFAULT_TIMESTAMP_ATTRIBUTE}.
-     */
-    public abstract String getTimestampAttributeKey();
-
-    @AutoValue.Builder
-    public abstract static class Builder {
-
-      /**
-       * The attribute key to assign the {@link PubsubMessage} stringified timestamp value. Defaults
-       * to {@link #DEFAULT_TIMESTAMP_ATTRIBUTE}.
-       */
-      public abstract Builder setTimestampAttributeKey(String value);
-
-      public abstract TargetConfiguration build();
-    }
-  }
+  // public static PubsubWriteSchemaTransformConfiguration create(String format, String topic) {

Review Comment:
   Can we remove these commented-out lines?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org