You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/28 18:32:14 UTC
[1/2] incubator-beam git commit: Closes #1190
Repository: incubator-beam
Updated Branches:
refs/heads/master 9c3e3e7a3 -> e82c5d224
Closes #1190
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e82c5d22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e82c5d22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e82c5d22
Branch: refs/heads/master
Commit: e82c5d2246372041ee7cc362363b2ac959a6bd4a
Parents: 9c3e3e7 cd454aa
Author: Dan Halperin <dh...@google.com>
Authored: Fri Oct 28 11:32:06 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Oct 28 11:32:06 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/PubsubUnboundedSource.java | 123 +++++++++++++------
.../apache/beam/sdk/util/PubsubTestClient.java | 32 +++++
.../beam/sdk/io/PubsubUnboundedSourceTest.java | 79 ++++++++++++
3 files changed, 199 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Generate PubSub Subscriptions during
Pipeline Execution
Posted by dh...@apache.org.
Generate PubSub Subscriptions during Pipeline Execution
Instead of generating a random subscription at apply-time, create one
as required.
If splitIntoBundles is exercised, generate a subscription if there is
none specified, and create a new PubSub source that uses that
subscription. This is otherwise identical to the current behavior.
If splitIntoBundles is not exercised, and createReader is called with no
checkpoint, create a random subscription. Store a reference to this
subscription in the PubSub checkpoint, including when materialized.
This changes the wire format of PubSub checkpoints, and
as such this change is not update-compatible.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cd454aa5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cd454aa5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cd454aa5
Branch: refs/heads/master
Commit: cd454aa502e0a4e122e38c0c50403483832ada29
Parents: 9c3e3e7
Author: Thomas Groh <tg...@google.com>
Authored: Tue Oct 25 14:51:08 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Oct 28 11:32:06 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/PubsubUnboundedSource.java | 123 +++++++++++++------
.../apache/beam/sdk/util/PubsubTestClient.java | 32 +++++
.../beam/sdk/io/PubsubUnboundedSourceTest.java | 79 ++++++++++++
3 files changed, 199 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd454aa5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index 7617689..bfacb71 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -48,6 +48,7 @@ import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PubsubOptions;
@@ -228,6 +229,13 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@VisibleForTesting
static class PubsubCheckpoint<T> implements UnboundedSource.CheckpointMark {
/**
+ * The {@link SubscriptionPath} to the subscription the reader is reading from. May be
+ * {@code null} if the {@link PubsubUnboundedSource} contains the subscription.
+ */
+ @VisibleForTesting
+ @Nullable String subscriptionPath;
+
+ /**
* If the checkpoint is for persisting: the reader who's snapshotted state we are persisting.
* If the checkpoint is for restoring: {@literal null}.
* Not persisted in durable checkpoint.
@@ -255,13 +263,23 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
final List<String> notYetReadIds;
public PubsubCheckpoint(
- @Nullable PubsubReader<T> reader, @Nullable List<String> safeToAckIds,
+ @Nullable String subscriptionPath,
+ @Nullable PubsubReader<T> reader,
+ @Nullable List<String> safeToAckIds,
List<String> notYetReadIds) {
+ this.subscriptionPath = subscriptionPath;
this.reader = reader;
this.safeToAckIds = safeToAckIds;
this.notYetReadIds = notYetReadIds;
}
+ @Nullable
+ private SubscriptionPath getSubscription() {
+ return subscriptionPath == null
+ ? null
+ : PubsubClient.subscriptionPathFromPath(subscriptionPath);
+ }
+
/**
* BLOCKING
* All messages which have been passed downstream have now been durably committed.
@@ -335,22 +353,27 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
}
- /**
- * The coder for our checkpoints.
- */
+ /** The coder for our checkpoints. */
private static class PubsubCheckpointCoder<T> extends AtomicCoder<PubsubCheckpoint<T>> {
+ private static final Coder<String> SUBSCRIPTION_PATH_CODER =
+ NullableCoder.of(StringUtf8Coder.of());
private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());
@Override
public void encode(PubsubCheckpoint<T> value, OutputStream outStream, Context context)
throws IOException {
+ SUBSCRIPTION_PATH_CODER.encode(
+ value.subscriptionPath,
+ outStream,
+ context.nested());
LIST_CODER.encode(value.notYetReadIds, outStream, context);
}
@Override
public PubsubCheckpoint<T> decode(InputStream inStream, Context context) throws IOException {
+ String path = SUBSCRIPTION_PATH_CODER.decode(inStream, context.nested());
List<String> notYetReadIds = LIST_CODER.decode(inStream, context);
- return new PubsubCheckpoint<>(null, null, notYetReadIds);
+ return new PubsubCheckpoint<>(path, null, null, notYetReadIds);
}
}
@@ -368,6 +391,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
* For access to topic and checkpointCoder.
*/
private final PubsubSource<T> outer;
+ @VisibleForTesting
+ final SubscriptionPath subscription;
/**
* Client on which to talk to Pubsub. Null if closed.
@@ -555,9 +580,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
/**
* Construct a reader.
*/
- public PubsubReader(PubsubOptions options, PubsubSource<T> outer)
+ public PubsubReader(PubsubOptions options, PubsubSource<T> outer, SubscriptionPath subscription)
throws IOException, GeneralSecurityException {
this.outer = outer;
+ this.subscription = subscription;
pubsubClient =
outer.outer.pubsubFactory.newClient(outer.outer.timestampLabel, outer.outer.idLabel,
options);
@@ -605,7 +631,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
* CAUTION: Retains {@code ackIds}.
*/
void ackBatch(List<String> ackIds) throws IOException {
- pubsubClient.acknowledge(outer.outer.subscription, ackIds);
+ pubsubClient.acknowledge(subscription, ackIds);
ackedIds.add(ackIds);
}
@@ -615,7 +641,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
* with the given {@code ockIds}. Does not retain {@code ackIds}.
*/
public void nackBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
- pubsubClient.modifyAckDeadline(outer.outer.subscription, ackIds, 0);
+ pubsubClient.modifyAckDeadline(subscription, ackIds, 0);
numNacked.add(nowMsSinceEpoch, ackIds.size());
}
@@ -626,7 +652,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
*/
private void extendBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
int extensionSec = (ackTimeoutMs * ACK_EXTENSION_PCT) / (100 * 1000);
- pubsubClient.modifyAckDeadline(outer.outer.subscription, ackIds, extensionSec);
+ pubsubClient.modifyAckDeadline(subscription, ackIds, extensionSec);
numExtendedDeadlines.add(nowMsSinceEpoch, ackIds.size());
}
@@ -762,7 +788,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
// BLOCKs until received.
Collection<PubsubClient.IncomingMessage> receivedMessages =
pubsubClient.pull(requestTimeMsSinceEpoch,
- outer.outer.subscription,
+ subscription,
PULL_BATCH_SIZE, true);
if (receivedMessages.isEmpty()) {
// Nothing available yet. Try again later.
@@ -842,7 +868,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
+ "{} recent watermark skew, "
+ "{} recent late messages, "
+ "{} last reported watermark",
- outer.outer.subscription,
+ subscription,
numReceived,
notYetRead.size(),
notYetReadBytes,
@@ -868,7 +894,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@Override
public boolean start() throws IOException {
// Determine the ack timeout.
- ackTimeoutMs = pubsubClient.ackDeadlineSeconds(outer.outer.subscription) * 1000;
+ ackTimeoutMs = pubsubClient.ackDeadlineSeconds(subscription) * 1000;
return advance();
}
@@ -1018,7 +1044,12 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
for (PubsubClient.IncomingMessage incomingMessage : notYetRead) {
snapshotNotYetReadIds.add(incomingMessage.ackId);
}
- return new PubsubCheckpoint<>(this, snapshotSafeToAckIds, snapshotNotYetReadIds);
+ if (outer.subscriptionPath == null) {
+ // need to include the subscription in case we resume, as it's not stored in the source.
+ return new PubsubCheckpoint<>(
+ subscription.getPath(), this, snapshotSafeToAckIds, snapshotNotYetReadIds);
+ }
+ return new PubsubCheckpoint<>(null, this, snapshotSafeToAckIds, snapshotNotYetReadIds);
}
@Override
@@ -1034,19 +1065,31 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@VisibleForTesting
static class PubsubSource<T> extends UnboundedSource<T, PubsubCheckpoint<T>> {
public final PubsubUnboundedSource<T> outer;
+ // The subscription to read from.
+ @VisibleForTesting
+ final SubscriptionPath subscriptionPath;
public PubsubSource(PubsubUnboundedSource<T> outer) {
+ this(outer, outer.getSubscription());
+ }
+
+ private PubsubSource(PubsubUnboundedSource<T> outer, SubscriptionPath subscriptionPath) {
this.outer = outer;
+ this.subscriptionPath = subscriptionPath;
}
@Override
public List<PubsubSource<T>> generateInitialSplits(
int desiredNumSplits, PipelineOptions options) throws Exception {
List<PubsubSource<T>> result = new ArrayList<>(desiredNumSplits);
+ PubsubSource<T> splitSource = this;
+ if (subscriptionPath == null) {
+ splitSource = new PubsubSource<>(outer, outer.createRandomSubscription(options));
+ }
for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) {
// Since the source is immutable and Pubsub automatically shards we simply
// replicate ourselves the requested number of times
- result.add(this);
+ result.add(splitSource);
}
return result;
}
@@ -1056,10 +1099,20 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
PipelineOptions options,
@Nullable PubsubCheckpoint<T> checkpoint) {
PubsubReader<T> reader;
+ SubscriptionPath subscription = subscriptionPath;
+ if (subscription == null) {
+ if (checkpoint == null) {
+ // This reader has never been started and there was no call to #splitIntoBundles; create
+ // a single random subscription, which will be kept in the checkpoint.
+ subscription = outer.createRandomSubscription(options);
+ } else {
+ subscription = checkpoint.getSubscription();
+ }
+ }
try {
- reader = new PubsubReader<>(options.as(PubsubOptions.class), this);
+ reader = new PubsubReader<>(options.as(PubsubOptions.class), this, subscription);
} catch (GeneralSecurityException | IOException e) {
- throw new RuntimeException("Unable to subscribe to " + outer.subscription + ": ", e);
+ throw new RuntimeException("Unable to subscribe to " + subscriptionPath + ": ", e);
}
if (checkpoint != null) {
// NACK all messages we may have lost.
@@ -1068,7 +1121,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
checkpoint.nackAll(reader);
} catch (IOException e) {
LOG.error("Pubsub {} cannot have {} lost messages NACKed, ignoring: {}",
- outer.subscription, checkpoint.notYetReadIds.size(), e);
+ subscriptionPath, checkpoint.notYetReadIds.size(), e);
}
}
return reader;
@@ -1272,27 +1325,27 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@Override
public PCollection<T> apply(PBegin input) {
- if (subscription == null) {
- try {
- try (PubsubClient pubsubClient =
- pubsubFactory.newClient(timestampLabel, idLabel,
- input.getPipeline()
- .getOptions()
- .as(PubsubOptions.class))) {
- subscription =
- pubsubClient.createRandomSubscription(project, topic, DEAULT_ACK_TIMEOUT_SEC);
- LOG.warn("Created subscription {} to topic {}."
- + " Note this subscription WILL NOT be deleted when the pipeline terminates",
- subscription, topic);
- }
- } catch (Exception e) {
- throw new RuntimeException("Failed to create subscription: ", e);
- }
- }
-
return input.getPipeline().begin()
.apply(Read.from(new PubsubSource<T>(this)))
.apply("PubsubUnboundedSource.Stats",
ParDo.of(new StatsFn<T>(pubsubFactory, subscription, timestampLabel, idLabel)));
}
+
+ private SubscriptionPath createRandomSubscription(PipelineOptions options) {
+ try {
+ try (PubsubClient pubsubClient =
+ pubsubFactory.newClient(timestampLabel, idLabel, options.as(PubsubOptions.class))) {
+ SubscriptionPath subscriptionPath =
+ pubsubClient.createRandomSubscription(project, topic, DEAULT_ACK_TIMEOUT_SEC);
+ LOG.warn(
+ "Created subscription {} to topic {}."
+ + " Note this subscription WILL NOT be deleted when the pipeline terminates",
+ subscriptionPath,
+ topic);
+ return subscriptionPath;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create subscription: ", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd454aa5/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
index 3fab151..45477fc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
@@ -213,6 +213,38 @@ public class PubsubTestClient extends PubsubClient {
};
}
+ public static PubsubTestClientFactory createFactoryForCreateSubscription() {
+ return new PubsubTestClientFactory() {
+ int numCalls = 0;
+
+ @Override
+ public void close() throws IOException {
+ checkState(
+ numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls);
+ }
+
+ @Override
+ public PubsubClient newClient(
+ @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+ throws IOException {
+ return new PubsubTestClient() {
+ @Override
+ public void createSubscription(
+ TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds)
+ throws IOException {
+ checkState(numCalls == 0, "Expected at most one subscription to be created");
+ numCalls++;
+ }
+ };
+ }
+
+ @Override
+ public String getKind() {
+ return "CreateSubscriptionTest";
+ }
+ };
+ }
+
/**
* Return true if in pull mode.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd454aa5/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
index c46eca5..bbc6c12 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
@@ -19,7 +19,12 @@
package org.apache.beam.sdk.io;
import static junit.framework.TestCase.assertFalse;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -38,12 +43,15 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubCheckpoint;
import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubReader;
import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.PubsubClient;
import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
import org.apache.beam.sdk.util.PubsubTestClient;
import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
import org.joda.time.Instant;
@@ -316,4 +324,75 @@ public class PubsubUnboundedSourceTest {
assertTrue(dataToMessageNum.isEmpty());
reader.close();
}
+
+ @Test
+ public void noSubscriptionSplitIntoBundlesGeneratesSubscription() throws Exception {
+ TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
+ factory = PubsubTestClient.createFactoryForCreateSubscription();
+ PubsubUnboundedSource<String> source =
+ new PubsubUnboundedSource<>(
+ factory,
+ PubsubClient.projectPathFromId("my_project"),
+ topicPath,
+ null,
+ StringUtf8Coder.of(),
+ null,
+ null);
+ assertThat(source.getSubscription(), nullValue());
+
+ TestPipeline.create().apply(source);
+ assertThat(source.getSubscription(), nullValue());
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<PubsubSource<String>> splits =
+ (new PubsubSource<>(source)).generateInitialSplits(3, options);
+ // We have at least one returned split
+ assertThat(splits, hasSize(greaterThan(0)));
+ for (PubsubSource<String> split : splits) {
+ // Each split is equal
+ assertThat(split, equalTo(splits.get(0)));
+ }
+
+ assertThat(splits.get(0).subscriptionPath, not(nullValue()));
+ }
+
+ @Test
+ public void noSubscriptionNoSplitGeneratesSubscription() throws Exception {
+ TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
+ factory = PubsubTestClient.createFactoryForCreateSubscription();
+ PubsubUnboundedSource<String> source =
+ new PubsubUnboundedSource<>(
+ factory,
+ PubsubClient.projectPathFromId("my_project"),
+ topicPath,
+ null,
+ StringUtf8Coder.of(),
+ null,
+ null);
+ assertThat(source.getSubscription(), nullValue());
+
+ TestPipeline.create().apply(source);
+ assertThat(source.getSubscription(), nullValue());
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ PubsubSource<String> actualSource = new PubsubSource<>(source);
+ PubsubReader<String> reader = actualSource.createReader(options, null);
+ SubscriptionPath createdSubscription = reader.subscription;
+ assertThat(createdSubscription, not(nullValue()));
+
+ PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+ assertThat(checkpoint.subscriptionPath, equalTo(createdSubscription.getPath()));
+
+ checkpoint.finalizeCheckpoint();
+ PubsubCheckpoint<String> deserCheckpoint =
+ CoderUtils.clone(actualSource.getCheckpointMarkCoder(), checkpoint);
+ assertThat(checkpoint.subscriptionPath, not(nullValue()));
+ assertThat(checkpoint.subscriptionPath, equalTo(deserCheckpoint.subscriptionPath));
+
+ PubsubReader<String> readerFromOriginal = actualSource.createReader(options, checkpoint);
+ PubsubReader<String> readerFromDeser = actualSource.createReader(options, deserCheckpoint);
+
+ assertThat(readerFromOriginal.subscription, equalTo(createdSubscription));
+ assertThat(readerFromDeser.subscription, equalTo(createdSubscription));
+ }
}