You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/11 12:11:51 UTC

[1/2] beam git commit: PubsubIO: remove support for BoundedReader

Repository: beam
Updated Branches:
  refs/heads/master c58f4f89b -> 84a96297c


PubsubIO: remove support for BoundedReader

Google Cloud Pub/Sub is not currently that useful in bounded mode --
it's a streaming source. Years ago, however, before the DirectRunner
supported unbounded PCollections and sources, we were unable to run the
streaming source in any SDK -- so we added a trivial bounded mode for
testing.

That trivial mode is no longer necessary. Additionally, it may confuse
users into thinking it's reliable (it's not), performant (it's not),
or has well defined semantics (it doesn't) -- it's really intended just
for testing.

Now that the DirectRunner supports everything we need -- unbounded
PCollections, non-blocking execution with cancelation, etc. -- we can
delete the bounded mode.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5f7c772c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5f7c772c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5f7c772c

Branch: refs/heads/master
Commit: 5f7c772cc4d21b220fa3b5dcec8b7d5bdba8685f
Parents: c58f4f8
Author: Dan Halperin <dh...@google.com>
Authored: Fri Apr 7 14:50:42 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 11 05:11:41 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  11 -
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 219 ++-----------------
 .../org/apache/beam/sdk/io/PubsubIOTest.java    |  12 +-
 3 files changed, 22 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5f7c772c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 7212d4f..f789769 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -84,8 +84,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.io.PubsubIO.Read.PubsubBoundedReader;
-import org.apache.beam.sdk.io.PubsubIO.Write.PubsubBoundedWriter;
 import org.apache.beam.sdk.io.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.PubsubUnboundedSource;
 import org.apache.beam.sdk.io.Read;
@@ -304,15 +302,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             PTransformOverride.of(
                 PTransformMatchers.emptyFlatten(), EmptyFlattenAsCreateFactory.instance()));
     if (streaming) {
-      // In streaming mode must use either the custom Pubsub unbounded source/sink or
-      // defer to Windmill's built-in implementation.
-      for (Class<? extends DoFn> unsupported :
-          ImmutableSet.of(PubsubBoundedReader.class, PubsubBoundedWriter.class)) {
-        overridesBuilder.add(
-            PTransformOverride.of(
-                PTransformMatchers.parDoWithFnType(unsupported),
-                UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, true))));
-      }
       if (!hasExperiment(options, "enable_custom_pubsub_source")) {
         overridesBuilder.add(
             PTransformOverride.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/5f7c772c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index c1ad353..67ab2ec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.base.Strings;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -36,7 +35,6 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -46,7 +44,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.PubsubClient;
-import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
 import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
 import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
@@ -55,7 +52,6 @@ import org.apache.beam.sdk.util.PubsubJsonClient;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -471,14 +467,9 @@ public class PubsubIO {
   }
 
   /**
-   * A {@link PTransform} that continuously reads from a Cloud Pub/Sub stream and
+   * A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream and
    * returns a {@link PCollection} of {@link String Strings} containing the items from
    * the stream.
-   *
-   * <p>When running with a {@link PipelineRunner} that only supports bounded
-   * {@link PCollection PCollections}, only a bounded portion of the input Pub/Sub stream
-   * can be processed. As such, either {@link PubsubIO.Read#maxNumRecords(int)} or
-   * {@link PubsubIO.Read#maxReadTime(Duration)} must be set.
    */
   public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
 
@@ -502,23 +493,16 @@ public class PubsubIO {
     @Nullable
     private final Coder<T> coder;
 
-    /** Stop after reading this many records. */
-    private final int maxNumRecords;
-
-    /** Stop after reading for this much time. */
-    @Nullable
-    private final Duration maxReadTime;
-
     /** User function for parsing PubsubMessage object. */
     SimpleFunction<PubsubMessage, T> parseFn;
 
     private Read() {
-      this(null, null, null, null, null, null, 0, null, null);
+      this(null, null, null, null, null, null, null);
     }
 
     private Read(String name, ValueProvider<PubsubSubscription> subscription,
         ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
-        String idLabel, int maxNumRecords, Duration maxReadTime,
+        String idLabel,
         SimpleFunction<PubsubMessage, T> parseFn) {
       super(name);
       this.subscription = subscription;
@@ -526,8 +510,6 @@ public class PubsubIO {
       this.timestampLabel = timestampLabel;
       this.coder = coder;
       this.idLabel = idLabel;
-      this.maxNumRecords = maxNumRecords;
-      this.maxReadTime = maxReadTime;
       this.parseFn = parseFn;
     }
 
@@ -558,8 +540,7 @@ public class PubsubIO {
       }
       return new Read<>(
           name, NestedValueProvider.of(subscription, new SubscriptionTranslator()),
-          null /* reset topic to null */, timestampLabel, coder, idLabel, maxNumRecords,
-          maxReadTime, parseFn);
+          null /* reset topic to null */, timestampLabel, coder, idLabel, parseFn);
     }
 
     /**
@@ -587,7 +568,7 @@ public class PubsubIO {
       }
       return new Read<>(name, null /* reset subscription to null */,
           NestedValueProvider.of(topic, new TopicTranslator()),
-          timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
+          timestampLabel, coder, idLabel, parseFn);
     }
 
     /**
@@ -622,7 +603,7 @@ public class PubsubIO {
      */
     public Read<T> timestampLabel(String timestampLabel) {
       return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+          name, subscription, topic, timestampLabel, coder, idLabel,
           parseFn);
     }
 
@@ -638,7 +619,7 @@ public class PubsubIO {
      */
     public Read<T> idLabel(String idLabel) {
       return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+          name, subscription, topic, timestampLabel, coder, idLabel,
           parseFn);
     }
 
@@ -650,7 +631,7 @@ public class PubsubIO {
      */
     public Read<T> withCoder(Coder<T> coder) {
       return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+          name, subscription, topic, timestampLabel, coder, idLabel,
           parseFn);
     }
 
@@ -663,33 +644,6 @@ public class PubsubIO {
     public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
       return new Read<T>(
           name, subscription, topic, timestampLabel, coder, idLabel,
-          maxNumRecords, maxReadTime, parseFn);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
-     * records that will be read. The transform produces a <i>bounded</i> {@link PCollection}.
-     *
-     * <p>Either this option or {@link #maxReadTime(Duration)} must be set in order to create a
-     * bounded source.
-     */
-    public Read<T> maxNumRecords(int maxNumRecords) {
-      return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
-          parseFn);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
-     * duration during which records will be read.  The transform produces a <i>bounded</i>
-     * {@link PCollection}.
-     *
-     * <p>Either this option or {@link #maxNumRecords(int)} must be set in order to create a
-     * bounded source.
-     */
-    public Read<T> maxReadTime(Duration maxReadTime) {
-      return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
           parseFn);
     }
 
@@ -708,27 +662,18 @@ public class PubsubIO {
             + "the withCoder method.");
       }
 
-      boolean boundedOutput = getMaxNumRecords() > 0 || getMaxReadTime() != null;
-
-      if (boundedOutput) {
-        return input.getPipeline().begin()
-            .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
-            .apply(ParDo.of(new PubsubBoundedReader()))
-            .setCoder(coder);
-      } else {
-        @Nullable ValueProvider<ProjectPath> projectPath =
-            topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator());
-        @Nullable ValueProvider<TopicPath> topicPath =
-            topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator());
-        @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
-            subscription == null
-                ? null
-                : NestedValueProvider.of(subscription, new SubscriptionPathTranslator());
-        return input.getPipeline().begin()
-            .apply(new PubsubUnboundedSource<T>(
-                FACTORY, projectPath, topicPath, subscriptionPath,
-                coder, timestampLabel, idLabel, parseFn));
-      }
+      @Nullable ValueProvider<ProjectPath> projectPath =
+          topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator());
+      @Nullable ValueProvider<TopicPath> topicPath =
+          topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator());
+      @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
+          subscription == null
+              ? null
+              : NestedValueProvider.of(subscription, new SubscriptionPathTranslator());
+      PubsubUnboundedSource<T> source = new PubsubUnboundedSource<T>(
+              FACTORY, projectPath, topicPath, subscriptionPath,
+              coder, timestampLabel, idLabel, parseFn);
+      return input.getPipeline().apply(source);
     }
 
     @Override
@@ -736,12 +681,6 @@ public class PubsubIO {
       super.populateDisplayData(builder);
       populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
 
-      builder
-          .addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
-              .withLabel("Maximum Read Time"))
-          .addIfNotDefault(DisplayData.item("maxNumRecords", maxNumRecords)
-              .withLabel("Maximum Read Records"), 0);
-
       if (subscription != null) {
         String subscriptionString = subscription.isAccessible()
             ? subscription.get().asPath() : subscription.toString();
@@ -811,21 +750,6 @@ public class PubsubIO {
     }
 
     /**
-     * Get the maximum number of records to read.
-     */
-    public int getMaxNumRecords() {
-      return maxNumRecords;
-    }
-
-    /**
-     * Get the maximum read time.
-     */
-    @Nullable
-    public Duration getMaxReadTime() {
-      return maxReadTime;
-    }
-
-    /**
      * Get the parse function used for PubSub attributes.
      */
     @Nullable
@@ -833,109 +757,6 @@ public class PubsubIO {
       return parseFn;
     }
 
-    /**
-     * Default reader when Pubsub subscription has some form of upper bound.
-     *
-     * <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top
-     * of PubsubUnboundedSource.
-     *
-     * <p>Public so can be suppressed by runners.
-     */
-    public class PubsubBoundedReader extends DoFn<Void, T> {
-
-      private static final int DEFAULT_PULL_SIZE = 100;
-      private static final int ACK_TIMEOUT_SEC = 60;
-
-      @ProcessElement
-      public void processElement(ProcessContext c) throws IOException {
-        try (PubsubClient pubsubClient =
-            FACTORY.newClient(timestampLabel, idLabel,
-                c.getPipelineOptions().as(PubsubOptions.class))) {
-
-          PubsubClient.SubscriptionPath subscriptionPath;
-          if (getSubscription() == null) {
-            TopicPath topicPath =
-                PubsubClient.topicPathFromName(getTopic().project, getTopic().topic);
-            // The subscription will be registered under this pipeline's project if we know it.
-            // Otherwise we'll fall back to the topic's project.
-            // Note that they don't need to be the same.
-            String projectId =
-                c.getPipelineOptions().as(PubsubOptions.class).getProject();
-            if (Strings.isNullOrEmpty(projectId)) {
-              projectId = getTopic().project;
-            }
-            ProjectPath projectPath = PubsubClient.projectPathFromId(projectId);
-            try {
-              subscriptionPath =
-                  pubsubClient.createRandomSubscription(projectPath, topicPath, ACK_TIMEOUT_SEC);
-            } catch (Exception e) {
-              throw new RuntimeException("Failed to create subscription: ", e);
-            }
-          } else {
-            subscriptionPath =
-                PubsubClient.subscriptionPathFromName(getSubscription().project,
-                    getSubscription().subscription);
-          }
-
-          Instant endTime = (getMaxReadTime() == null)
-              ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime());
-
-          List<IncomingMessage> messages = new ArrayList<>();
-
-          Throwable finallyBlockException = null;
-          try {
-            while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords())
-                && Instant.now().isBefore(endTime)) {
-              int batchSize = DEFAULT_PULL_SIZE;
-              if (getMaxNumRecords() > 0) {
-                batchSize = Math.min(batchSize, getMaxNumRecords() - messages.size());
-              }
-
-              List<IncomingMessage> batchMessages =
-                  pubsubClient.pull(System.currentTimeMillis(), subscriptionPath, batchSize,
-                      false);
-              List<String> ackIds = new ArrayList<>();
-              for (IncomingMessage message : batchMessages) {
-                messages.add(message);
-                ackIds.add(message.ackId);
-              }
-              if (ackIds.size() != 0) {
-                pubsubClient.acknowledge(subscriptionPath, ackIds);
-              }
-            }
-          } catch (IOException e) {
-            throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
-          } finally {
-            if (getSubscription() == null) {
-              try {
-                pubsubClient.deleteSubscription(subscriptionPath);
-              } catch (Exception e) {
-                finallyBlockException = e;
-              }
-            }
-          }
-          if (finallyBlockException != null) {
-            throw new RuntimeException("Failed to delete subscription: ", finallyBlockException);
-          }
-
-          for (IncomingMessage message : messages) {
-            T element = null;
-            if (parseFn != null) {
-              element = parseFn.apply(new PubsubMessage(
-                  message.elementBytes, message.attributes));
-            } else {
-              element = CoderUtils.decodeFromByteArray(getCoder(), message.elementBytes);
-            }
-            c.outputWithTimestamp(element, new Instant(message.timestampMsSinceEpoch));
-          }
-        }
-      }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.delegate(Read.this);
-      }
-    }
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/5f7c772c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index 1538db2..c996409 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -95,17 +95,13 @@ public class PubsubIOTest {
     PubsubIO.Read<String> read = PubsubIO.<String>read()
         .topic(StaticValueProvider.of(topic))
         .timestampLabel("myTimestamp")
-        .idLabel("myId")
-        .maxNumRecords(1234)
-        .maxReadTime(maxReadTime);
+        .idLabel("myId");
 
     DisplayData displayData = DisplayData.from(read);
 
     assertThat(displayData, hasDisplayItem("topic", topic));
     assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
     assertThat(displayData, hasDisplayItem("idLabel", "myId"));
-    assertThat(displayData, hasDisplayItem("maxNumRecords", 1234));
-    assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
   }
 
   @Test
@@ -116,17 +112,13 @@ public class PubsubIOTest {
     PubsubIO.Read<String> read = PubsubIO.<String>read()
         .subscription(StaticValueProvider.of(subscription))
         .timestampLabel("myTimestamp")
-        .idLabel("myId")
-        .maxNumRecords(1234)
-        .maxReadTime(maxReadTime);
+        .idLabel("myId");
 
     DisplayData displayData = DisplayData.from(read);
 
     assertThat(displayData, hasDisplayItem("subscription", subscription));
     assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
     assertThat(displayData, hasDisplayItem("idLabel", "myId"));
-    assertThat(displayData, hasDisplayItem("maxNumRecords", 1234));
-    assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
   }
 
   @Test


[2/2] beam git commit: This closes #2471

Posted by dh...@apache.org.
This closes #2471


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/84a96297
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/84a96297
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/84a96297

Branch: refs/heads/master
Commit: 84a96297c4f790249890cd41775472240181cb62
Parents: c58f4f8 5f7c772
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 11 05:11:44 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 11 05:11:44 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  11 -
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 219 ++-----------------
 .../org/apache/beam/sdk/io/PubsubIOTest.java    |  12 +-
 3 files changed, 22 insertions(+), 220 deletions(-)
----------------------------------------------------------------------