You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/16 02:13:02 UTC

[jira] [Commented] (BEAM-454) Validate Pubsub Topic exists when reading

    [ https://issues.apache.org/jira/browse/BEAM-454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293567#comment-16293567 ] 

ASF GitHub Bot commented on BEAM-454:
-------------------------------------

jkff closed pull request #2989: [BEAM-454] Validate Pubsub Topic exists when reading and writing
URL: https://github.com/apache/beam/pull/2989
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index cfe36eeb47f..816dbd6aeea 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -296,6 +296,18 @@ public static TopicPath topicPathFromName(String projectId, String topicName) {
     return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName));
   }
 
+  /** Returns whether {@code topicName} is among the topics listed for {@code projectId}. */
+  boolean topicExistsInProject(String projectId, String topicName) throws IOException {
+    if (projectId == null) {
+      throw new IllegalArgumentException("projectId must not be null");
+    }
+    if (topicName == null) {
+      throw new IllegalArgumentException("topicName must not be null");
+    }
+    TopicPath wanted = topicPathFromName(projectId, topicName);
+    return listTopics(projectPathFromId(projectId)).contains(wanted);
+  }
+
   /**
    * A message to be sent to Pubsub.
    *
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubHelpers.java
new file mode 100644
index 00000000000..5995429d2b4
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubHelpers.java
@@ -0,0 +1,28 @@
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import java.io.IOException;
+
+/** A set of helper functions and classes used by {@link PubsubIO}. */
+abstract class PubsubHelpers {
+
+  private static final String RESOURCE_NOT_FOUND_ERROR = "Pubsub %1$s %2$s not found for project"
+      + " \"%3$s\". Please create the %1$s %2$s before pipeline"
+      + " execution. If the %1$s is created by an earlier stage of the pipeline, this"
+      + " validation can be disabled using #withoutValidation.";
+
+  /**
+   * Throws {@link IllegalArgumentException} if {@code topicName} is not found in the project.
+   * Does not close {@code pubsubClient}: the caller that opened it is responsible for that.
+   */
+  static void validateTopicExists(PubsubClient pubsubClient, String projectId, String topicName)
+      throws IOException {
+    try {
+      if (!pubsubClient.topicExistsInProject(projectId, topicName)) {
+        throw new IllegalArgumentException(
+            String.format(RESOURCE_NOT_FOUND_ERROR, "topic", topicName, projectId));
+      }
+    } catch (IOException ie) {
+      throw new RuntimeException("Was not able to validate options: ", ie);
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 048fded462b..56e142fc1c0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -38,10 +38,12 @@
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Read;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -427,7 +429,8 @@ public String asPath() {
 
    /** Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. */
   private static <T> Read<T> read() {
-    return new AutoValue_PubsubIO_Read.Builder<T>().setNeedsAttributes(false).build();
+    return new AutoValue_PubsubIO_Read.Builder<T>().setNeedsAttributes(false).setValidate(true)
+        .build();
   }
 
   /**
@@ -491,7 +494,7 @@ public String asPath() {
 
   /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */
   private static <T> Write<T> write() {
-    return new AutoValue_PubsubIO_Write.Builder<T>().build();
+    return new AutoValue_PubsubIO_Write.Builder<T>().setValidate(true).build();
   }
 
   /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */
@@ -547,6 +550,8 @@ public String asPath() {
     @Nullable
     abstract Coder<T> getCoder();
 
+    abstract boolean getValidate();
+
     /** User function for parsing PubsubMessage object. */
     @Nullable
     abstract SimpleFunction<PubsubMessage, T> getParseFn();
@@ -567,6 +572,8 @@ public String asPath() {
 
       abstract Builder<T> setCoder(Coder<T> coder);
 
+      abstract Builder<T> setValidate(boolean validate);
+
       abstract Builder<T> setParseFn(SimpleFunction<PubsubMessage, T> parseFn);
 
       abstract Builder<T> setNeedsAttributes(boolean needsAttributes);
@@ -574,6 +581,13 @@ public String asPath() {
       abstract Read<T> build();
     }
 
+    /**
+     * Returns a copy of this transform with topic-existence validation disabled.
+     */
+    public Read<T> withoutValidation() {
+      return toBuilder().setValidate(false).build();
+    }
+
     /**
      * Reads from the given subscription.
      *
@@ -724,6 +738,20 @@ public String asPath() {
       return input.apply(source).apply(MapElements.via(getParseFn()));
     }
 
+    @Override
+    public void validate(PipelineOptions options) {
+      // A topic only known at run time (inaccessible provider) cannot be checked here; skip it.
+      if (getValidate() && getTopicProvider() != null && getTopicProvider().isAccessible()) {
+        try (PubsubClient pubsubClient = FACTORY.newClient(
+            getTimestampAttribute(), getIdAttribute(), options.as(PubsubOptions.class))) {
+          PubsubTopic pubsubTopic = getTopicProvider().get();
+          PubsubHelpers.validateTopicExists(pubsubClient, pubsubTopic.project, pubsubTopic.topic);
+        } catch (IOException ie) {
+          throw new RuntimeException("Was not able to validate options: ", ie);
+        }
+      }
+    }
+
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
@@ -764,6 +792,8 @@ private PubsubIO() {}
     @Nullable
     abstract String getIdAttribute();
 
+    abstract boolean getValidate();
+
     /** The format function for input PubsubMessage objects. */
     @Nullable
     abstract SimpleFunction<T, PubsubMessage> getFormatFn();
@@ -780,6 +810,8 @@ private PubsubIO() {}
 
       abstract Builder<T> setFormatFn(SimpleFunction<T, PubsubMessage> formatFn);
 
+      abstract Builder<T> setValidate(boolean validate);
+
       abstract Write<T> build();
     }
 
@@ -828,6 +860,13 @@ private PubsubIO() {}
       return toBuilder().setIdAttribute(idAttribute).build();
     }
 
+    /** Returns a copy of this transform with topic-existence validation disabled. */
+    public Write<T> withoutValidation() {
+      // getValidate() is the flag that validate(PipelineOptions) consults before checking.
+      Builder<T> unvalidated = toBuilder().setValidate(false);
+      return unvalidated.build();
+    }
+
     /**
      * Used to write a PubSub message together with PubSub attributes. The user-supplied format
      * function translates the input type T to a PubsubMessage object, which is used by the sink
@@ -857,6 +896,20 @@ public PDone expand(PCollection<T> input) {
       throw new RuntimeException(); // cases are exhaustive.
     }
 
+    @Override
+    public void validate(PipelineOptions options) {
+      // A topic only known at run time (inaccessible provider) cannot be checked here; skip it.
+      if (getValidate() && getTopicProvider() != null && getTopicProvider().isAccessible()) {
+        try (PubsubClient pubsubClient = FACTORY.newClient(
+            getTimestampAttribute(), getIdAttribute(), options.as(PubsubOptions.class))) {
+          PubsubTopic pubsubTopic = getTopicProvider().get();
+          PubsubHelpers.validateTopicExists(pubsubClient, pubsubTopic.project, pubsubTopic.topic);
+        } catch (IOException ie) {
+          throw new RuntimeException("Was not able to validate options: ", ie);
+        }
+      }
+    }
+
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Validate Pubsub Topic exists when reading
> -----------------------------------------
>
>                 Key: BEAM-454
>                 URL: https://issues.apache.org/jira/browse/BEAM-454
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-gcp
>            Reporter: Frances Perry
>            Assignee: Borisa Zivkovic
>            Priority: Minor
>              Labels: newbie, starter
>
> When reading from Pubsub, we should validate the pubsub topic exists at graph construction time (similar to the way we validate a BQ dataset and table exist).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)