You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/28 18:32:14 UTC

[1/2] incubator-beam git commit: Closes #1190

Repository: incubator-beam
Updated Branches:
  refs/heads/master 9c3e3e7a3 -> e82c5d224


Closes #1190


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e82c5d22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e82c5d22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e82c5d22

Branch: refs/heads/master
Commit: e82c5d2246372041ee7cc362363b2ac959a6bd4a
Parents: 9c3e3e7 cd454aa
Author: Dan Halperin <dh...@google.com>
Authored: Fri Oct 28 11:32:06 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Oct 28 11:32:06 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/PubsubUnboundedSource.java      | 123 +++++++++++++------
 .../apache/beam/sdk/util/PubsubTestClient.java  |  32 +++++
 .../beam/sdk/io/PubsubUnboundedSourceTest.java  |  79 ++++++++++++
 3 files changed, 199 insertions(+), 35 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Generate PubSub Subscriptions during Pipeline Execution

Posted by dh...@apache.org.
Generate PubSub Subscriptions during Pipeline Execution

Instead of generating a random subscription at apply-time, create one
as required.

If splitIntoBundles is exercised, generate a subscription if there is
none specified, and create a new PubSub source that uses that
subscription. This is otherwise identical to the current behavior.

If splitIntoBundles is not exercised, and createReader is called with no
checkpoint, create a random subscription. Store a reference to this
subscription in the PubSub checkpoint, including when materialized.
This changes the wire format of PubSub checkpoints, and
as such this change is not update-compatible.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cd454aa5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cd454aa5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cd454aa5

Branch: refs/heads/master
Commit: cd454aa502e0a4e122e38c0c50403483832ada29
Parents: 9c3e3e7
Author: Thomas Groh <tg...@google.com>
Authored: Tue Oct 25 14:51:08 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Oct 28 11:32:06 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/PubsubUnboundedSource.java      | 123 +++++++++++++------
 .../apache/beam/sdk/util/PubsubTestClient.java  |  32 +++++
 .../beam/sdk/io/PubsubUnboundedSourceTest.java  |  79 ++++++++++++
 3 files changed, 199 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd454aa5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index 7617689..bfacb71 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -48,6 +48,7 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PubsubOptions;
@@ -228,6 +229,13 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   @VisibleForTesting
   static class PubsubCheckpoint<T> implements UnboundedSource.CheckpointMark {
     /**
+     * The {@link SubscriptionPath} to the subscription the reader is reading from. May be
+     * {@code null} if the {@link PubsubUnboundedSource} contains the subscription.
+     */
+    @VisibleForTesting
+    @Nullable String subscriptionPath;
+
+    /**
      * If the checkpoint is for persisting: the reader who's snapshotted state we are persisting.
      * If the checkpoint is for restoring: {@literal null}.
      * Not persisted in durable checkpoint.
@@ -255,13 +263,23 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     final List<String> notYetReadIds;
 
     public PubsubCheckpoint(
-        @Nullable PubsubReader<T> reader, @Nullable List<String> safeToAckIds,
+        @Nullable String subscriptionPath,
+        @Nullable PubsubReader<T> reader,
+        @Nullable List<String> safeToAckIds,
         List<String> notYetReadIds) {
+      this.subscriptionPath = subscriptionPath;
       this.reader = reader;
       this.safeToAckIds = safeToAckIds;
       this.notYetReadIds = notYetReadIds;
     }
 
+    @Nullable
+    private SubscriptionPath getSubscription() {
+      return subscriptionPath == null
+          ? null
+          : PubsubClient.subscriptionPathFromPath(subscriptionPath);
+    }
+
     /**
      * BLOCKING
      * All messages which have been passed downstream have now been durably committed.
@@ -335,22 +353,27 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     }
   }
 
-  /**
-   * The coder for our checkpoints.
-   */
+  /** The coder for our checkpoints. */
   private static class PubsubCheckpointCoder<T> extends AtomicCoder<PubsubCheckpoint<T>> {
+    private static final Coder<String> SUBSCRIPTION_PATH_CODER =
+        NullableCoder.of(StringUtf8Coder.of());
     private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());
 
     @Override
     public void encode(PubsubCheckpoint<T> value, OutputStream outStream, Context context)
         throws IOException {
+      SUBSCRIPTION_PATH_CODER.encode(
+          value.subscriptionPath,
+          outStream,
+          context.nested());
       LIST_CODER.encode(value.notYetReadIds, outStream, context);
     }
 
     @Override
     public PubsubCheckpoint<T> decode(InputStream inStream, Context context) throws IOException {
+      String path = SUBSCRIPTION_PATH_CODER.decode(inStream, context.nested());
       List<String> notYetReadIds = LIST_CODER.decode(inStream, context);
-      return new PubsubCheckpoint<>(null, null, notYetReadIds);
+      return new PubsubCheckpoint<>(path, null, null, notYetReadIds);
     }
   }
 
@@ -368,6 +391,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
      * For access to topic and checkpointCoder.
      */
     private final PubsubSource<T> outer;
+    @VisibleForTesting
+    final SubscriptionPath subscription;
 
     /**
      * Client on which to talk to Pubsub. Null if closed.
@@ -555,9 +580,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     /**
      * Construct a reader.
      */
-    public PubsubReader(PubsubOptions options, PubsubSource<T> outer)
+    public PubsubReader(PubsubOptions options, PubsubSource<T> outer, SubscriptionPath subscription)
         throws IOException, GeneralSecurityException {
       this.outer = outer;
+      this.subscription = subscription;
       pubsubClient =
           outer.outer.pubsubFactory.newClient(outer.outer.timestampLabel, outer.outer.idLabel,
                                               options);
@@ -605,7 +631,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
      * CAUTION: Retains {@code ackIds}.
      */
     void ackBatch(List<String> ackIds) throws IOException {
-      pubsubClient.acknowledge(outer.outer.subscription, ackIds);
+      pubsubClient.acknowledge(subscription, ackIds);
       ackedIds.add(ackIds);
     }
 
@@ -615,7 +641,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
      * with the given {@code ockIds}. Does not retain {@code ackIds}.
      */
     public void nackBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
-      pubsubClient.modifyAckDeadline(outer.outer.subscription, ackIds, 0);
+      pubsubClient.modifyAckDeadline(subscription, ackIds, 0);
       numNacked.add(nowMsSinceEpoch, ackIds.size());
     }
 
@@ -626,7 +652,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
      */
     private void extendBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
       int extensionSec = (ackTimeoutMs * ACK_EXTENSION_PCT) / (100 * 1000);
-      pubsubClient.modifyAckDeadline(outer.outer.subscription, ackIds, extensionSec);
+      pubsubClient.modifyAckDeadline(subscription, ackIds, extensionSec);
       numExtendedDeadlines.add(nowMsSinceEpoch, ackIds.size());
     }
 
@@ -762,7 +788,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       // BLOCKs until received.
       Collection<PubsubClient.IncomingMessage> receivedMessages =
           pubsubClient.pull(requestTimeMsSinceEpoch,
-                            outer.outer.subscription,
+                            subscription,
                             PULL_BATCH_SIZE, true);
       if (receivedMessages.isEmpty()) {
         // Nothing available yet. Try again later.
@@ -842,7 +868,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
                + "{} recent watermark skew, "
                + "{} recent late messages, "
                + "{} last reported watermark",
-               outer.outer.subscription,
+               subscription,
                numReceived,
                notYetRead.size(),
                notYetReadBytes,
@@ -868,7 +894,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     @Override
     public boolean start() throws IOException {
       // Determine the ack timeout.
-      ackTimeoutMs = pubsubClient.ackDeadlineSeconds(outer.outer.subscription) * 1000;
+      ackTimeoutMs = pubsubClient.ackDeadlineSeconds(subscription) * 1000;
       return advance();
     }
 
@@ -1018,7 +1044,12 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       for (PubsubClient.IncomingMessage incomingMessage : notYetRead) {
         snapshotNotYetReadIds.add(incomingMessage.ackId);
       }
-      return new PubsubCheckpoint<>(this, snapshotSafeToAckIds, snapshotNotYetReadIds);
+      if (outer.subscriptionPath == null) {
+        // need to include the subscription in case we resume, as it's not stored in the source.
+        return new PubsubCheckpoint<>(
+            subscription.getPath(), this, snapshotSafeToAckIds, snapshotNotYetReadIds);
+      }
+      return new PubsubCheckpoint<>(null, this, snapshotSafeToAckIds, snapshotNotYetReadIds);
     }
 
     @Override
@@ -1034,19 +1065,31 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   @VisibleForTesting
   static class PubsubSource<T> extends UnboundedSource<T, PubsubCheckpoint<T>> {
     public final PubsubUnboundedSource<T> outer;
+    // The subscription to read from.
+    @VisibleForTesting
+    final SubscriptionPath subscriptionPath;
 
     public PubsubSource(PubsubUnboundedSource<T> outer) {
+      this(outer, outer.getSubscription());
+    }
+
+    private PubsubSource(PubsubUnboundedSource<T> outer, SubscriptionPath subscriptionPath) {
       this.outer = outer;
+      this.subscriptionPath = subscriptionPath;
     }
 
     @Override
     public List<PubsubSource<T>> generateInitialSplits(
         int desiredNumSplits, PipelineOptions options) throws Exception {
       List<PubsubSource<T>> result = new ArrayList<>(desiredNumSplits);
+      PubsubSource<T> splitSource = this;
+      if (subscriptionPath == null) {
+        splitSource = new PubsubSource<>(outer, outer.createRandomSubscription(options));
+      }
       for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) {
         // Since the source is immutable and Pubsub automatically shards we simply
         // replicate ourselves the requested number of times
-        result.add(this);
+        result.add(splitSource);
       }
       return result;
     }
@@ -1056,10 +1099,20 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
         PipelineOptions options,
         @Nullable PubsubCheckpoint<T> checkpoint) {
       PubsubReader<T> reader;
+      SubscriptionPath subscription = subscriptionPath;
+      if (subscription == null) {
+        if (checkpoint == null) {
+          // This reader has never been started and there was no call to #splitIntoBundles; create
+          // a single random subscription, which will be kept in the checkpoint.
+          subscription = outer.createRandomSubscription(options);
+        } else {
+          subscription = checkpoint.getSubscription();
+        }
+      }
       try {
-        reader = new PubsubReader<>(options.as(PubsubOptions.class), this);
+        reader = new PubsubReader<>(options.as(PubsubOptions.class), this, subscription);
       } catch (GeneralSecurityException | IOException e) {
-        throw new RuntimeException("Unable to subscribe to " + outer.subscription + ": ", e);
+        throw new RuntimeException("Unable to subscribe to " + subscriptionPath + ": ", e);
       }
       if (checkpoint != null) {
         // NACK all messages we may have lost.
@@ -1068,7 +1121,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
           checkpoint.nackAll(reader);
         } catch (IOException e) {
           LOG.error("Pubsub {} cannot have {} lost messages NACKed, ignoring: {}",
-                    outer.subscription, checkpoint.notYetReadIds.size(), e);
+                    subscriptionPath, checkpoint.notYetReadIds.size(), e);
         }
       }
       return reader;
@@ -1272,27 +1325,27 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
 
   @Override
   public PCollection<T> apply(PBegin input) {
-    if (subscription == null) {
-      try {
-        try (PubsubClient pubsubClient =
-                 pubsubFactory.newClient(timestampLabel, idLabel,
-                                         input.getPipeline()
-                                              .getOptions()
-                                              .as(PubsubOptions.class))) {
-          subscription =
-              pubsubClient.createRandomSubscription(project, topic, DEAULT_ACK_TIMEOUT_SEC);
-          LOG.warn("Created subscription {} to topic {}."
-                   + " Note this subscription WILL NOT be deleted when the pipeline terminates",
-                   subscription, topic);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Failed to create subscription: ", e);
-      }
-    }
-
     return input.getPipeline().begin()
                 .apply(Read.from(new PubsubSource<T>(this)))
                 .apply("PubsubUnboundedSource.Stats",
                     ParDo.of(new StatsFn<T>(pubsubFactory, subscription, timestampLabel, idLabel)));
   }
+
+  private SubscriptionPath createRandomSubscription(PipelineOptions options) {
+    try {
+      try (PubsubClient pubsubClient =
+          pubsubFactory.newClient(timestampLabel, idLabel, options.as(PubsubOptions.class))) {
+        SubscriptionPath subscriptionPath =
+            pubsubClient.createRandomSubscription(project, topic, DEAULT_ACK_TIMEOUT_SEC);
+        LOG.warn(
+            "Created subscription {} to topic {}."
+                + " Note this subscription WILL NOT be deleted when the pipeline terminates",
+            subscriptionPath,
+            topic);
+        return subscriptionPath;
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create subscription: ", e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd454aa5/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
index 3fab151..45477fc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
@@ -213,6 +213,38 @@ public class PubsubTestClient extends PubsubClient {
     };
   }
 
+  public static PubsubTestClientFactory createFactoryForCreateSubscription() {
+    return new PubsubTestClientFactory() {
+      int numCalls = 0;
+
+      @Override
+      public void close() throws IOException {
+        checkState(
+            numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls);
+      }
+
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient() {
+          @Override
+          public void createSubscription(
+              TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds)
+              throws IOException {
+            checkState(numCalls == 0, "Expected at most one subscription to be created");
+            numCalls++;
+          }
+        };
+      }
+
+      @Override
+      public String getKind() {
+        return "CreateSubscriptionTest";
+      }
+    };
+  }
+
   /**
    * Return true if in pull mode.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd454aa5/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
index c46eca5..bbc6c12 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
@@ -19,7 +19,12 @@
 package org.apache.beam.sdk.io;
 
 import static junit.framework.TestCase.assertFalse;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -38,12 +43,15 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubCheckpoint;
 import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubReader;
 import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.PubsubClient;
 import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
 import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
 import org.apache.beam.sdk.util.PubsubTestClient;
 import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
 import org.joda.time.Instant;
@@ -316,4 +324,75 @@ public class PubsubUnboundedSourceTest {
     assertTrue(dataToMessageNum.isEmpty());
     reader.close();
   }
+
+  @Test
+  public void noSubscriptionSplitIntoBundlesGeneratesSubscription() throws Exception {
+    TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
+    factory = PubsubTestClient.createFactoryForCreateSubscription();
+    PubsubUnboundedSource<String> source =
+        new PubsubUnboundedSource<>(
+            factory,
+            PubsubClient.projectPathFromId("my_project"),
+            topicPath,
+            null,
+            StringUtf8Coder.of(),
+            null,
+            null);
+    assertThat(source.getSubscription(), nullValue());
+
+    TestPipeline.create().apply(source);
+    assertThat(source.getSubscription(), nullValue());
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<PubsubSource<String>> splits =
+        (new PubsubSource<>(source)).generateInitialSplits(3, options);
+    // We have at least one returned split
+    assertThat(splits, hasSize(greaterThan(0)));
+    for (PubsubSource<String> split : splits) {
+      // Each split is equal
+      assertThat(split, equalTo(splits.get(0)));
+    }
+
+    assertThat(splits.get(0).subscriptionPath, not(nullValue()));
+  }
+
+  @Test
+  public void noSubscriptionNoSplitGeneratesSubscription() throws Exception {
+    TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
+    factory = PubsubTestClient.createFactoryForCreateSubscription();
+    PubsubUnboundedSource<String> source =
+        new PubsubUnboundedSource<>(
+            factory,
+            PubsubClient.projectPathFromId("my_project"),
+            topicPath,
+            null,
+            StringUtf8Coder.of(),
+            null,
+            null);
+    assertThat(source.getSubscription(), nullValue());
+
+    TestPipeline.create().apply(source);
+    assertThat(source.getSubscription(), nullValue());
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    PubsubSource<String> actualSource = new PubsubSource<>(source);
+    PubsubReader<String> reader = actualSource.createReader(options, null);
+    SubscriptionPath createdSubscription = reader.subscription;
+    assertThat(createdSubscription, not(nullValue()));
+
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    assertThat(checkpoint.subscriptionPath, equalTo(createdSubscription.getPath()));
+
+    checkpoint.finalizeCheckpoint();
+    PubsubCheckpoint<String> deserCheckpoint =
+        CoderUtils.clone(actualSource.getCheckpointMarkCoder(), checkpoint);
+    assertThat(checkpoint.subscriptionPath, not(nullValue()));
+    assertThat(checkpoint.subscriptionPath, equalTo(deserCheckpoint.subscriptionPath));
+
+    PubsubReader<String> readerFromOriginal = actualSource.createReader(options, checkpoint);
+    PubsubReader<String> readerFromDeser = actualSource.createReader(options, deserCheckpoint);
+
+    assertThat(readerFromOriginal.subscription, equalTo(createdSubscription));
+    assertThat(readerFromDeser.subscription, equalTo(createdSubscription));
+  }
 }