Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/27 00:29:12 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #12090: [BEAM-10336] Add SchemaIO abstraction and implement for PubSub

TheNeuralBit commented on a change in pull request #12090:
URL: https://github.com/apache/beam/pull/12090#discussion_r446413871



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/InvalidConfigurationException.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.io;
+
+/** Exception thrown when the request for a table is invalid, such as invalid metadata. */

Review comment:
       ```suggestion
   /** Exception thrown when the configuration for a {@link SchemaIO} is invalid. */
   ```
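For illustration, this is how the class might read with the suggested javadoc applied. The superclass (`IllegalArgumentException`, which makes it unchecked) and the two constructors are assumptions of this sketch, since the diff above only shows the javadoc line. The (message, cause) constructor is what lets callers keep the original failure when they wrap it.

```java
/**
 * Exception thrown when the configuration for a {@link SchemaIO} is invalid.
 *
 * <p>Sketch only: the unchecked superclass and both constructors are assumptions, not taken
 * from the PR.
 */
class InvalidConfigurationException extends IllegalArgumentException {
  InvalidConfigurationException(String message) {
    super(message);
  }

  // Keeps the underlying failure available via getCause().
  InvalidConfigurationException(String message, Throwable cause) {
    super(message, cause);
  }
}
```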

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/SchemaCapableIOProvider.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.io;
+
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+public interface SchemaCapableIOProvider {
+  /** Returns an id that uniquely represents this IO. */
+  String identifier();
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself.
+   */
+  Schema configurationSchema();
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */

Review comment:
       Can you add a note that this can throw an `InvalidConfigurationException`?
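A minimal sketch of such a note, using a standard `@throws` tag. To keep it self-contained, `SchemaIO`, the configuration `Row`, and the data `Schema` are replaced by hypothetical stand-ins (an empty interface, a `Map<String, String>`, and a `String`). The parameter order follows the `ioProvider.from(location, configurationRow, dataSchema)` call in `PubsubJsonTableProvider` below. `ExampleProvider` is also hypothetical: it ports the old `validateDlq` check only to show where the exception would originate.

```java
import java.util.Map;

/** Hypothetical stand-in for the (assumed unchecked) InvalidConfigurationException. */
class InvalidConfigurationException extends IllegalArgumentException {
  InvalidConfigurationException(String message) {
    super(message);
  }
}

/** Hypothetical stand-in for SchemaIO; its real methods are not part of this sketch. */
interface SchemaIO {}

/** Simplified provider: Map and String stand in for the configuration Row and data Schema. */
interface SchemaCapableIOProvider {
  /**
   * Produce a SchemaIO given a String representing the data's location, some IO-specific
   * configuration object, and the schema of the data that resides there.
   *
   * @throws InvalidConfigurationException if the configuration is invalid for this IO
   */
  SchemaIO from(String location, Map<String, String> configuration, String dataSchema);
}

/** Hypothetical provider that ports the old validateDlq check into from(). */
class ExampleProvider implements SchemaCapableIOProvider {
  @Override
  public SchemaIO from(String location, Map<String, String> configuration, String dataSchema) {
    String deadLetterQueue = configuration.get("deadLetterQueue");
    if (deadLetterQueue != null && deadLetterQueue.isEmpty()) {
      throw new InvalidConfigurationException("Dead letter queue topic name is not specified");
    }
    return new SchemaIO() {};
  }
}
```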

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -54,125 +47,29 @@ public String getTableType() {
   }
 
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table tableDefintion) {
-    JSONObject tableProperties = tableDefintion.getProperties();
+  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
+    JSONObject tableProperties = tableDefinition.getProperties();
     String timestampAttributeKey = tableProperties.getString("timestampAttributeKey");
     String deadLetterQueue = tableProperties.getString("deadLetterQueue");
-    validateDlq(deadLetterQueue);
-
-    Schema schema = tableDefintion.getSchema();
-    validateEventTimestamp(schema);
-
-    PubsubIOTableConfiguration config =
-        PubsubIOTableConfiguration.builder()
-            .setSchema(schema)
-            .setTimestampAttribute(timestampAttributeKey)
-            .setDeadLetterQueue(deadLetterQueue)
-            .setTopic(tableDefintion.getLocation())
-            .setUseFlatSchema(!definesAttributeAndPayload(schema))
-            .build();
-
-    return PubsubIOJsonTable.withConfiguration(config);
-  }
-
-  private void validateEventTimestamp(Schema schema) {
-    if (!fieldPresent(schema, TIMESTAMP_FIELD, TIMESTAMP)) {
-      throw new InvalidTableException(
-          "Unsupported schema specified for Pubsub source in CREATE TABLE."
-              + "CREATE TABLE for Pubsub topic must include at least 'event_timestamp' field of "
-              + "type 'TIMESTAMP'");
-    }
-  }
-
-  private boolean definesAttributeAndPayload(Schema schema) {
-    return fieldPresent(
-            schema, ATTRIBUTES_FIELD, Schema.FieldType.map(VARCHAR.withNullable(false), VARCHAR))
-        && (schema.hasField(PAYLOAD_FIELD)
-            && ROW.equals(schema.getField(PAYLOAD_FIELD).getType().getTypeName()));
-  }
-
-  private boolean fieldPresent(Schema schema, String field, Schema.FieldType expectedType) {
-    return schema.hasField(field)
-        && expectedType.equivalent(
-            schema.getField(field).getType(), Schema.EquivalenceNullablePolicy.IGNORE);
-  }
-
-  private void validateDlq(String deadLetterQueue) {
-    if (deadLetterQueue != null && deadLetterQueue.isEmpty()) {
-      throw new InvalidTableException("Dead letter queue topic name is not specified");
-    }
-  }
-
-  @AutoValue
-  public abstract static class PubsubIOTableConfiguration implements Serializable {
-    public boolean useDlq() {
-      return getDeadLetterQueue() != null;
-    }
-
-    public boolean useTimestampAttribute() {
-      return getTimestampAttribute() != null;
-    }
-
-    /** Determines whether or not the messages should be represented with a flattened schema. */
-    abstract boolean getUseFlatSchema();
-
-    /**
-     * Optional attribute key of the Pubsub message from which to extract the event timestamp.
-     *
-     * <p>This attribute has to conform to the same requirements as in {@link
-     * PubsubIO.Read.Builder#withTimestampAttribute}.
-     *
-     * <p>Short version: it has to be either millis since epoch or string in RFC 3339 format.
-     *
-     * <p>If the attribute is specified then event timestamps will be extracted from the specified
-     * attribute. If it is not specified then message publish timestamp will be used.
-     */
-    @Nullable
-    abstract String getTimestampAttribute();
-
-    /**
-     * Optional topic path which will be used as a dead letter queue.
-     *
-     * <p>Messages that cannot be processed will be sent to this topic. If it is not specified then
-     * exception will be thrown for errors during processing causing the pipeline to crash.
-     */
-    @Nullable
-    abstract String getDeadLetterQueue();
 
-    /**
-     * Pubsub topic name.
-     *
-     * <p>Topic is the only way to specify the Pubsub source. Explicitly specifying the subscription
-     * is not supported at the moment. Subscriptions are automatically created (but not deleted).
-     */
-    abstract String getTopic();
+    Schema schema = tableDefinition.getSchema();
+    String location = tableDefinition.getLocation();
+    Schema dataSchema = tableDefinition.getSchema();
 
-    /**
-     * Table schema, describes Pubsub message schema.
-     *
-     * <p>If {@link #getUseFlatSchema()} is not set, schema must contain exactly fields
-     * 'event_timestamp', 'attributes, and 'payload'. Else, it must contain just 'event_timestamp'.
-     * See {@linkA PubsubMessageToRow} for details.
-     */
-    public abstract Schema getSchema();
+    PubsubSchemaCapableIOProvider ioProvider = new PubsubSchemaCapableIOProvider();
+    Schema configurationSchema = ioProvider.configurationSchema();
 
-    static Builder builder() {
-      return new AutoValue_PubsubJsonTableProvider_PubsubIOTableConfiguration.Builder();
-    }
-
-    @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setUseFlatSchema(boolean useFlatSchema);
-
-      abstract Builder setSchema(Schema schema);
-
-      abstract Builder setTimestampAttribute(String timestampAttribute);
-
-      abstract Builder setDeadLetterQueue(String deadLetterQueue);
-
-      abstract Builder setTopic(String topic);
+    Row configurationRow =
+        Row.withSchema(configurationSchema)
+            .withFieldValue("timestampAttributeKey", timestampAttributeKey)
+            .withFieldValue("deadLetterQueue", deadLetterQueue)
+            .build();
 
-      abstract PubsubIOTableConfiguration build();
+    try {
+      SchemaIO pubsubSchemaIO = ioProvider.from(location, configurationRow, dataSchema);
+      return PubsubIOJsonTable.withConfiguration(pubsubSchemaIO, schema);
+    } catch (Exception InvalidConfigurationException) {
+      throw new InvalidTableException("Invalid configuration of table");

Review comment:
       This should just catch `InvalidConfigurationException e`, not any `Exception`.
   
   Also you should include the information from the original exception when re-throwing. Either use `e.getMessage()` to insert the original message into the new one, or add the exception as a cause (`new InvalidTableException(msg, e)`), or both.
   
   I think that should fix the unit test that's failing.
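Note that `catch (Exception InvalidConfigurationException)` declares a variable named `InvalidConfigurationException` of type `Exception`, so it catches every exception and then discards it. A minimal sketch of the suggested fix follows. Both exception classes and `from` are hypothetical stand-ins for the real Beam types and `ioProvider.from(...)`, and the empty-dead-letter-queue check is borrowed from the removed `validateDlq` purely to trigger the failure.

```java
/** Hypothetical stand-in for InvalidConfigurationException (assumed unchecked). */
class InvalidConfigurationException extends IllegalArgumentException {
  InvalidConfigurationException(String message) {
    super(message);
  }
}

/** Hypothetical stand-in for InvalidTableException with a (message, cause) constructor. */
class InvalidTableException extends IllegalArgumentException {
  InvalidTableException(String message, Throwable cause) {
    super(message, cause);
  }
}

class TableBuilderSketch {
  /** Hypothetical stand-in for ioProvider.from(...); rejects an empty dead letter queue. */
  static String from(String location, String deadLetterQueue) {
    if (deadLetterQueue != null && deadLetterQueue.isEmpty()) {
      throw new InvalidConfigurationException("Dead letter queue topic name is not specified");
    }
    return location;
  }

  static String buildTable(String location, String deadLetterQueue) {
    try {
      return from(location, deadLetterQueue);
    } catch (InvalidConfigurationException e) {
      // Catch only the expected type, keep its message, and chain it as the cause.
      throw new InvalidTableException("Invalid configuration of table: " + e.getMessage(), e);
    }
  }
}
```

Catching the specific type also lets unrelated runtime failures surface unchanged instead of being reported as a table configuration problem.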

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -54,125 +47,29 @@ public String getTableType() {
   }
 
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table tableDefintion) {
-    JSONObject tableProperties = tableDefintion.getProperties();
+  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
+    JSONObject tableProperties = tableDefinition.getProperties();
     String timestampAttributeKey = tableProperties.getString("timestampAttributeKey");
     String deadLetterQueue = tableProperties.getString("deadLetterQueue");
-    validateDlq(deadLetterQueue);
-
-    Schema schema = tableDefintion.getSchema();
-    validateEventTimestamp(schema);
-
-    PubsubIOTableConfiguration config =
-        PubsubIOTableConfiguration.builder()
-            .setSchema(schema)
-            .setTimestampAttribute(timestampAttributeKey)
-            .setDeadLetterQueue(deadLetterQueue)
-            .setTopic(tableDefintion.getLocation())
-            .setUseFlatSchema(!definesAttributeAndPayload(schema))
-            .build();
-
-    return PubsubIOJsonTable.withConfiguration(config);
-  }
-
-  private void validateEventTimestamp(Schema schema) {
-    if (!fieldPresent(schema, TIMESTAMP_FIELD, TIMESTAMP)) {
-      throw new InvalidTableException(
-          "Unsupported schema specified for Pubsub source in CREATE TABLE."
-              + "CREATE TABLE for Pubsub topic must include at least 'event_timestamp' field of "
-              + "type 'TIMESTAMP'");
-    }
-  }
-
-  private boolean definesAttributeAndPayload(Schema schema) {
-    return fieldPresent(
-            schema, ATTRIBUTES_FIELD, Schema.FieldType.map(VARCHAR.withNullable(false), VARCHAR))
-        && (schema.hasField(PAYLOAD_FIELD)
-            && ROW.equals(schema.getField(PAYLOAD_FIELD).getType().getTypeName()));
-  }
-
-  private boolean fieldPresent(Schema schema, String field, Schema.FieldType expectedType) {
-    return schema.hasField(field)
-        && expectedType.equivalent(
-            schema.getField(field).getType(), Schema.EquivalenceNullablePolicy.IGNORE);
-  }
-
-  private void validateDlq(String deadLetterQueue) {
-    if (deadLetterQueue != null && deadLetterQueue.isEmpty()) {
-      throw new InvalidTableException("Dead letter queue topic name is not specified");
-    }
-  }
-
-  @AutoValue
-  public abstract static class PubsubIOTableConfiguration implements Serializable {
-    public boolean useDlq() {
-      return getDeadLetterQueue() != null;
-    }
-
-    public boolean useTimestampAttribute() {
-      return getTimestampAttribute() != null;
-    }
-
-    /** Determines whether or not the messages should be represented with a flattened schema. */
-    abstract boolean getUseFlatSchema();
-
-    /**
-     * Optional attribute key of the Pubsub message from which to extract the event timestamp.
-     *
-     * <p>This attribute has to conform to the same requirements as in {@link
-     * PubsubIO.Read.Builder#withTimestampAttribute}.
-     *
-     * <p>Short version: it has to be either millis since epoch or string in RFC 3339 format.
-     *
-     * <p>If the attribute is specified then event timestamps will be extracted from the specified
-     * attribute. If it is not specified then message publish timestamp will be used.
-     */
-    @Nullable
-    abstract String getTimestampAttribute();
-
-    /**
-     * Optional topic path which will be used as a dead letter queue.
-     *
-     * <p>Messages that cannot be processed will be sent to this topic. If it is not specified then
-     * exception will be thrown for errors during processing causing the pipeline to crash.
-     */
-    @Nullable
-    abstract String getDeadLetterQueue();
 
-    /**
-     * Pubsub topic name.
-     *
-     * <p>Topic is the only way to specify the Pubsub source. Explicitly specifying the subscription
-     * is not supported at the moment. Subscriptions are automatically created (but not deleted).
-     */
-    abstract String getTopic();
+    Schema schema = tableDefinition.getSchema();
+    String location = tableDefinition.getLocation();
+    Schema dataSchema = tableDefinition.getSchema();
 
-    /**
-     * Table schema, describes Pubsub message schema.
-     *
-     * <p>If {@link #getUseFlatSchema()} is not set, schema must contain exactly fields
-     * 'event_timestamp', 'attributes, and 'payload'. Else, it must contain just 'event_timestamp'.
-     * See {@linkA PubsubMessageToRow} for details.
-     */
-    public abstract Schema getSchema();

Review comment:
       I don't think we want to lose the information in these doc comments. Could you move it into the javadoc for `PubsubSchemaCapableIOProvider`?
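One way the moved documentation could look, as a sketch only. The text is carried over from the removed `PubsubIOTableConfiguration` javadoc (with the stray `{@linkA` and the unbalanced `'attributes` quote fixed), and the configuration keys `timestampAttributeKey` and `deadLetterQueue` are the ones used to build `configurationRow` in the diff. The class name `PubsubProviderDocSketch` and the `Map`-based configuration are hypothetical stand-ins for `PubsubSchemaCapableIOProvider` and its `Row`.

```java
import java.util.Map;

/**
 * Hypothetical sketch of a provider javadoc that keeps the documentation removed from
 * PubsubIOTableConfiguration.
 *
 * <p>The location is the Pubsub topic name. Topic is the only way to specify the Pubsub source.
 * Explicitly specifying the subscription is not supported at the moment. Subscriptions are
 * automatically created (but not deleted).
 *
 * <p>The data schema describes the Pubsub message schema. If messages are not represented with a
 * flattened schema, it must contain exactly the fields 'event_timestamp', 'attributes', and
 * 'payload'. Otherwise, it must contain just 'event_timestamp'. See {@code PubsubMessageToRow}
 * for details.
 *
 * <p>Configuration fields:
 *
 * <ul>
 *   <li>{@code timestampAttributeKey}: optional attribute key of the Pubsub message from which to
 *       extract the event timestamp. It has to conform to the same requirements as in {@code
 *       PubsubIO.Read.Builder#withTimestampAttribute}; short version: it has to be either millis
 *       since epoch or a string in RFC 3339 format. If it is not specified, the message publish
 *       timestamp is used.
 *   <li>{@code deadLetterQueue}: optional topic path used as a dead letter queue. Messages that
 *       cannot be processed are sent to this topic. If it is not specified, an exception is
 *       thrown for errors during processing, causing the pipeline to crash.
 * </ul>
 */
class PubsubProviderDocSketch {
  static final String TIMESTAMP_ATTRIBUTE_KEY = "timestampAttributeKey";
  static final String DEAD_LETTER_QUEUE = "deadLetterQueue";

  /** Mirrors the removed useTimestampAttribute(); the Map stands in for the configuration Row. */
  static boolean useTimestampAttribute(Map<String, String> configuration) {
    return configuration.get(TIMESTAMP_ATTRIBUTE_KEY) != null;
  }

  /** Mirrors the removed useDlq(); the Map stands in for the configuration Row. */
  static boolean useDlq(Map<String, String> configuration) {
    return configuration.get(DEAD_LETTER_QUEUE) != null;
  }
}
```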
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org