You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/17 18:08:17 UTC
[1/3] incubator-beam git commit: Closes #332
Repository: incubator-beam
Updated Branches:
refs/heads/master cc64d654c -> 3edae9b8b
Closes #332
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3edae9b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3edae9b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3edae9b8
Branch: refs/heads/master
Commit: 3edae9b8b4d7afefb5c803c19bb0a1c21ebba89d
Parents: cc64d65 f55fb88
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 17 10:49:22 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 17 11:08:13 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 1 +
.../beam/sdk/io/PubsubUnboundedSource.java | 1206 ++++++++++++++++++
.../apache/beam/sdk/util/BucketingFunction.java | 153 +++
.../apache/beam/sdk/util/MovingFunction.java | 153 +++
.../beam/sdk/util/PubsubApiaryClient.java | 7 +
.../org/apache/beam/sdk/util/PubsubClient.java | 9 +
.../apache/beam/sdk/util/PubsubGrpcClient.java | 9 +
.../apache/beam/sdk/util/PubsubTestClient.java | 398 +++---
.../beam/sdk/io/PubsubUnboundedSinkTest.java | 78 +-
.../beam/sdk/io/PubsubUnboundedSourceTest.java | 324 +++++
.../beam/sdk/util/BucketingFunctionTest.java | 104 ++
.../beam/sdk/util/MovingFunctionTest.java | 115 ++
.../beam/sdk/util/PubsubTestClientTest.java | 81 +-
13 files changed, 2414 insertions(+), 224 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Pub/sub unbounded source
Posted by dh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
index 2cb9a65..b4ef785 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -26,9 +26,9 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.PubsubClient;
import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
-import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory;
import org.apache.beam.sdk.util.PubsubClient.TopicPath;
import org.apache.beam.sdk.util.PubsubTestClient;
+import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
@@ -39,6 +39,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -70,25 +71,26 @@ public class PubsubUnboundedSinkTest {
}
@Test
- public void sendOneMessage() {
+ public void sendOneMessage() throws IOException {
Set<OutgoingMessage> outgoing =
Sets.newHashSet(new OutgoingMessage(DATA.getBytes(), TIMESTAMP));
- PubsubClientFactory factory =
- PubsubTestClient.createFactoryForPublish(TOPIC, outgoing);
- PubsubUnboundedSink<String> sink =
- new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
- 10);
- TestPipeline p = TestPipeline.create();
- p.apply(Create.of(ImmutableList.of(DATA)))
- .apply(ParDo.of(new Stamp()))
- .apply(sink);
- // Run the pipeline. The PubsubTestClient will assert fail if the actual published
+ try (PubsubTestClientFactory factory =
+ PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) {
+ PubsubUnboundedSink<String> sink =
+ new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+ 10);
+ TestPipeline p = TestPipeline.create();
+ p.apply(Create.of(ImmutableList.of(DATA)))
+ .apply(ParDo.of(new Stamp()))
+ .apply(sink);
+ p.run();
+ }
+ // The PubsubTestClientFactory will assert fail on close if the actual published
// message does not match the expected publish message.
- p.run();
}
@Test
- public void sendMoreThanOneBatchByNumMessages() {
+ public void sendMoreThanOneBatchByNumMessages() throws IOException {
Set<OutgoingMessage> outgoing = new HashSet<>();
List<String> data = new ArrayList<>();
int batchSize = 2;
@@ -98,22 +100,23 @@ public class PubsubUnboundedSinkTest {
outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP));
data.add(str);
}
- PubsubClientFactory factory =
- PubsubTestClient.createFactoryForPublish(TOPIC, outgoing);
- PubsubUnboundedSink<String> sink =
- new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
- 10, batchSize, batchBytes, Duration.standardSeconds(2));
- TestPipeline p = TestPipeline.create();
- p.apply(Create.of(data))
- .apply(ParDo.of(new Stamp()))
- .apply(sink);
- // Run the pipeline. The PubsubTestClient will assert fail if the actual published
+ try (PubsubTestClientFactory factory =
+ PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) {
+ PubsubUnboundedSink<String> sink =
+ new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+ 10, batchSize, batchBytes, Duration.standardSeconds(2));
+ TestPipeline p = TestPipeline.create();
+ p.apply(Create.of(data))
+ .apply(ParDo.of(new Stamp()))
+ .apply(sink);
+ p.run();
+ }
+ // The PubsubTestClientFactory will assert fail on close if the actual published
// message does not match the expected publish message.
- p.run();
}
@Test
- public void sendMoreThanOneBatchByByteSize() {
+ public void sendMoreThanOneBatchByByteSize() throws IOException {
Set<OutgoingMessage> outgoing = new HashSet<>();
List<String> data = new ArrayList<>();
int batchSize = 100;
@@ -129,17 +132,18 @@ public class PubsubUnboundedSinkTest {
data.add(str);
n += str.length();
}
- PubsubClientFactory factory =
- PubsubTestClient.createFactoryForPublish(TOPIC, outgoing);
- PubsubUnboundedSink<String> sink =
- new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
- 10, batchSize, batchBytes, Duration.standardSeconds(2));
- TestPipeline p = TestPipeline.create();
- p.apply(Create.of(data))
- .apply(ParDo.of(new Stamp()))
- .apply(sink);
- // Run the pipeline. The PubsubTestClient will assert fail if the actual published
+ try (PubsubTestClientFactory factory =
+ PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) {
+ PubsubUnboundedSink<String> sink =
+ new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+ 10, batchSize, batchBytes, Duration.standardSeconds(2));
+ TestPipeline p = TestPipeline.create();
+ p.apply(Create.of(data))
+ .apply(ParDo.of(new Stamp()))
+ .apply(sink);
+ p.run();
+ }
+ // The PubsubTestClientFactory will assert fail on close if the actual published
// message does not match the expected publish message.
- p.run();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
new file mode 100644
index 0000000..b265d18
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import static junit.framework.TestCase.assertFalse;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubCheckpoint;
+import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubReader;
+import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubSource;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubTestClient;
+import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
+
+import com.google.api.client.util.Clock;
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Test PubsubUnboundedSource.
+ */
+@RunWith(JUnit4.class)
+public class PubsubUnboundedSourceTest {
+ private static final SubscriptionPath SUBSCRIPTION =
+ PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+ private static final String DATA = "testData";
+ private static final long TIMESTAMP = 1234L;
+ private static final long REQ_TIME = 6373L;
+ private static final String TIMESTAMP_LABEL = "timestamp";
+ private static final String ID_LABEL = "id";
+ private static final String ACK_ID = "testAckId";
+ private static final String RECORD_ID = "testRecordId";
+ private static final int ACK_TIMEOUT_S = 60;
+
+ private AtomicLong now;
+ private Clock clock;
+ private PubsubTestClientFactory factory;
+ private PubsubSource<String> primSource;
+
+  /**
+   * Prepare the shared fixture: a settable fake clock starting at {@link #REQ_TIME}, a test
+   * client factory created for pulling {@code incoming} from {@link #SUBSCRIPTION}, and a
+   * primitive source built on top of that factory.
+   *
+   * @param incoming the messages handed to the test client factory
+   */
+  private void setupOneMessage(Iterable<IncomingMessage> incoming) {
+    now = new AtomicLong(REQ_TIME);
+    clock = new Clock() {
+      @Override
+      public long currentTimeMillis() {
+        // Time only moves when a test explicitly bumps 'now'.
+        return now.get();
+      }
+    };
+    factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming);
+    PubsubUnboundedSource<String> unboundedSource =
+        new PubsubUnboundedSource<>(
+            clock, factory, SUBSCRIPTION, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL);
+    primSource = new PubsubSource<>(unboundedSource);
+  }
+
+  /**
+   * Prepare the shared fixture with a single incoming message carrying {@link #DATA},
+   * {@link #TIMESTAMP}, {@link #ACK_ID} and {@link #RECORD_ID}.
+   */
+  private void setupOneMessage() {
+    // Encode explicitly as UTF-8: the source decodes with StringUtf8Coder, so the payload bytes
+    // must not depend on the platform default charset.
+    setupOneMessage(ImmutableList.of(
+        new IncomingMessage(DATA.getBytes(StandardCharsets.UTF_8), TIMESTAMP, 0, ACK_ID,
+            RECORD_ID.getBytes(StandardCharsets.UTF_8))));
+  }
+
+  /**
+   * Close the test client factory (if one was created) and drop all per-test fixture state.
+   *
+   * @throws IOException if closing the factory fails
+   */
+  @After
+  public void after() throws IOException {
+    try {
+      // A test may fail before the factory was assigned; don't bury that failure under an NPE.
+      if (factory != null) {
+        factory.close();
+      }
+    } finally {
+      // Reset the fixture even when close() itself reports a problem.
+      now = null;
+      clock = null;
+      primSource = null;
+      factory = null;
+    }
+  }
+
+ @Test
+ public void checkpointCoderIsSane() throws Exception {
+ setupOneMessage(ImmutableList.<IncomingMessage>of());
+ CoderProperties.coderSerializable(primSource.getCheckpointMarkCoder());
+ // Only the coder object itself is checked here. Round-tripping an actual checkpoint is covered
+ // by multipleReaders below: only 'notYetReadIds' is serialized/deserialized, and we don't want
+ // equals() on checkpoints to ignore the remaining fields just to satisfy a coder equality check.
+ }
+
+  /**
+   * A single pending message is delivered once by the reader and can then be ACKed by
+   * finalizing the reader's checkpoint.
+   */
+  @Test
+  public void readOneMessage() throws IOException {
+    setupOneMessage();
+    TestPipeline p = TestPipeline.create();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    // Read one message.
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent());
+    // Nothing else is pending.
+    assertFalse(reader.advance());
+    // ACK the message.
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  /**
+   * If the ACK deadline passes before the message is ACKed, the same message is delivered a
+   * second time and can then be ACKed normally.
+   */
+  @Test
+  public void timeoutAckAndRereadOneMessage() throws IOException {
+    setupOneMessage();
+    TestPipeline pipeline = TestPipeline.create();
+    PubsubReader<String> reader = primSource.createReader(pipeline.getOptions(), null);
+    PubsubTestClient testClient = (PubsubTestClient) reader.getPubsubClient();
+
+    // First delivery.
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent());
+
+    // Jump five seconds past the ACK deadline of the message we are holding.
+    now.addAndGet((ACK_TIMEOUT_S + 5) * 1000);
+    testClient.advance();
+
+    // Second delivery of the very same message, and nothing after it.
+    assertTrue(reader.advance());
+    assertEquals(DATA, reader.getCurrent());
+    assertFalse(reader.advance());
+
+    // ACK it this time round.
+    reader.getCheckpointMark().finalizeCheckpoint();
+    reader.close();
+  }
+
+  /**
+   * While a message is held without a checkpoint, it is not redelivered even once more than
+   * {@link #ACK_TIMEOUT_S} seconds have elapsed in total, i.e. its ACK deadline was extended.
+   */
+  @Test
+  public void extendAck() throws IOException {
+    setupOneMessage();
+    TestPipeline pipeline = TestPipeline.create();
+    PubsubReader<String> reader = primSource.createReader(pipeline.getOptions(), null);
+    PubsubTestClient testClient = (PubsubTestClient) reader.getPubsubClient();
+
+    // Take delivery of the message, deliberately without checkpointing it.
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent());
+
+    // 55s in: still inside the original deadline, so nothing new to read.
+    now.addAndGet(55 * 1000);
+    testClient.advance();
+    assertFalse(reader.advance());
+
+    // 80s in: past the original deadline, yet still no redelivery.
+    now.addAndGet(25 * 1000);
+    testClient.advance();
+    assertFalse(reader.advance());
+
+    // Finally ACK the message.
+    reader.getCheckpointMark().finalizeCheckpoint();
+    reader.close();
+  }
+
+  /**
+   * ACK deadline extensions do not go on forever: after several extension rounds without a
+   * checkpoint the deadline lapses and the same message is delivered again.
+   */
+  @Test
+  public void timeoutAckExtensions() throws IOException {
+    setupOneMessage();
+    TestPipeline pipeline = TestPipeline.create();
+    PubsubReader<String> reader = primSource.createReader(pipeline.getOptions(), null);
+    PubsubTestClient testClient = (PubsubTestClient) reader.getPubsubClient();
+
+    // Take delivery of the message, deliberately without checkpointing it.
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent());
+
+    // First extension round.
+    now.addAndGet(55 * 1000);
+    testClient.advance();
+    assertFalse(reader.advance());
+
+    // Three further rounds during which the message must not yet reappear.
+    for (int remaining = 3; remaining > 0; remaining--) {
+      now.addAndGet(25 * 1000);
+      testClient.advance();
+      assertFalse(reader.advance());
+    }
+
+    // One more step and the message is due for resend.
+    now.addAndGet(25 * 1000);
+    testClient.advance();
+
+    // The very same message comes back.
+    assertTrue(reader.advance());
+    assertEquals(DATA, reader.getCurrent());
+
+    // ACK it this time round.
+    reader.getCheckpointMark().finalizeCheckpoint();
+    reader.close();
+  }
+
+  /**
+   * A checkpoint taken while one pulled message is still unread survives an encode/decode round
+   * trip, and a second reader created from the decoded checkpoint delivers that unread message.
+   */
+  @Test
+  public void multipleReaders() throws IOException {
+    List<IncomingMessage> incoming = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      String data = String.format("data_%d", i);
+      String ackid = String.format("ackid_%d", i);
+      // Encode explicitly as UTF-8 so the bytes do not depend on the platform default charset.
+      incoming.add(
+          new IncomingMessage(data.getBytes(StandardCharsets.UTF_8), TIMESTAMP, 0, ackid,
+              RECORD_ID.getBytes(StandardCharsets.UTF_8)));
+    }
+    setupOneMessage(incoming);
+    TestPipeline p = TestPipeline.create();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    // Consume two messages, only read one.
+    assertTrue(reader.start());
+    assertEquals("data_0", reader.getCurrent());
+
+    // Grab checkpoint: the second message has been pulled but not read yet.
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    assertEquals(1, checkpoint.notYetReadIds.size());
+    assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));
+
+    // Read second message.
+    assertTrue(reader.advance());
+    assertEquals("data_1", reader.getCurrent());
+
+    // Restore from checkpoint via an encode/decode round trip.
+    byte[] checkpointBytes =
+        CoderUtils.encodeToByteArray(primSource.getCheckpointMarkCoder(), checkpoint);
+    checkpoint = CoderUtils.decodeFromByteArray(primSource.getCheckpointMarkCoder(),
+        checkpointBytes);
+    assertEquals(1, checkpoint.notYetReadIds.size());
+    assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));
+
+    // Re-read second message.
+    // NOTE(review): the first reader is replaced here without being closed; confirm intended.
+    reader = primSource.createReader(p.getOptions(), checkpoint);
+    assertTrue(reader.start());
+    assertEquals("data_1", reader.getCurrent());
+
+    // We are done.
+    assertFalse(reader.advance());
+
+    // ACK final message.
+    checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  /**
+   * Return the timestamp assigned to message {@code messageNum}: {@link #TIMESTAMP} plus 100
+   * per message number.
+   */
+  private long messageNumToTimestamp(int messageNum) {
+    // Multiply in long arithmetic so a large message number cannot overflow int before widening.
+    return TIMESTAMP + messageNum * 100L;
+  }
+
+  /**
+   * Read {@code n} messages whose timestamps arrive slightly out of order, checking that each is
+   * seen exactly once with its timestamp and record id preserved, and that the reported watermark
+   * never gets ahead of the oldest message not yet read.
+   */
+  @Test
+  public void readManyMessages() throws IOException {
+    Map<String, Integer> dataToMessageNum = new HashMap<>();
+
+    final int m = 97;
+    final int n = 10000;
+    List<IncomingMessage> incoming = new ArrayList<>();
+    for (int i = 0; i < n; i++) {
+      // Make the message timestamps slightly out of order by reversing the numbering within
+      // each run of m consecutive messages.
+      int messageNum = ((i / m) * m) + (m - 1) - (i % m);
+      String data = String.format("data_%d", messageNum);
+      dataToMessageNum.put(data, messageNum);
+      String recid = String.format("recordid_%d", messageNum);
+      String ackId = String.format("ackid_%d", messageNum);
+      // Encode explicitly as UTF-8 so the bytes do not depend on the platform default charset.
+      incoming.add(
+          new IncomingMessage(data.getBytes(StandardCharsets.UTF_8),
+              messageNumToTimestamp(messageNum), 0, ackId,
+              recid.getBytes(StandardCharsets.UTF_8)));
+    }
+    setupOneMessage(incoming);
+
+    TestPipeline p = TestPipeline.create();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
+
+    for (int i = 0; i < n; i++) {
+      if (i == 0) {
+        assertTrue(reader.start());
+      } else {
+        assertTrue(reader.advance());
+      }
+      // We'll checkpoint and ack within the 2min limit.
+      now.addAndGet(30);
+      pubsubClient.advance();
+      String data = reader.getCurrent();
+      // remove() returns null for data we have already seen (or never sent).
+      Integer messageNum = dataToMessageNum.remove(data);
+      // No duplicate messages.
+      assertNotNull(messageNum);
+      // Preserve timestamp.
+      assertEquals(new Instant(messageNumToTimestamp(messageNum)), reader.getCurrentTimestamp());
+      // Preserve record id.
+      String recid = String.format("recordid_%d", messageNum);
+      assertArrayEquals(recid.getBytes(StandardCharsets.UTF_8), reader.getCurrentRecordId());
+
+      if (i % 1000 == 999) {
+        // Estimated watermark can never get ahead of actual outstanding messages.
+        long watermark = reader.getWatermark().getMillis();
+        long minOutstandingTimestamp = Long.MAX_VALUE;
+        for (Integer outstandingMessageNum : dataToMessageNum.values()) {
+          minOutstandingTimestamp =
+              Math.min(minOutstandingTimestamp, messageNumToTimestamp(outstandingMessageNum));
+        }
+        assertThat(watermark, lessThanOrEqualTo(minOutstandingTimestamp));
+        // Ack messages, but only every other finalization.
+        PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+        if (i % 2000 == 1999) {
+          checkpoint.finalizeCheckpoint();
+        }
+      }
+    }
+    // We are done.
+    assertFalse(reader.advance());
+    // We saw each message exactly once.
+    assertTrue(dataToMessageNum.isEmpty());
+    reader.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java
new file mode 100644
index 0000000..c808b4d
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.Combine;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link BucketingFunction}.
+ */
+@RunWith(JUnit4.class)
+public class BucketingFunctionTest {
+
+  private static final long BUCKET_WIDTH = 10;
+  private static final int SIGNIFICANT_BUCKETS = 2;
+  private static final int SIGNIFICANT_SAMPLES = 10;
+
+  /** Combiner which adds its two arguments and uses zero as its identity. */
+  private static final Combine.BinaryCombineLongFn SUM =
+      new Combine.BinaryCombineLongFn() {
+        @Override
+        public long identity() {
+          return 0;
+        }
+
+        @Override
+        public long apply(long left, long right) {
+          return left + right;
+        }
+      };
+
+  /** Build a fresh function under test from the constants above, combining with {@link #SUM}. */
+  private BucketingFunction newFunc() {
+    return new BucketingFunction(BUCKET_WIDTH, SIGNIFICANT_BUCKETS, SIGNIFICANT_SAMPLES, SUM);
+  }
+
+  /** The function becomes significant exactly when the SIGNIFICANT_SAMPLES'th sample is added. */
+  @Test
+  public void significantSamples() {
+    BucketingFunction function = newFunc();
+    assertFalse(function.isSignificant());
+    // All samples share timestamp 0, so only the sample count changes.
+    for (int samples = 1; samples < SIGNIFICANT_SAMPLES; samples++) {
+      function.add(0, 0);
+      assertFalse(function.isSignificant());
+    }
+    function.add(0, 0);
+    assertTrue(function.isSignificant());
+  }
+
+  /** Two samples whose timestamps are BUCKET_WIDTH apart are enough to become significant. */
+  @Test
+  public void significantBuckets() {
+    BucketingFunction function = newFunc();
+    assertFalse(function.isSignificant());
+
+    function.add(0, 0);
+    assertFalse(function.isSignificant());
+
+    function.add(BUCKET_WIDTH, 0);
+    assertTrue(function.isSignificant());
+  }
+
+  /** With nothing removed, the function reports the sum of every value added so far. */
+  @Test
+  public void sum() {
+    BucketingFunction function = newFunc();
+    long expectedTotal = 0;
+    for (int i = 0; i < 100; i++) {
+      function.add(i, i);
+      // Running total 0 + 1 + ... + i.
+      expectedTotal += i;
+      assertEquals(expectedTotal, function.get());
+    }
+  }
+
+  /**
+   * Adds one unit per timestamp and, from timestamp 100 on, removes the timestamp 100 behind.
+   * The expected value only drops once the last timestamp of a bucket has been removed, at which
+   * point all BUCKET_WIDTH units of that bucket go at once.
+   */
+  @Test
+  public void movingSum() {
+    BucketingFunction function = newFunc();
+    long lost = 0;
+    for (int i = 0; i < 200; i++) {
+      function.add(i, 1);
+      boolean removing = i >= 100;
+      if (removing) {
+        function.remove(i - 100);
+      }
+      if (removing && i % BUCKET_WIDTH == BUCKET_WIDTH - 1) {
+        lost += BUCKET_WIDTH;
+      }
+      assertEquals(i + 1 - lost, function.get());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java
new file mode 100644
index 0000000..2cbc20e
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.Combine;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests {@link MovingFunction}.
+ */
+@RunWith(JUnit4.class)
+public class MovingFunctionTest {
+
+ private static final long SAMPLE_PERIOD = 100;
+ private static final long SAMPLE_UPDATE = 10;
+ private static final int SIGNIFICANT_BUCKETS = 2;
+ private static final int SIGNIFICANT_SAMPLES = 10;
+
+ private static final Combine.BinaryCombineLongFn SUM =
+ new Combine.BinaryCombineLongFn() {
+ @Override
+ public long apply(long left, long right) {
+ return left + right;
+ }
+
+ @Override
+ public long identity() {
+ return 0;
+ }
+ };
+
+ private MovingFunction newFunc() {
+ return new
+ MovingFunction(SAMPLE_PERIOD, SAMPLE_UPDATE, SIGNIFICANT_BUCKETS,
+ SIGNIFICANT_SAMPLES, SUM);
+
+ }
+
+ @Test
+ public void significantSamples() {
+ MovingFunction f = newFunc();
+ assertFalse(f.isSignificant());
+ for (int i = 0; i < SIGNIFICANT_SAMPLES - 1; i++) {
+ f.add(0, 0);
+ assertFalse(f.isSignificant());
+ }
+ f.add(0, 0);
+ assertTrue(f.isSignificant());
+ }
+
+ @Test
+ public void significantBuckets() {
+ MovingFunction f = newFunc();
+ assertFalse(f.isSignificant());
+ f.add(0, 0);
+ assertFalse(f.isSignificant());
+ f.add(SAMPLE_UPDATE, 0);
+ assertTrue(f.isSignificant());
+ }
+
+ @Test
+ public void sum() {
+ MovingFunction f = newFunc();
+ for (int i = 0; i < SAMPLE_PERIOD; i++) {
+ f.add(i, i);
+ assertEquals(((i + 1) * i) / 2, f.get(i));
+ }
+ }
+
+ @Test
+ public void movingSum() {
+ MovingFunction f = newFunc();
+ int lost = 0;
+ for (int i = 0; i < SAMPLE_PERIOD * 2; i++) {
+ f.add(i , 1);
+ if (i >= SAMPLE_PERIOD) {
+ if (i % SAMPLE_UPDATE == 0) {
+ lost += SAMPLE_UPDATE;
+ }
+ }
+ assertEquals(i + 1 - lost, f.get(i));
+ }
+ }
+
+ @Test
+ public void jumpingSum() {
+ MovingFunction f = newFunc();
+ f.add(0, 1);
+ f.add(SAMPLE_PERIOD - 1, 1);
+ assertEquals(2, f.get(SAMPLE_PERIOD - 1));
+ assertEquals(1, f.get(SAMPLE_PERIOD + 3 * SAMPLE_UPDATE));
+ assertEquals(0, f.get(SAMPLE_PERIOD * 2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
index 7d8513b..fedc8bf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
@@ -24,7 +24,9 @@ import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
+import com.google.api.client.util.Clock;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -33,6 +35,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Tests for PubsubTestClient.
@@ -50,48 +53,58 @@ public class PubsubTestClientTest {
@Test
public void pullOneMessage() throws IOException {
+ final AtomicLong now = new AtomicLong();
+ Clock clock = new Clock() {
+ @Override
+ public long currentTimeMillis() {
+ return now.get();
+ }
+ };
IncomingMessage expectedIncomingMessage =
new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID.getBytes());
- try (PubsubTestClient client =
- new PubsubTestClient(null, SUBSCRIPTION, ACK_TIMEOUT_S, null,
- Lists.newArrayList(expectedIncomingMessage))) {
- long now = REQ_TIME;
- client.advanceTo(now);
- List<IncomingMessage> incomingMessages = client.pull(now, SUBSCRIPTION, 1, true);
- assertEquals(1, incomingMessages.size());
- assertEquals(expectedIncomingMessage, incomingMessages.get(0));
- // Timeout on ACK.
- now += (ACK_TIMEOUT_S + 10) * 1000;
- client.advanceTo(now);
- incomingMessages = client.pull(now, SUBSCRIPTION, 1, true);
- assertEquals(1, incomingMessages.size());
- assertEquals(expectedIncomingMessage.withRequestTime(now), incomingMessages.get(0));
- now += 10 * 1000;
- client.advanceTo(now);
- // Extend ack
- client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
- // Timeout on extended ACK
- now += 30 * 1000;
- client.advanceTo(now);
- incomingMessages = client.pull(now, SUBSCRIPTION, 1, true);
- assertEquals(1, incomingMessages.size());
- assertEquals(expectedIncomingMessage.withRequestTime(now), incomingMessages.get(0));
- // Extend ack
- client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
- // Ack
- now += 15 * 1000;
- client.advanceTo(now);
- client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID));
+ try (PubsubTestClientFactory factory =
+ PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S,
+ Lists.newArrayList(expectedIncomingMessage))) {
+ try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
+ now.set(REQ_TIME);
+ client.advance();
+ List<IncomingMessage> incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
+ assertEquals(1, incomingMessages.size());
+ assertEquals(expectedIncomingMessage, incomingMessages.get(0));
+ // Timeout on ACK.
+ now.addAndGet((ACK_TIMEOUT_S + 10) * 1000);
+ client.advance();
+ incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
+ assertEquals(1, incomingMessages.size());
+ assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
+ now.addAndGet(10 * 1000);
+ client.advance();
+ // Extend ack
+ client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
+ // Timeout on extended ACK
+ now.addAndGet(30 * 1000);
+ client.advance();
+ incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
+ assertEquals(1, incomingMessages.size());
+ assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0));
+ // Extend ack
+ client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
+ // Ack
+ now.addAndGet(15 * 1000);
+ client.advance();
+ client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID));
+ }
}
}
@Test
public void publishOneMessage() throws IOException {
OutgoingMessage expectedOutgoingMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
- try (PubsubTestClient client =
- new PubsubTestClient(TOPIC, null, ACK_TIMEOUT_S,
- Sets.newHashSet(expectedOutgoingMessage), null)) {
- client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage));
+ try (PubsubTestClientFactory factory = PubsubTestClient.createFactoryForPublish(TOPIC, Sets
+ .newHashSet(expectedOutgoingMessage))) {
+ try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
+ client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage));
+ }
}
}
}
[3/3] incubator-beam git commit: Pub/sub unbounded source
Posted by dh...@apache.org.
Pub/sub unbounded source
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f55fb887
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f55fb887
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f55fb887
Branch: refs/heads/master
Commit: f55fb887e2b6a67b14d38a56e19c64081c455933
Parents: cc64d65
Author: Mark Shields <ma...@google.com>
Authored: Mon Apr 4 18:10:02 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 17 11:08:13 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 1 +
.../beam/sdk/io/PubsubUnboundedSource.java | 1206 ++++++++++++++++++
.../apache/beam/sdk/util/BucketingFunction.java | 153 +++
.../apache/beam/sdk/util/MovingFunction.java | 153 +++
.../beam/sdk/util/PubsubApiaryClient.java | 7 +
.../org/apache/beam/sdk/util/PubsubClient.java | 9 +
.../apache/beam/sdk/util/PubsubGrpcClient.java | 9 +
.../apache/beam/sdk/util/PubsubTestClient.java | 398 +++---
.../beam/sdk/io/PubsubUnboundedSinkTest.java | 78 +-
.../beam/sdk/io/PubsubUnboundedSourceTest.java | 324 +++++
.../beam/sdk/util/BucketingFunctionTest.java | 104 ++
.../beam/sdk/util/MovingFunctionTest.java | 115 ++
.../beam/sdk/util/PubsubTestClientTest.java | 81 +-
13 files changed, 2414 insertions(+), 224 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 6d08a70..7ca2b57 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -147,6 +147,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
elementCounter.addValue(1L);
byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
long timestampMsSinceEpoch = c.timestamp().getMillis();
+ // TODO: A random record id should be assigned here.
c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards),
new OutgoingMessage(elementBytes, timestampMsSinceEpoch)));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
new file mode 100644
index 0000000..d635a8a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -0,0 +1,1206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.BucketingFunction;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.MovingFunction;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.api.client.util.Clock;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.GeneralSecurityException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+/**
+ * A PTransform which streams messages from Pubsub.
+ * <ul>
+ * <li>The underlying implementation is an {@link UnboundedSource} which receives messages
+ * in batches and hands them out one at a time.
+ * <li>The watermark (either in Pubsub processing time or custom timestamp time) is estimated
+ * by keeping track of the minimum of the last minute's worth of messages. This assumes Pubsub
+ * delivers the oldest (in Pubsub processing time) available message at least once a minute,
+ * and that custom timestamps are 'mostly' monotonic with Pubsub processing time. Unfortunately
+ * both of those assumptions are fragile. Thus the estimated watermark may get ahead of
+ * the 'true' watermark and cause some messages to be late.
+ * <li>Checkpoints are used both to ACK received messages back to Pubsub (so that they may
+ * be retired on the Pubsub end), and to NACK already consumed messages should a checkpoint
+ * need to be restored (so that Pubsub will resend those messages promptly).
+ * <li>The backlog is determined by each reader using the messages which have been pulled from
+ * Pubsub but not yet consumed downstream. The backlog does not take account of any messages queued
+ * by Pubsub for the subscription. Unfortunately there is currently no API to determine
+ * the size of the Pubsub queue's backlog.
+ * <li>The subscription must already exist.
+ * <li>The subscription timeout is read whenever a reader is started. However it is not
+ * checked thereafter despite the timeout being user-changeable on-the-fly.
+ * <li>We log vital stats every 30 seconds.
+ * <li>Though some background threads may be used by the underlying transport all Pubsub calls
+ * are blocking. We rely on the underlying runner to allow multiple
+ * {@link UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide latency.
+ * </ul>
+ */
+public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> {
+ private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class);
+
+ /**
+ * Coder for checkpoints.
+ */
+ private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = new PubsubCheckpointCoder<>();
+
+ /**
+ * Maximum number of messages per pull.
+ */
+ private static final int PULL_BATCH_SIZE = 1000;
+
+ /**
+ * Maximum number of ACK ids per ACK or ACK extension call.
+ */
+ private static final int ACK_BATCH_SIZE = 2000;
+
+ /**
+ * Maximum number of messages in flight.
+ */
+ private static final int MAX_IN_FLIGHT = 20000;
+
+ /**
+ * Timeout for round trip from receiving a message to finally ACKing it back to Pubsub.
+ */
+ private static final Duration PROCESSING_TIMEOUT = Duration.standardSeconds(120);
+
+ /**
+ * Percentage of ack timeout by which to extend acks when they are near timeout.
+ */
+ private static final int ACK_EXTENSION_PCT = 50;
+
+ /**
+ * Percentage of ack timeout we should use as a safety margin. We'll try to extend acks
+ * by this margin before the ack actually expires.
+ */
+ private static final int ACK_SAFETY_PCT = 20;
+
+ /**
+ * For stats only: How close we can get to an ack deadline before we risk it being already
+ * considered passed by Pubsub.
+ */
+ private static final Duration ACK_TOO_LATE = Duration.standardSeconds(2);
+
+ /**
+ * Period of samples to determine watermark and other stats.
+ */
+ private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1);
+
+ /**
+ * Period of updates to determine watermark and other stats.
+ */
+ private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5);
+
+ /**
+ * Period for logging stats.
+ */
+ private static final Duration LOG_PERIOD = Duration.standardSeconds(30);
+
+ /**
+ * Minimum number of unread messages required before considering updating watermark.
+ */
+ private static final int MIN_WATERMARK_MESSAGES = 10;
+
+ /**
+ * Minimum number of SAMPLE_UPDATE periods over which unread messages should be spread
+ * before considering updating watermark.
+ */
+ private static final int MIN_WATERMARK_SPREAD = 2;
+
+ /**
+ * Additional sharding so that we can hide read message latency.
+ */
+ private static final int SCALE_OUT = 4;
+
+ // TODO: Would prefer to use MinLongFn but it is a BinaryCombineFn<Long> rather
+ // than a BinaryCombineLongFn. [BEAM-285]
+ private static final Combine.BinaryCombineLongFn MIN =
+ new Combine.BinaryCombineLongFn() {
+ @Override
+ public long apply(long left, long right) {
+ return Math.min(left, right);
+ }
+
+ @Override
+ public long identity() {
+ return Long.MAX_VALUE;
+ }
+ };
+
+ private static final Combine.BinaryCombineLongFn MAX =
+ new Combine.BinaryCombineLongFn() {
+ @Override
+ public long apply(long left, long right) {
+ return Math.max(left, right);
+ }
+
+ @Override
+ public long identity() {
+ return Long.MIN_VALUE;
+ }
+ };
+
+ private static final Combine.BinaryCombineLongFn SUM = new SumLongFn();
+
+ // ================================================================================
+ // Checkpoint
+ // ================================================================================
+
+ /**
+ * Records which messages have been durably committed and thus can now be ACKed, and
+ * which messages have been read but not yet committed, in which case they should be NACKed if
+ * we need to restore.
+ */
+ @VisibleForTesting
+ static class PubsubCheckpoint<T> implements UnboundedSource.CheckpointMark {
+ /**
+ * If the checkpoint is for persisting: the reader whose snapshotted state we are persisting.
+ * If the checkpoint is for restoring: initially {@literal null}, then explicitly set.
+ * Not persisted in durable checkpoint.
+ * CAUTION: Between a checkpoint being taken and {@link #finalizeCheckpoint()} being called
+ * the 'true' active reader may have changed.
+ */
+ @Nullable
+ private PubsubReader<T> reader;
+
+ /**
+ * If the checkpoint is for persisting: The ACK ids of messages which have been passed
+ * downstream since the last checkpoint.
+ * If the checkpoint is for restoring: {@literal null}.
+ * Not persisted in durable checkpoint.
+ */
+ @Nullable
+ private final List<String> safeToAckIds;
+
+ /**
+ * If the checkpoint is for persisting: The ACK ids of messages which have been received
+ * from Pubsub but not yet passed downstream at the time of the snapshot.
+ * If the checkpoint is for restoring: Same, but recovered from durable storage.
+ */
+ @VisibleForTesting
+ final List<String> notYetReadIds;
+
+ public PubsubCheckpoint(
+ @Nullable PubsubReader<T> reader, @Nullable List<String> safeToAckIds,
+ List<String> notYetReadIds) {
+ this.reader = reader;
+ this.safeToAckIds = safeToAckIds;
+ this.notYetReadIds = notYetReadIds;
+ }
+
+ /**
+ * BLOCKING
+ * All messages which have been passed downstream have now been durably committed.
+ * We can ACK them upstream.
+ * CAUTION: This may never be called.
+ */
+ @Override
+ public void finalizeCheckpoint() throws IOException {
+ checkState(reader != null && safeToAckIds != null, "Cannot finalize a restored checkpoint");
+ // Even if the 'true' active reader has changed since the checkpoint was taken we are
+ // fine:
+ // - The underlying Pubsub topic will not have changed, so the following ACKs will still
+ // go to the right place.
+ // - We'll delete the ACK ids from the reader's in-flight state, but that only affects
+ // flow control and stats, neither of which are relevant anymore.
+ try {
+ int n = safeToAckIds.size();
+ List<String> batchSafeToAckIds = new ArrayList<>(Math.min(n, ACK_BATCH_SIZE));
+ for (String ackId : safeToAckIds) {
+ batchSafeToAckIds.add(ackId);
+ if (batchSafeToAckIds.size() >= ACK_BATCH_SIZE) {
+ reader.ackBatch(batchSafeToAckIds);
+ n -= batchSafeToAckIds.size();
+ // CAUTION: Don't reuse the same list since ackBatch holds on to it.
+ batchSafeToAckIds = new ArrayList<>(Math.min(n, ACK_BATCH_SIZE));
+ }
+ }
+ if (!batchSafeToAckIds.isEmpty()) {
+ reader.ackBatch(batchSafeToAckIds);
+ }
+ } finally {
+ checkState(reader.numInFlightCheckpoints.decrementAndGet() >= 0,
+ "Miscounted in-flight checkpoints");
+ }
+ }
+
+ /**
+ * BLOCKING
+ * NACK all messages which have been read from Pubsub but not passed downstream.
+ * This way Pubsub will send them again promptly.
+ */
+ public void nackAll(PubsubReader<T> reader) throws IOException {
+ checkState(this.reader == null, "Cannot nackAll on persisting checkpoint");
+ List<String> batchYetToAckIds =
+ new ArrayList<>(Math.min(notYetReadIds.size(), ACK_BATCH_SIZE));
+ for (String ackId : notYetReadIds) {
+ batchYetToAckIds.add(ackId);
+ if (batchYetToAckIds.size() >= ACK_BATCH_SIZE) {
+ long nowMsSinceEpoch = reader.outer.outer.clock.currentTimeMillis();
+ reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds);
+ batchYetToAckIds.clear();
+ }
+ }
+ if (!batchYetToAckIds.isEmpty()) {
+ long nowMsSinceEpoch = reader.outer.outer.clock.currentTimeMillis();
+ reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds);
+ }
+ }
+ }
+
+ /**
+ * Coder for checkpoints: only {@code notYetReadIds} is encoded, reader and safeToAckIds are not.
+ */
+ private static class PubsubCheckpointCoder<T> extends AtomicCoder<PubsubCheckpoint<T>> {
+ private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());
+
+ @Override
+ public void encode(PubsubCheckpoint<T> value, OutputStream outStream, Context context)
+ throws IOException {
+ LIST_CODER.encode(value.notYetReadIds, outStream, context);
+ }
+
+ @Override
+ public PubsubCheckpoint<T> decode(InputStream inStream, Context context) throws IOException {
+ List<String> notYetReadIds = LIST_CODER.decode(inStream, context);
+ return new PubsubCheckpoint<>(null, null, notYetReadIds);
+ }
+ }
+
+ // ================================================================================
+ // Reader
+ // ================================================================================
+
+ /**
+ * A reader which keeps track of which messages have been received from Pubsub
+ * but not yet consumed downstream and/or ACKed back to Pubsub.
+ */
+ @VisibleForTesting
+ static class PubsubReader<T> extends UnboundedSource.UnboundedReader<T> {
+ /**
+ * For access to topic and checkpointCoder.
+ */
+ private final PubsubSource<T> outer;
+
+ /**
+ * Client on which to talk to Pubsub. Null if closed.
+ */
+ @Nullable
+ private PubsubClient pubsubClient;
+
+ /**
+ * Ack timeout, in ms, as set on subscription when we first start reading. Not
+ * updated thereafter. -1 if not yet determined.
+ */
+ private int ackTimeoutMs;
+
+ /**
+ * ACK ids of messages we have delivered downstream but not yet ACKed.
+ */
+ private Set<String> safeToAckIds;
+
+ /**
+ * Messages we have received from Pubsub and not yet delivered downstream.
+ * We preserve their order.
+ */
+ private final Queue<PubsubClient.IncomingMessage> notYetRead;
+
+ private static class InFlightState {
+ /**
+ * When the request which yielded the message was issued.
+ */
+ long requestTimeMsSinceEpoch;
+
+ /**
+ * When Pubsub will consider this message's ACK to timeout and thus it needs to be
+ * extended.
+ */
+ long ackDeadlineMsSinceEpoch;
+
+ public InFlightState(long requestTimeMsSinceEpoch, long ackDeadlineMsSinceEpoch) {
+ this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
+ this.ackDeadlineMsSinceEpoch = ackDeadlineMsSinceEpoch;
+ }
+ }
+
+ /**
+ * Map from ACK ids of messages we have received from Pubsub but not yet ACKed to their
+ * in flight state. Ordered from earliest to latest ACK deadline.
+ */
+ private final LinkedHashMap<String, InFlightState> inFlight;
+
+ /**
+ * Batches of successfully ACKed ids which need to be pruned from the above.
+ * CAUTION: Accessed by both reader and checkpointing threads.
+ */
+ private final Queue<List<String>> ackedIds;
+
+ /**
+ * Byte size of undecoded elements in {@link #notYetRead}.
+ */
+ private long notYetReadBytes;
+
+ /**
+ * Bucketed map from received time (as system time, ms since epoch) to message
+ * timestamps (ms since epoch) of all received but not-yet read messages.
+ * Used to estimate watermark.
+ */
+ private BucketingFunction minUnreadTimestampMsSinceEpoch;
+
+ /**
+ * Minimum of timestamps (ms since epoch) of all recently read messages.
+ * Used to estimate watermark.
+ */
+ private MovingFunction minReadTimestampMsSinceEpoch;
+
+ /**
+ * System time (ms since epoch) we last received a message from Pubsub, or -1 if
+ * not yet received any messages.
+ */
+ private long lastReceivedMsSinceEpoch;
+
+ /**
+ * The last reported watermark (ms since epoch), or beginning of time if none yet reported.
+ */
+ private long lastWatermarkMsSinceEpoch;
+
+ /**
+ * The current message, or {@literal null} if none.
+ */
+ @Nullable
+ private PubsubClient.IncomingMessage current;
+
+ /**
+ * Stats only: System time (ms since epoch) we last logged stats, or -1 if never.
+ */
+ private long lastLogTimestampMsSinceEpoch;
+
+ /**
+ * Stats only: Total number of messages received.
+ */
+ private long numReceived;
+
+ /**
+ * Stats only: Number of messages which have recently been received.
+ */
+ private MovingFunction numReceivedRecently;
+
+ /**
+ * Stats only: Number of messages which have recently had their deadline extended.
+ */
+ private MovingFunction numExtendedDeadlines;
+
+ /**
+ * Stats only: Number of messages which have recently had their deadline extended even
+ * though it may be too late to do so.
+ */
+ private MovingFunction numLateDeadlines;
+
+
+ /**
+ * Stats only: Number of messages which have recently been ACKed.
+ */
+ private MovingFunction numAcked;
+
+ /**
+ * Stats only: Number of messages which have recently expired (ACKs were extended for too
+ * long).
+ */
+ private MovingFunction numExpired;
+
+ /**
+ * Stats only: Number of messages which have recently been NACKed.
+ */
+ private MovingFunction numNacked;
+
+ /**
+ * Stats only: Number of message bytes which have recently been read by downstream consumer.
+ */
+ private MovingFunction numReadBytes;
+
+ /**
+ * Stats only: Minimum of timestamp (ms since epoch) of all recently received messages.
+ * Used to estimate timestamp skew. Does not contribute to watermark estimator.
+ */
+ private MovingFunction minReceivedTimestampMsSinceEpoch;
+
+ /**
+ * Stats only: Maximum of timestamp (ms since epoch) of all recently received messages.
+ * Used to estimate timestamp skew.
+ */
+ private MovingFunction maxReceivedTimestampMsSinceEpoch;
+
+ /**
+ * Stats only: Minimum of recent estimated watermarks (ms since epoch).
+ */
+ private MovingFunction minWatermarkMsSinceEpoch;
+
+ /**
+ * Stats only: Maximum of recent estimated watermarks (ms since epoch).
+ */
+ private MovingFunction maxWatermarkMsSinceEpoch;
+
+ /**
+ * Stats only: Number of messages with timestamps strictly behind the estimated watermark
+ * at the time they are received. These may be considered 'late' by downstream computations.
+ */
+ private MovingFunction numLateMessages;
+
+ /**
+ * Stats only: Current number of checkpoints in flight.
+ * CAUTION: Accessed by both checkpointing and reader threads.
+ */
+ private AtomicInteger numInFlightCheckpoints;
+
+ /**
+ * Stats only: Maximum number of checkpoints in flight at any time.
+ */
+ private int maxInFlightCheckpoints;
+
+ private static MovingFunction newFun(Combine.BinaryCombineLongFn function) {
+ return new MovingFunction(SAMPLE_PERIOD.getMillis(),
+ SAMPLE_UPDATE.getMillis(),
+ MIN_WATERMARK_SPREAD,
+ MIN_WATERMARK_MESSAGES,
+ function);
+ }
+
+ /**
+ * Construct a reader. The ack timeout stays at -1 until {@link #start} determines it.
+ */
+ public PubsubReader(PubsubOptions options, PubsubSource<T> outer)
+ throws IOException, GeneralSecurityException {
+ this.outer = outer;
+ pubsubClient =
+ outer.outer.pubsubFactory.newClient(outer.outer.timestampLabel, outer.outer.idLabel,
+ options);
+ ackTimeoutMs = -1;
+ safeToAckIds = new HashSet<>();
+ notYetRead = new ArrayDeque<>();
+ inFlight = new LinkedHashMap<>();
+ ackedIds = new ConcurrentLinkedQueue<>();
+ notYetReadBytes = 0;
+ minUnreadTimestampMsSinceEpoch = new BucketingFunction(SAMPLE_UPDATE.getMillis(),
+ MIN_WATERMARK_SPREAD,
+ MIN_WATERMARK_MESSAGES,
+ MIN);
+ minReadTimestampMsSinceEpoch = newFun(MIN);
+ lastReceivedMsSinceEpoch = -1;
+ lastWatermarkMsSinceEpoch = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+ current = null;
+ lastLogTimestampMsSinceEpoch = -1;
+ numReceived = 0L;
+ numReceivedRecently = newFun(SUM);
+ numExtendedDeadlines = newFun(SUM);
+ numLateDeadlines = newFun(SUM);
+ numAcked = newFun(SUM);
+ numExpired = newFun(SUM);
+ numNacked = newFun(SUM);
+ numReadBytes = newFun(SUM);
+ minReceivedTimestampMsSinceEpoch = newFun(MIN);
+ maxReceivedTimestampMsSinceEpoch = newFun(MAX);
+ minWatermarkMsSinceEpoch = newFun(MIN);
+ maxWatermarkMsSinceEpoch = newFun(MAX);
+ numLateMessages = newFun(SUM);
+ numInFlightCheckpoints = new AtomicInteger();
+ maxInFlightCheckpoints = 0;
+ }
+
+ @VisibleForTesting
+ PubsubClient getPubsubClient() {
+ return pubsubClient;
+ }
+
+ /**
+ * BLOCKING
+ * ACK {@code ackIds} back to Pubsub.
+ * CAUTION: May be invoked from a separate checkpointing thread.
+ * CAUTION: Retains {@code ackIds}.
+ */
+ void ackBatch(List<String> ackIds) throws IOException {
+ pubsubClient.acknowledge(outer.outer.subscription, ackIds);
+ ackedIds.add(ackIds);
+ }
+
+ /**
+ * BLOCKING
+ * NACK (i.e. request deadline extension of 0) receipt of messages from Pubsub
+ * with the given {@code ackIds}. Does not retain {@code ackIds}.
+ */
+ public void nackBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
+ pubsubClient.modifyAckDeadline(outer.outer.subscription, ackIds, 0);
+ numNacked.add(nowMsSinceEpoch, ackIds.size());
+ }
+
+ /**
+ * BLOCKING
+ * Extend the processing deadline for messages from Pubsub with the given {@code ackIds}.
+ * Does not retain {@code ackIds}.
+ */
+ private void extendBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
+ int extensionSec = (ackTimeoutMs * ACK_EXTENSION_PCT) / (100 * 1000);
+ pubsubClient.modifyAckDeadline(outer.outer.subscription, ackIds, extensionSec);
+ numExtendedDeadlines.add(nowMsSinceEpoch, ackIds.size());
+ }
+
+ /**
+ * Return the current time, in ms since epoch.
+ */
+ private long now() {
+ return outer.outer.clock.currentTimeMillis();
+ }
+
+ /**
+ * Messages which have been ACKed (via the checkpoint finalize) are no longer in flight.
+ * This is only used for flow control and stats.
+ */
+ private void retire() throws IOException {
+ long nowMsSinceEpoch = now();
+ while (true) {
+ List<String> ackIds = ackedIds.poll();
+ if (ackIds == null) {
+ return;
+ }
+ numAcked.add(nowMsSinceEpoch, ackIds.size());
+ for (String ackId : ackIds) {
+ inFlight.remove(ackId);
+ safeToAckIds.remove(ackId);
+ }
+ }
+ }
+
+ /**
+ * BLOCKING
+ * Extend deadline for all messages which need it.
+ * CAUTION: If extensions can't keep up with wallclock then we'll never return.
+ */
+ private void extend() throws IOException {
+ while (true) {
+ long nowMsSinceEpoch = now();
+ List<String> assumeExpired = new ArrayList<>();
+ List<String> toBeExtended = new ArrayList<>();
+ List<String> toBeExpired = new ArrayList<>();
+ // Messages will be in increasing deadline order.
+ for (Map.Entry<String, InFlightState> entry : inFlight.entrySet()) {
+ if (entry.getValue().ackDeadlineMsSinceEpoch - (ackTimeoutMs * ACK_SAFETY_PCT) / 100
+ > nowMsSinceEpoch) {
+ // All remaining messages don't need their ACKs to be extended.
+ break;
+ }
+
+ if (entry.getValue().ackDeadlineMsSinceEpoch - ACK_TOO_LATE.getMillis()
+ < nowMsSinceEpoch) {
+ // Pubsub may have already considered this message to have expired.
+ // If so it will (eventually) be made available on a future pull request.
+ // If this message ends up being committed then it will be considered a duplicate
+ // when re-pulled.
+ assumeExpired.add(entry.getKey());
+ continue;
+ }
+
+ if (entry.getValue().requestTimeMsSinceEpoch + PROCESSING_TIMEOUT.getMillis()
+ < nowMsSinceEpoch) {
+ // This message has been in-flight for too long.
+ // Give up on it, otherwise we risk extending its ACK indefinitely.
+ toBeExpired.add(entry.getKey());
+ continue;
+ }
+
+ // Extend the ACK for this message.
+ toBeExtended.add(entry.getKey());
+ if (toBeExtended.size() >= ACK_BATCH_SIZE) {
+ // Enough for one batch.
+ break;
+ }
+ }
+
+ if (assumeExpired.isEmpty() && toBeExtended.isEmpty() && toBeExpired.isEmpty()) {
+ // Nothing to be done.
+ return;
+ }
+
+ if (!assumeExpired.isEmpty()) {
+ // If we didn't make the ACK deadline assume expired and no longer in flight.
+ numLateDeadlines.add(nowMsSinceEpoch, assumeExpired.size());
+ for (String ackId : assumeExpired) {
+ inFlight.remove(ackId);
+ }
+ }
+
+ if (!toBeExpired.isEmpty()) {
+ // Expired messages are no longer considered in flight.
+ numExpired.add(nowMsSinceEpoch, toBeExpired.size());
+ for (String ackId : toBeExpired) {
+ inFlight.remove(ackId);
+ }
+ }
+
+ if (!toBeExtended.isEmpty()) {
+ // Pubsub extends acks from its notion of current time.
+ // We'll try to track that on our side, but note the deadlines won't necessarily agree.
+ long newDeadlineMsSinceEpoch = nowMsSinceEpoch + (ackTimeoutMs * ACK_EXTENSION_PCT) / 100;
+ for (String ackId : toBeExtended) {
+ // Maintain increasing ack deadline order.
+ InFlightState state = inFlight.remove(ackId);
+ inFlight.put(ackId,
+ new InFlightState(state.requestTimeMsSinceEpoch, newDeadlineMsSinceEpoch));
+ }
+ // BLOCKs until extended.
+ extendBatch(nowMsSinceEpoch, toBeExtended);
+ }
+ }
+ }
+
+ /**
+ * BLOCKING
+ * Fetch another batch of messages from Pubsub.
+ */
+ private void pull() throws IOException {
+ if (inFlight.size() >= MAX_IN_FLIGHT) {
+ // Wait for checkpoint to be finalized before pulling anymore.
+ // There may be lag while checkpoints are persisted and the finalizeCheckpoint method
+ // is invoked. By limiting the in-flight messages we can ensure we don't end up consuming
+ // messages faster than we can checkpoint them.
+ return;
+ }
+
+ long requestTimeMsSinceEpoch = now();
+ long deadlineMsSinceEpoch = requestTimeMsSinceEpoch + ackTimeoutMs;
+
+ // Pull the next batch.
+ // BLOCKs until received.
+ Collection<PubsubClient.IncomingMessage> receivedMessages =
+ pubsubClient.pull(requestTimeMsSinceEpoch,
+ outer.outer.subscription,
+ PULL_BATCH_SIZE, true);
+ if (receivedMessages.isEmpty()) {
+ // Nothing available yet. Try again later.
+ return;
+ }
+
+ lastReceivedMsSinceEpoch = requestTimeMsSinceEpoch;
+
+ // Capture the received messages.
+ for (PubsubClient.IncomingMessage incomingMessage : receivedMessages) {
+ notYetRead.add(incomingMessage);
+ notYetReadBytes += incomingMessage.elementBytes.length;
+ inFlight.put(incomingMessage.ackId,
+ new InFlightState(requestTimeMsSinceEpoch, deadlineMsSinceEpoch));
+ numReceived++;
+ numReceivedRecently.add(requestTimeMsSinceEpoch, 1L);
+ minReceivedTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch,
+ incomingMessage.timestampMsSinceEpoch);
+ maxReceivedTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch,
+ incomingMessage.timestampMsSinceEpoch);
+ minUnreadTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch,
+ incomingMessage.timestampMsSinceEpoch);
+ }
+ }
+
+ /**
+ * Log stats if time to do so.
+ */
+ private void stats() {
+ long nowMsSinceEpoch = now();
+ if (lastLogTimestampMsSinceEpoch < 0) {
+ lastLogTimestampMsSinceEpoch = nowMsSinceEpoch;
+ return;
+ }
+ long deltaMs = nowMsSinceEpoch - lastLogTimestampMsSinceEpoch;
+ if (deltaMs < LOG_PERIOD.getMillis()) {
+ return;
+ }
+
+ String messageSkew = "unknown";
+ long minTimestamp = minReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch);
+ long maxTimestamp = maxReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch);
+ if (minTimestamp < Long.MAX_VALUE && maxTimestamp > Long.MIN_VALUE) {
+ messageSkew = (maxTimestamp - minTimestamp) + "ms";
+ }
+
+ String watermarkSkew = "unknown";
+ long minWatermark = minWatermarkMsSinceEpoch.get(nowMsSinceEpoch);
+ long maxWatermark = maxWatermarkMsSinceEpoch.get(nowMsSinceEpoch);
+ if (minWatermark < Long.MAX_VALUE && maxWatermark > Long.MIN_VALUE) {
+ watermarkSkew = (maxWatermark - minWatermark) + "ms";
+ }
+
+ String oldestInFlight = "no";
+ String oldestAckId = Iterables.getFirst(inFlight.keySet(), null);
+ if (oldestAckId != null) {
+ oldestInFlight =
+ (nowMsSinceEpoch - inFlight.get(oldestAckId).requestTimeMsSinceEpoch) + "ms";
+ }
+
+ LOG.info("Pubsub {} has "
+ + "{} received messages, "
+ + "{} current unread messages, "
+ + "{} current unread bytes, "
+ + "{} current in-flight msgs, "
+ + "{} oldest in-flight, "
+ + "{} current in-flight checkpoints, "
+ + "{} max in-flight checkpoints, "
+ + "{}B/s recent read, "
+ + "{} recent received, "
+ + "{} recent extended, "
+ + "{} recent late extended, "
+ + "{} recent ACKed, "
+ + "{} recent NACKed, "
+ + "{} recent expired, "
+ + "{} recent message timestamp skew, "
+ + "{} recent watermark skew, "
+ + "{} recent late messages, "
+ + "{} last reported watermark",
+ outer.outer.subscription,
+ numReceived,
+ notYetRead.size(),
+ notYetReadBytes,
+ inFlight.size(),
+ oldestInFlight,
+ numInFlightCheckpoints.get(),
+ maxInFlightCheckpoints,
+ numReadBytes.get(nowMsSinceEpoch) / (SAMPLE_PERIOD.getMillis() / 1000L),
+ numReceivedRecently.get(nowMsSinceEpoch),
+ numExtendedDeadlines.get(nowMsSinceEpoch),
+ numLateDeadlines.get(nowMsSinceEpoch),
+ numAcked.get(nowMsSinceEpoch),
+ numNacked.get(nowMsSinceEpoch),
+ numExpired.get(nowMsSinceEpoch),
+ messageSkew,
+ watermarkSkew,
+ numLateMessages.get(nowMsSinceEpoch),
+ new Instant(lastWatermarkMsSinceEpoch));
+
+ lastLogTimestampMsSinceEpoch = nowMsSinceEpoch;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ // Determine the ack timeout.
+ ackTimeoutMs = pubsubClient.ackDeadlineSeconds(outer.outer.subscription) * 1000;
+ return advance();
+ }
+
+ /**
+  * BLOCKING
+  * Move on to the next Pubsub message. Returns {@literal true} if a message is now
+  * available as the current element, or {@literal false} if none is available at this time
+  * or we are over-subscribed. May BLOCK while extending ACK deadlines or fetching available
+  * messages, but will not block waiting for messages to arrive.
+  */
+ @Override
+ public boolean advance() throws IOException {
+   // Emit stats.
+   stats();
+
+   // The previous message (if any) is consumed, so it must stop holding back the watermark.
+   if (current != null) {
+     minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch);
+     current = null;
+   }
+
+   // Retire state associated with ACKed messages.
+   retire();
+
+   // Extend all pressing deadlines. Will BLOCK until done.
+   // If the system is pulling messages only to let them sit in a downstream queue then
+   // this has the effect of slowing down the pull rate. If instead the system is genuinely
+   // taking longer to process each message then the work to extend ACKs would be better
+   // done in the background.
+   extend();
+
+   // Only go back to Pubsub once the local queue is drained. Will BLOCK until the fetch
+   // returns, but not until a message is available.
+   if (notYetRead.isEmpty()) {
+     pull();
+   }
+
+   // Take one message from the queue; an empty queue means try again later.
+   current = notYetRead.poll();
+   if (current == null) {
+     return false;
+   }
+
+   int currentBytes = current.elementBytes.length;
+   notYetReadBytes -= currentBytes;
+   checkState(notYetReadBytes >= 0);
+
+   long nowMsSinceEpoch = now();
+   numReadBytes.add(nowMsSinceEpoch, currentBytes);
+   minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch);
+   boolean isLate = current.timestampMsSinceEpoch < lastWatermarkMsSinceEpoch;
+   if (isLate) {
+     numLateMessages.add(nowMsSinceEpoch, 1L);
+   }
+
+   // The current message now counts as 'read' and will be persisted by the next
+   // checkpoint, so it is safe to ACK it back to Pubsub.
+   safeToAckIds.add(current.ackId);
+   return true;
+ }
+
+ @Override
+ public T getCurrent() throws NoSuchElementException {
+ if (current == null) {
+ throw new NoSuchElementException();
+ }
+ try {
+ return CoderUtils.decodeFromByteArray(outer.outer.elementCoder, current.elementBytes);
+ } catch (CoderException e) {
+ throw new RuntimeException("Unable to decode element from Pubsub message: ", e);
+ }
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+   // Report the current message's timestamp; without a current message there is none.
+   if (current != null) {
+     return new Instant(current.timestampMsSinceEpoch);
+   }
+   throw new NoSuchElementException();
+ }
+
+ @Override
+ public byte[] getCurrentRecordId() throws NoSuchElementException {
+   // Report the current message's record id; without a current message there is none.
+   if (current != null) {
+     return current.recordId;
+   }
+   throw new NoSuchElementException();
+ }
+
+ @Override
+ public void close() throws IOException {
+   // Nothing to do if the client has already been released.
+   if (pubsubClient == null) {
+     return;
+   }
+   pubsubClient.close();
+   // Forget the client so that a repeated close() is a no-op.
+   pubsubClient = null;
+ }
+
+ @Override
+ public PubsubSource<T> getCurrentSource() {
+ return outer; // The source held in this reader's 'outer' field.
+ }
+
+ @Override
+ public Instant getWatermark() {
+   // For testing only: once the (test) client reports EOF and the local queue is drained,
+   // advance the watermark to the end of time to signal the test is complete.
+   if (pubsubClient.isEOF() && notYetRead.isEmpty()) {
+     return BoundedWindow.TIMESTAMP_MAX_VALUE;
+   }
+
+   // NOTE: We allow the watermark to go backwards. The underlying runner is responsible
+   // for aggregating all reported watermarks and ensuring the aggregate is latched.
+   // If we attempted to latch locally, a temporary starvation of one reader could cause its
+   // estimated watermark to fast forward to current system time, and once the reader
+   // resumed its watermark would be unable to resume tracking. Letting the runner latch
+   // avoids any problems due to localized starvation.
+   long nowMsSinceEpoch = now();
+   long oldestRecentlyRead = minReadTimestampMsSinceEpoch.get(nowMsSinceEpoch);
+   long oldestUnread = minUnreadTimestampMsSinceEpoch.get();
+
+   // Long.MAX_VALUE from both trackers means nothing is pending and nothing was read recently.
+   boolean nothingTracked =
+       oldestRecentlyRead == Long.MAX_VALUE && oldestUnread == Long.MAX_VALUE;
+   boolean hasReceivedBefore = lastReceivedMsSinceEpoch >= 0;
+
+   if (nothingTracked
+       && hasReceivedBefore
+       && nowMsSinceEpoch > lastReceivedMsSinceEpoch + SAMPLE_PERIOD.getMillis()) {
+     // No unread messages are pending, none have been read for a while, and Pubsub has not
+     // delivered anything new for a while. Advance the watermark to current time.
+     // TODO: Estimate a timestamp lag.
+     lastWatermarkMsSinceEpoch = nowMsSinceEpoch;
+   } else if (minReadTimestampMsSinceEpoch.isSignificant()
+       || minUnreadTimestampMsSinceEpoch.isSignificant()) {
+     // Take the minimum of the timestamps in all unread and recently read messages.
+     lastWatermarkMsSinceEpoch = Math.min(oldestRecentlyRead, oldestUnread);
+   }
+   // Otherwise we are not confident enough to estimate a new watermark: keep the old one.
+
+   // Record the reported watermark so that stats can show its recent skew.
+   minWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch);
+   maxWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch);
+   return new Instant(lastWatermarkMsSinceEpoch);
+ }
+
+ @Override
+ public PubsubCheckpoint<T> getCheckpointMark() {
+   // Count this checkpoint as in-flight and track the high-water mark for stats.
+   int inFlightNow = numInFlightCheckpoints.incrementAndGet();
+   maxInFlightCheckpoints = Math.max(maxInFlightCheckpoints, inFlightNow);
+
+   // A checkpoint may be taken but never finalized, so we simply copy whatever
+   // safeToAckIds we currently have. A later checkpoint may also be taken before an earlier
+   // one is finalized, in which case messages are ACKed to Pubsub twice. Pubsub is fine
+   // with that.
+   List<String> ackIdsSnapshot = Lists.newArrayList(safeToAckIds);
+
+   // Also remember the ACK ids of every message received but not yet read.
+   List<String> unreadIdsSnapshot = new ArrayList<>(notYetRead.size());
+   for (PubsubClient.IncomingMessage pending : notYetRead) {
+     unreadIdsSnapshot.add(pending.ackId);
+   }
+
+   return new PubsubCheckpoint<>(this, ackIdsSnapshot, unreadIdsSnapshot);
+ }
+
+ @Override
+ public long getSplitBacklogBytes() {
+ return notYetReadBytes; // Byte total still queued locally; advance() subtracts each message as it is read.
+ }
+ }
+
+ // ================================================================================
+ // Source
+ // ================================================================================
+
+ @VisibleForTesting
+ static class PubsubSource<T> extends UnboundedSource<T, PubsubCheckpoint<T>> {
+ public final PubsubUnboundedSource<T> outer;
+
+ public PubsubSource(PubsubUnboundedSource<T> outer) {
+ this.outer = outer;
+ }
+
+ @Override
+ public List<PubsubSource<T>> generateInitialSplits(
+ int desiredNumSplits, PipelineOptions options) throws Exception {
+ List<PubsubSource<T>> result = new ArrayList<>(desiredNumSplits);
+ for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) {
+ // Since the source is immutable and Pubsub automatically shards we simply
+ // replicate ourselves the requested number of times
+ result.add(this);
+ }
+ return result;
+ }
+
+ @Override
+ public PubsubReader<T> createReader(
+ PipelineOptions options,
+ @Nullable PubsubCheckpoint<T> checkpoint) {
+ PubsubReader<T> reader;
+ try {
+ reader = new PubsubReader<>(options.as(PubsubOptions.class), this);
+ } catch (GeneralSecurityException | IOException e) {
+ throw new RuntimeException("Unable to subscribe to " + outer.subscription + ": ", e);
+ }
+ if (checkpoint != null) {
+ // NACK all messages we may have lost.
+ try {
+ // Will BLOCK until NACKed.
+ checkpoint.nackAll(reader);
+ } catch (IOException e) {
+ LOG.error("Pubsub {} cannot have {} lost messages NACKed, ignoring: {}",
+ outer.subscription, checkpoint.notYetReadIds.size(), e);
+ }
+ }
+ return reader;
+ }
+
+ @Nullable
+ @Override
+ public Coder<PubsubCheckpoint<T>> getCheckpointMarkCoder() {
+ @SuppressWarnings("unchecked") PubsubCheckpointCoder<T> typedCoder =
+ (PubsubCheckpointCoder<T>) CHECKPOINT_CODER;
+ return typedCoder;
+ }
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return outer.elementCoder;
+ }
+
+ @Override
+ public void validate() {
+ // Nothing to validate.
+ }
+
+ @Override
+ public boolean requiresDeduping() {
+ // We cannot prevent re-offering already read messages after a restore from checkpoint.
+ return true;
+ }
+ }
+
+ // ================================================================================
+ // StatsFn
+ // ================================================================================
+
+ /**
+  * Pass-through {@link DoFn}: every input element is counted in the "elements" aggregator
+  * and emitted unchanged. Also carries the source's settings so they can be reported as
+  * display data.
+  */
+ private static class StatsFn<T> extends DoFn<T, T> {
+   private final Aggregator<Long, Long> elementCounter =
+       createAggregator("elements", new Sum.SumLongFn());
+
+   private final PubsubClientFactory pubsubFactory;
+   private final SubscriptionPath subscription;
+   @Nullable private final String timestampLabel;
+   @Nullable private final String idLabel;
+
+   public StatsFn(
+       PubsubClientFactory pubsubFactory,
+       SubscriptionPath subscription,
+       @Nullable String timestampLabel,
+       @Nullable String idLabel) {
+     this.pubsubFactory = pubsubFactory;
+     this.subscription = subscription;
+     this.timestampLabel = timestampLabel;
+     this.idLabel = idLabel;
+   }
+
+   @Override
+   public void processElement(ProcessContext c) throws Exception {
+     // Count the element, then forward it untouched.
+     elementCounter.addValue(1L);
+     T element = c.element();
+     c.output(element);
+   }
+
+   @Override
+   public void populateDisplayData(Builder builder) {
+     super.populateDisplayData(builder);
+     // Subscription and transport are always present.
+     builder.add(DisplayData.item("subscription", subscription.getPath()));
+     builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
+     // The two labels are optional and only shown when set.
+     builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
+     builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+   }
+ }
+
+ // ================================================================================
+ // PubsubUnboundedSource
+ // ================================================================================
+
+ /**
+ * Clock to use for all timekeeping.
+ */
+ private Clock clock;
+
+ /**
+ * Factory for creating underlying Pubsub transport.
+ */
+ private final PubsubClientFactory pubsubFactory;
+
+ /**
+ * Subscription to read from.
+ */
+ private final SubscriptionPath subscription;
+
+ /**
+ * Coder for elements. Elements are effectively double-encoded: first to a byte array
+ * using this coder, then to a base-64 string to conform to Pubsub's payload
+ * conventions.
+ */
+ private final Coder<T> elementCoder;
+
+ /**
+ * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use
+ * Pubsub message publish timestamp instead.
+ */
+ @Nullable
+ private final String timestampLabel;
+
+ /**
+ * Pubsub metadata field holding id for each element, or {@literal null} if need to generate
+ * a unique id ourselves.
+ */
+ @Nullable
+ private final String idLabel;
+
+ /**
+ * Construct an unbounded source to consume from the Pubsub {@code subscription}.
+ *
+ * @param clock clock to use for all timekeeping (stored as given, without a null check)
+ * @param pubsubFactory factory for creating the underlying Pubsub transport; must not be null
+ * @param subscription subscription to read from; must not be null
+ * @param elementCoder coder for elements; must not be null
+ * @param timestampLabel Pubsub metadata field holding the timestamp of each element, or
+ * {@literal null} to use the Pubsub message publish timestamp instead
+ * @param idLabel Pubsub metadata field holding the id of each element, or {@literal null}
+ * if a unique id needs to be generated
+ */
+ public PubsubUnboundedSource(
+ Clock clock,
+ PubsubClientFactory pubsubFactory,
+ SubscriptionPath subscription,
+ Coder<T> elementCoder,
+ @Nullable String timestampLabel,
+ @Nullable String idLabel) {
+ this.clock = clock;
+ this.pubsubFactory = checkNotNull(pubsubFactory);
+ this.subscription = checkNotNull(subscription);
+ this.elementCoder = checkNotNull(elementCoder);
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ }
+
+ public PubsubClient.SubscriptionPath getSubscription() {
+ return subscription; // Subscription this source reads from.
+ }
+
+ @Nullable
+ public String getTimestampLabel() {
+ return timestampLabel; // Null when the Pubsub publish timestamp is used instead.
+ }
+
+ @Nullable
+ public String getIdLabel() {
+ return idLabel; // Null when unique ids need to be generated.
+ }
+
+ public Coder<T> getElementCoder() {
+ return elementCoder; // Coder for the elements; never null (checked in the constructor).
+ }
+
+ @Override
+ public PCollection<T> apply(PBegin input) {
+   // Read from the subscription, then pass every element through a counting stage.
+   PubsubSource<T> source = new PubsubSource<T>(this);
+   StatsFn<T> statsFn =
+       new StatsFn<T>(pubsubFactory, subscription, timestampLabel, idLabel);
+   return input.getPipeline().begin()
+       .apply(Read.from(source))
+       .apply(ParDo.named("PubsubUnboundedSource.Stats").of(statsFn));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java
new file mode 100644
index 0000000..ce35c24
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.transforms.Combine;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Keep track of the minimum/maximum/sum of a set of timestamped long values.
+ * For efficiency, bucket values by their timestamp.
+ */
+public class BucketingFunction {
+ private static class Bucket {
+ private int numSamples;
+ private long combinedValue;
+
+ public Bucket(BucketingFunction outer) {
+ numSamples = 0;
+ combinedValue = outer.function.identity();
+ }
+
+ public void add(BucketingFunction outer, long value) {
+ combinedValue = outer.function.apply(combinedValue, value);
+ numSamples++;
+ }
+
+ public boolean remove() {
+ numSamples--;
+ checkState(numSamples >= 0, "Lost count of samples");
+ return numSamples == 0;
+ }
+
+ public long get() {
+ return combinedValue;
+ }
+ }
+
+ /**
+ * How large a time interval to fit within each bucket.
+ */
+ private final long bucketWidthMs;
+
+ /**
+ * How many buckets are considered 'significant'?
+ */
+ private final int numSignificantBuckets;
+
+ /**
+ * How many samples are considered 'significant'?
+ */
+ private final int numSignificantSamples;
+
+ /**
+ * Function for combining sample values.
+ */
+ private final Combine.BinaryCombineLongFn function;
+
+ /**
+ * Active buckets.
+ */
+ private final Map<Long, Bucket> buckets;
+
+ public BucketingFunction(
+ long bucketWidthMs,
+ int numSignificantBuckets,
+ int numSignificantSamples,
+ Combine.BinaryCombineLongFn function) {
+ this.bucketWidthMs = bucketWidthMs;
+ this.numSignificantBuckets = numSignificantBuckets;
+ this.numSignificantSamples = numSignificantSamples;
+ this.function = function;
+ this.buckets = new HashMap<>();
+ }
+
+ /**
+ * Which bucket key corresponds to {@code timeMsSinceEpoch}.
+ */
+ private long key(long timeMsSinceEpoch) {
+ return timeMsSinceEpoch - (timeMsSinceEpoch % bucketWidthMs);
+ }
+
+ /**
+ * Add one sample of {@code value} (to bucket) at {@code timeMsSinceEpoch}.
+ */
+ public void add(long timeMsSinceEpoch, long value) {
+ long key = key(timeMsSinceEpoch);
+ Bucket bucket = buckets.get(key);
+ if (bucket == null) {
+ bucket = new Bucket(this);
+ buckets.put(key, bucket);
+ }
+ bucket.add(this, value);
+ }
+
+ /**
+ * Remove one sample (from bucket) at {@code timeMsSinceEpoch}.
+ */
+ public void remove(long timeMsSinceEpoch) {
+ long key = key(timeMsSinceEpoch);
+ Bucket bucket = buckets.get(key);
+ if (bucket == null) {
+ return;
+ }
+ if (bucket.remove()) {
+ buckets.remove(key);
+ }
+ }
+
+ /**
+ * Return the (bucketized) combined value of all samples.
+ */
+ public long get() {
+ long result = function.identity();
+ for (Bucket bucket : buckets.values()) {
+ result = function.apply(result, bucket.get());
+ }
+ return result;
+ }
+
+ /**
+ * Is the current result 'significant'? Ie is it drawn from enough buckets
+ * or from enough samples?
+ */
+ public boolean isSignificant() {
+ if (buckets.size() >= numSignificantBuckets) {
+ return true;
+ }
+ int totalSamples = 0;
+ for (Bucket bucket : buckets.values()) {
+ totalSamples += bucket.numSamples;
+ }
+ return totalSamples >= numSignificantSamples;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
new file mode 100644
index 0000000..84ba8b8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.transforms.Combine;
+import java.util.Arrays;
+
+/**
+ * Keep track of the moving minimum/maximum/sum of sampled long values. The minimum/maximum/sum
+ * is over at most the last {@link #samplePeriodMs}, and is updated every
+ * {@link #sampleUpdateMs}.
+ */
+public class MovingFunction {
+ /**
+ * How far back to retain samples, in ms.
+ */
+ private final long samplePeriodMs;
+
+ /**
+ * How frequently to update the moving function, in ms.
+ */
+ private final long sampleUpdateMs;
+
+ /**
+ * How many buckets are considered 'significant'?
+ */
+ private final int numSignificantBuckets;
+
+ /**
+ * How many samples are considered 'significant'?
+ */
+ private final int numSignificantSamples;
+
+ /**
+ * Function for combining sample values.
+ */
+ private final Combine.BinaryCombineLongFn function;
+
+ /**
+ * Minimum/maximum/sum of all values per bucket.
+ */
+ private final long[] buckets;
+
+ /**
+ * How many samples have been added to each bucket.
+ */
+ private final int[] numSamples;
+
+ /**
+ * Time of start of current bucket.
+ */
+ private long currentMsSinceEpoch;
+
+ /**
+ * Index of bucket corresponding to above timestamp, or -1 if no entries.
+ */
+ private int currentIndex;
+
+ public MovingFunction(long samplePeriodMs, long sampleUpdateMs,
+ int numSignificantBuckets, int numSignificantSamples,
+ Combine.BinaryCombineLongFn function) {
+ this.samplePeriodMs = samplePeriodMs;
+ this.sampleUpdateMs = sampleUpdateMs;
+ this.numSignificantBuckets = numSignificantBuckets;
+ this.numSignificantSamples = numSignificantSamples;
+ this.function = function;
+ int n = (int) (samplePeriodMs / sampleUpdateMs);
+ buckets = new long[n];
+ Arrays.fill(buckets, function.identity());
+ numSamples = new int[n];
+ Arrays.fill(numSamples, 0);
+ currentMsSinceEpoch = -1;
+ currentIndex = -1;
+ }
+
+ /**
+ * Flush stale values.
+ */
+ private void flush(long nowMsSinceEpoch) {
+ checkArgument(nowMsSinceEpoch >= 0, "Only positive timestamps supported");
+ if (currentIndex < 0) {
+ currentMsSinceEpoch = nowMsSinceEpoch - (nowMsSinceEpoch % sampleUpdateMs);
+ currentIndex = 0;
+ }
+ checkArgument(nowMsSinceEpoch >= currentMsSinceEpoch, "Attempting to move backwards");
+ int newBuckets =
+ Math.min((int) ((nowMsSinceEpoch - currentMsSinceEpoch) / sampleUpdateMs),
+ buckets.length);
+ while (newBuckets > 0) {
+ currentIndex = (currentIndex + 1) % buckets.length;
+ buckets[currentIndex] = function.identity();
+ numSamples[currentIndex] = 0;
+ newBuckets--;
+ currentMsSinceEpoch += sampleUpdateMs;
+ }
+ }
+
+ /**
+ * Add {@code value} at {@code nowMsSinceEpoch}.
+ */
+ public void add(long nowMsSinceEpoch, long value) {
+ flush(nowMsSinceEpoch);
+ buckets[currentIndex] = function.apply(buckets[currentIndex], value);
+ numSamples[currentIndex]++;
+ }
+
+ /**
+ * Return the minimum/maximum/sum of all retained values within {@link #samplePeriodMs}
+ * of {@code nowMsSinceEpoch}.
+ */
+ public long get(long nowMsSinceEpoch) {
+ flush(nowMsSinceEpoch);
+ long result = function.identity();
+ for (int i = 0; i < buckets.length; i++) {
+ result = function.apply(result, buckets[i]);
+ }
+ return result;
+ }
+
+ /**
+ * Is the current result 'significant'? Ie is it drawn from enough buckets
+ * or from enough samples?
+ */
+ public boolean isSignificant() {
+ int totalSamples = 0;
+ int activeBuckets = 0;
+ for (int i = 0; i < buckets.length; i++) {
+ totalSamples += numSamples[i];
+ if (numSamples[i] > 0) {
+ activeBuckets++;
+ }
+ }
+ return activeBuckets >= numSignificantBuckets || totalSamples >= numSignificantSamples;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
index 29d0fd5..aa73d42 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
@@ -136,6 +136,8 @@ public class PubsubApiaryClient extends PubsubClient {
}
if (idLabel != null) {
+ // TODO: The id should be associated with the OutgoingMessage so that it is stable
+ // across retried bundles
attributes.put(idLabel,
Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
}
@@ -300,4 +302,9 @@ public class PubsubApiaryClient extends PubsubClient {
Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute();
return response.getAckDeadlineSeconds();
}
+
+ @Override
+ public boolean isEOF() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
index 9c75003..dc4858e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
@@ -286,6 +286,8 @@ public abstract class PubsubClient implements Closeable {
*/
public final long timestampMsSinceEpoch;
+ // TODO: Support a record id.
+
public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) {
this.elementBytes = elementBytes;
this.timestampMsSinceEpoch = timestampMsSinceEpoch;
@@ -503,4 +505,11 @@ public abstract class PubsubClient implements Closeable {
* @throws IOException
*/
public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException;
+
+ /**
+ * Return {@literal true} if {@link #pull} will always return an empty list. Actual clients
+ * will return {@literal false}. Test clients may return {@literal true} to signal that all
+ * expected messages have been pulled and the test may complete.
+ */
+ public abstract boolean isEOF();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
index bb535aa..e759513 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.util;
import static com.google.common.base.Preconditions.checkState;
+import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PubsubOptions;
import com.google.auth.oauth2.GoogleCredentials;
@@ -71,6 +72,9 @@ import javax.annotation.Nullable;
/**
* A helper class for talking to Pubsub via grpc.
+ *
+ * <p>CAUTION: Currently uses the application default credentials and does not respect any
+ * credentials-related arguments in {@link GcpOptions}.
*/
public class PubsubGrpcClient extends PubsubClient {
private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
@@ -441,4 +445,9 @@ public class PubsubGrpcClient extends PubsubClient {
Subscription response = subscriberStub().getSubscription(request);
return response.getAckDeadlineSeconds();
}
+
+ @Override
+ public boolean isEOF() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
index 9c3dd85..c1dfa06 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
@@ -18,14 +18,15 @@
package org.apache.beam.sdk.util;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import org.apache.beam.sdk.options.PubsubOptions;
-import com.google.common.annotations.VisibleForTesting;
+import com.google.api.client.util.Clock;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,227 +39,308 @@ import javax.annotation.Nullable;
/**
* A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
* testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
- * methods.
+ * methods. Relies on statics to mimic the Pubsub service, though we try to hide that.
*/
public class PubsubTestClient extends PubsubClient {
- public static PubsubClientFactory createFactoryForPublish(
+ /**
+ * Mimic the state of the simulated Pubsub 'service'.
+ *
+ * <p>Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when
+ * running test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be
+ * created from the same client factory and run in parallel. Thus we can't enforce aliasing
+ * of the following data structures over all clients and must resort to a static.
+ */
+ private static class State {
+ /**
+ * True if has been primed for a test but not yet validated.
+ */
+ boolean isActive;
+
+ /**
+ * Publish mode only: Only publish calls for this topic are allowed.
+ */
+ @Nullable
+ TopicPath expectedTopic;
+
+ /**
+ * Publish mode only: Messages yet to be seen in a {@link #publish} call.
+ */
+ @Nullable
+ Set<OutgoingMessage> remainingExpectedOutgoingMessages;
+
+ /**
+ * Pull mode only: Clock from which to get current time.
+ */
+ @Nullable
+ Clock clock;
+
+ /**
+ * Pull mode only: Only pull calls for this subscription are allowed.
+ */
+ @Nullable
+ SubscriptionPath expectedSubscription;
+
+ /**
+ * Pull mode only: Timeout to simulate.
+ */
+ int ackTimeoutSec;
+
+ /**
+ * Pull mode only: Messages waiting to be received by a {@link #pull} call.
+ */
+ @Nullable
+ List<IncomingMessage> remainingPendingIncomingMessages;
+
+ /**
+ * Pull mode only: Messages which have been returned from a {@link #pull} call and
+ * not yet ACKed by an {@link #acknowledge} call.
+ */
+ @Nullable
+ Map<String, IncomingMessage> pendingAckIncomingMessages;
+
+ /**
+ * Pull mode only: When above messages are due to have their ACK deadlines expire.
+ */
+ @Nullable
+ Map<String, Long> ackDeadline;
+ }
+
+ private static final State STATE = new State();
+
+ /** Closing the factory will validate all expected messages were processed. */
+ public interface PubsubTestClientFactory extends PubsubClientFactory, Closeable {
+ }
+
+ /**
+ * Return a factory for testing publishers. Only one factory may be in-flight at a time.
+ * The factory must be closed when the test is complete, at which point final validation will
+ * occur.
+ */
+ public static PubsubTestClientFactory createFactoryForPublish(
final TopicPath expectedTopic,
- final Set<OutgoingMessage> expectedOutgoingMessages) {
- return new PubsubClientFactory() {
+ final Iterable<OutgoingMessage> expectedOutgoingMessages) {
+ synchronized (STATE) {
+ checkState(!STATE.isActive, "Test still in flight");
+ STATE.expectedTopic = expectedTopic;
+ STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
+ STATE.isActive = true;
+ }
+ return new PubsubTestClientFactory() {
@Override
public PubsubClient newClient(
@Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
throws IOException {
- return new PubsubTestClient(expectedTopic, null, 0, expectedOutgoingMessages, null);
+ return new PubsubTestClient();
}
@Override
public String getKind() {
return "PublishTest";
}
+
+ @Override
+ public void close() {
+ synchronized (STATE) {
+ checkState(STATE.isActive, "No test still in flight");
+ checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(),
+ "Still waiting for %s messages to be published",
+ STATE.remainingExpectedOutgoingMessages.size());
+ STATE.isActive = false;
+ STATE.remainingExpectedOutgoingMessages = null;
+ }
+ }
};
}
- public static PubsubClientFactory createFactoryForPull(
- @Nullable final SubscriptionPath expectedSubscription,
+ /**
+ * Return a factory for testing subscribers. Only one factory may be in-flight at a time.
+ * The factory must be closed when the test is complete.
+ */
+ public static PubsubTestClientFactory createFactoryForPull(
+ final Clock clock,
+ final SubscriptionPath expectedSubscription,
final int ackTimeoutSec,
- @Nullable final List<IncomingMessage> expectedIncomingMessages) {
- return new PubsubClientFactory() {
+ final Iterable<IncomingMessage> expectedIncomingMessages) {
+ synchronized (STATE) {
+ checkState(!STATE.isActive, "Test still in flight");
+ STATE.clock = clock;
+ STATE.expectedSubscription = expectedSubscription;
+ STATE.ackTimeoutSec = ackTimeoutSec;
+ STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages);
+ STATE.pendingAckIncomingMessages = new HashMap<>();
+ STATE.ackDeadline = new HashMap<>();
+ STATE.isActive = true;
+ }
+ return new PubsubTestClientFactory() {
@Override
public PubsubClient newClient(
@Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
throws IOException {
- return new PubsubTestClient(null, expectedSubscription, ackTimeoutSec,
- null, expectedIncomingMessages);
+ return new PubsubTestClient();
}
@Override
public String getKind() {
return "PullTest";
}
+
+ @Override
+ public void close() {
+ synchronized (STATE) {
+ checkState(STATE.isActive, "No test still in flight");
+ checkState(STATE.remainingPendingIncomingMessages.isEmpty(),
+ "Still waiting for %s messages to be pulled",
+ STATE.remainingPendingIncomingMessages.size());
+ checkState(STATE.pendingAckIncomingMessages.isEmpty(),
+ "Still waiting for %s messages to be ACKed",
+ STATE.pendingAckIncomingMessages.size());
+ checkState(STATE.ackDeadline.isEmpty(),
+ "Still waiting for %s messages to be ACKed",
+ STATE.ackDeadline.size());
+ STATE.isActive = false;
+ STATE.remainingPendingIncomingMessages = null;
+ STATE.pendingAckIncomingMessages = null;
+ STATE.ackDeadline = null;
+ }
+ }
};
}
/**
- * Only publish calls for this topic are allowed.
- */
- @Nullable
- private TopicPath expectedTopic;
- /**
- * Only pull calls for this subscription are allowed.
- */
- @Nullable
- private SubscriptionPath expectedSubscription;
-
- /**
- * Timeout to simulate.
+ * Return true if in pull mode.
*/
- private int ackTimeoutSec;
-
- /**
- * Messages yet to seen in a {@link #publish} call.
- */
- @Nullable
- private Set<OutgoingMessage> remainingExpectedOutgoingMessages;
-
- /**
- * Messages waiting to be received by a {@link #pull} call.
- */
- @Nullable
- private List<IncomingMessage> remainingPendingIncomingMessages;
-
- /**
- * Messages which have been returned from a {@link #pull} call and
- * not yet ACKed by an {@link #acknowledge} call.
- */
- private Map<String, IncomingMessage> pendingAckIncommingMessages;
-
- /**
- * When above messages are due to have their ACK deadlines expire.
- */
- private Map<String, Long> ackDeadline;
+ private boolean inPullMode() {
+ checkState(STATE.isActive, "No test is active");
+ return STATE.expectedSubscription != null;
+ }
/**
- * Current time.
+ * Return true if in publish mode.
*/
- private long nowMsSinceEpoch;
-
- @VisibleForTesting
- PubsubTestClient(
- @Nullable TopicPath expectedTopic,
- @Nullable SubscriptionPath expectedSubscription,
- int ackTimeoutSec,
- @Nullable Set<OutgoingMessage> expectedOutgoingMessages,
- @Nullable List<IncomingMessage> expectedIncomingMessages) {
- this.expectedTopic = expectedTopic;
- this.expectedSubscription = expectedSubscription;
- this.ackTimeoutSec = ackTimeoutSec;
-
- this.remainingExpectedOutgoingMessages = expectedOutgoingMessages;
- this.remainingPendingIncomingMessages = expectedIncomingMessages;
-
- this.pendingAckIncommingMessages = new HashMap<>();
- this.ackDeadline = new HashMap<>();
- this.nowMsSinceEpoch = Long.MIN_VALUE;
+ private boolean inPublishMode() {
+ checkState(STATE.isActive, "No test is active");
+ return STATE.expectedTopic != null;
}
/**
- * Advance wall-clock time to {@code newNowMsSinceEpoch}. This will simulate Pubsub expiring
+ * For subscription mode only:
+ * Track progression of time according to the {@link Clock} passed. This will simulate Pubsub
+ * expiring
* outstanding ACKs.
*/
- public void advanceTo(long newNowMsSinceEpoch) {
- checkArgument(newNowMsSinceEpoch >= nowMsSinceEpoch,
- "Cannot advance time backwards from %d to %d", nowMsSinceEpoch,
- newNowMsSinceEpoch);
- nowMsSinceEpoch = newNowMsSinceEpoch;
- // Any messages who's ACKs timed out are available for re-pulling.
- Iterator<Map.Entry<String, Long>> deadlineItr = ackDeadline.entrySet().iterator();
- while (deadlineItr.hasNext()) {
- Map.Entry<String, Long> entry = deadlineItr.next();
- if (entry.getValue() <= nowMsSinceEpoch) {
- remainingPendingIncomingMessages.add(pendingAckIncommingMessages.remove(entry.getKey()));
- deadlineItr.remove();
+ public void advance() {
+ synchronized (STATE) {
+ checkState(inPullMode(), "Can only advance in pull mode");
+ // Any messages whose ACKs timed out are available for re-pulling.
+ Iterator<Map.Entry<String, Long>> deadlineItr = STATE.ackDeadline.entrySet().iterator();
+ while (deadlineItr.hasNext()) {
+ Map.Entry<String, Long> entry = deadlineItr.next();
+ if (entry.getValue() <= STATE.clock.currentTimeMillis()) {
+ STATE.remainingPendingIncomingMessages.add(
+ STATE.pendingAckIncomingMessages.remove(entry.getKey()));
+ deadlineItr.remove();
+ }
}
}
}
@Override
public void close() {
- if (remainingExpectedOutgoingMessages != null) {
- checkState(this.remainingExpectedOutgoingMessages.isEmpty(),
- "Failed to pull %d messages", this.remainingExpectedOutgoingMessages.size());
- remainingExpectedOutgoingMessages = null;
- }
- if (remainingPendingIncomingMessages != null) {
- checkState(remainingPendingIncomingMessages.isEmpty(),
- "Failed to publish %d messages", remainingPendingIncomingMessages.size());
- checkState(pendingAckIncommingMessages.isEmpty(),
- "Failed to ACK %d messages", pendingAckIncommingMessages.size());
- checkState(ackDeadline.isEmpty(),
- "Failed to ACK %d messages", ackDeadline.size());
- remainingPendingIncomingMessages = null;
- pendingAckIncommingMessages = null;
- ackDeadline = null;
- }
}
@Override
public int publish(
TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
- checkNotNull(expectedTopic, "Missing expected topic");
- checkNotNull(remainingExpectedOutgoingMessages, "Missing expected outgoing messages");
- checkState(topic.equals(expectedTopic), "Topic %s does not match expected %s", topic,
- expectedTopic);
- for (OutgoingMessage outgoingMessage : outgoingMessages) {
- checkState(remainingExpectedOutgoingMessages.remove(outgoingMessage),
- "Unexpeced outgoing message %s", outgoingMessage);
+ synchronized (STATE) {
+ checkState(inPublishMode(), "Can only publish in publish mode");
+ checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic,
+ STATE.expectedTopic);
+ for (OutgoingMessage outgoingMessage : outgoingMessages) {
+ checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage),
+ "Unexpected outgoing message %s", outgoingMessage);
+ }
+ return outgoingMessages.size();
}
- return outgoingMessages.size();
}
@Override
public List<IncomingMessage> pull(
long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize,
boolean returnImmediately) throws IOException {
- checkState(requestTimeMsSinceEpoch == nowMsSinceEpoch,
- "Simulated time %d does not match requset time %d", nowMsSinceEpoch,
- requestTimeMsSinceEpoch);
- checkNotNull(expectedSubscription, "Missing expected subscription");
- checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
- checkState(subscription.equals(expectedSubscription),
- "Subscription %s does not match expected %s", subscription, expectedSubscription);
- checkState(returnImmediately, "PubsubTestClient only supports returning immediately");
-
- List<IncomingMessage> incomingMessages = new ArrayList<>();
- Iterator<IncomingMessage> pendItr = remainingPendingIncomingMessages.iterator();
- while (pendItr.hasNext()) {
- IncomingMessage incomingMessage = pendItr.next();
- pendItr.remove();
- IncomingMessage incomingMessageWithRequestTime =
- incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
- incomingMessages.add(incomingMessageWithRequestTime);
- pendingAckIncommingMessages.put(incomingMessageWithRequestTime.ackId,
- incomingMessageWithRequestTime);
- ackDeadline.put(incomingMessageWithRequestTime.ackId,
- requestTimeMsSinceEpoch + ackTimeoutSec * 1000);
- if (incomingMessages.size() >= batchSize) {
- break;
+ synchronized (STATE) {
+ checkState(inPullMode(), "Can only pull in pull mode");
+ long now = STATE.clock.currentTimeMillis();
+ checkState(requestTimeMsSinceEpoch == now,
+ "Simulated time %s does not match request time %s", now, requestTimeMsSinceEpoch);
+ checkState(subscription.equals(STATE.expectedSubscription),
+ "Subscription %s does not match expected %s", subscription,
+ STATE.expectedSubscription);
+ checkState(returnImmediately, "Pull only supported if returning immediately");
+
+ List<IncomingMessage> incomingMessages = new ArrayList<>();
+ Iterator<IncomingMessage> pendItr = STATE.remainingPendingIncomingMessages.iterator();
+ while (pendItr.hasNext()) {
+ IncomingMessage incomingMessage = pendItr.next();
+ pendItr.remove();
+ IncomingMessage incomingMessageWithRequestTime =
+ incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
+ incomingMessages.add(incomingMessageWithRequestTime);
+ STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId,
+ incomingMessageWithRequestTime);
+ STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId,
+ requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000);
+ if (incomingMessages.size() >= batchSize) {
+ break;
+ }
}
+ return incomingMessages;
}
- return incomingMessages;
}
@Override
public void acknowledge(
SubscriptionPath subscription,
List<String> ackIds) throws IOException {
- checkNotNull(expectedSubscription, "Missing expected subscription");
- checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
- checkState(subscription.equals(expectedSubscription),
- "Subscription %s does not match expected %s", subscription, expectedSubscription);
-
- for (String ackId : ackIds) {
- checkState(ackDeadline.remove(ackId) != null,
- "No message with ACK id %s is outstanding", ackId);
- checkState(pendingAckIncommingMessages.remove(ackId) != null,
- "No message with ACK id %s is outstanding", ackId);
+ synchronized (STATE) {
+ checkState(inPullMode(), "Can only acknowledge in pull mode");
+ checkState(subscription.equals(STATE.expectedSubscription),
+ "Subscription %s does not match expected %s", subscription,
+ STATE.expectedSubscription);
+
+ for (String ackId : ackIds) {
+ checkState(STATE.ackDeadline.remove(ackId) != null,
+ "No message with ACK id %s is waiting for an ACK", ackId);
+ checkState(STATE.pendingAckIncomingMessages.remove(ackId) != null,
+ "No message with ACK id %s is waiting for an ACK", ackId);
+ }
}
}
@Override
public void modifyAckDeadline(
SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
- checkNotNull(expectedSubscription, "Missing expected subscription");
- checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
- checkState(subscription.equals(expectedSubscription),
- "Subscription %s does not match expected %s", subscription, expectedSubscription);
-
- for (String ackId : ackIds) {
- checkState(ackDeadline.remove(ackId) != null,
- "No message with ACK id %s is outstanding", ackId);
- checkState(pendingAckIncommingMessages.containsKey(ackId),
- "No message with ACK id %s is outstanding", ackId);
- ackDeadline.put(ackId, nowMsSinceEpoch + deadlineSeconds * 1000);
+ synchronized (STATE) {
+ checkState(inPullMode(), "Can only modify ack deadline in pull mode");
+ checkState(subscription.equals(STATE.expectedSubscription),
+ "Subscription %s does not match expected %s", subscription,
+ STATE.expectedSubscription);
+
+ for (String ackId : ackIds) {
+ if (deadlineSeconds > 0) {
+ checkState(STATE.ackDeadline.remove(ackId) != null,
+ "No message with ACK id %s is waiting for an ACK", ackId);
+ checkState(STATE.pendingAckIncomingMessages.containsKey(ackId),
+ "No message with ACK id %s is waiting for an ACK", ackId);
+ STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + deadlineSeconds * 1000);
+ } else {
+ checkState(STATE.ackDeadline.remove(ackId) != null,
+ "No message with ACK id %s is waiting for an ACK", ackId);
+ IncomingMessage message = STATE.pendingAckIncomingMessages.remove(ackId);
+ checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId);
+ STATE.remainingPendingIncomingMessages.add(message);
+ }
+ }
}
}
@@ -296,6 +378,16 @@ public class PubsubTestClient extends PubsubClient {
@Override
public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
- return ackTimeoutSec;
+ synchronized (STATE) {
+ return STATE.ackTimeoutSec;
+ }
+ }
+
+ @Override
+ public boolean isEOF() {
+ synchronized (STATE) {
+ checkState(inPullMode(), "Can only check EOF in pull mode");
+ return STATE.remainingPendingIncomingMessages.isEmpty();
+ }
}
}