You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/20 22:58:24 UTC
[1/3] beam git commit: Add PubSub attributes support to PubsubIO.
Repository: beam
Updated Branches:
refs/heads/master a6810372b -> c6e46b655
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
index 5bc1664..7a4be62 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -19,10 +19,14 @@
package org.apache.beam.sdk.io;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.hash.Hashing;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.PubsubUnboundedSink.RecordIdMethod;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -32,6 +36,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.PubsubClient;
import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.util.PubsubClient.TopicPath;
@@ -49,9 +54,11 @@ import org.junit.runners.JUnit4;
* Test PubsubUnboundedSink.
*/
@RunWith(JUnit4.class)
-public class PubsubUnboundedSinkTest {
+public class PubsubUnboundedSinkTest implements Serializable {
private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
private static final String DATA = "testData";
+ private static final Map<String, String> ATTRIBUTES =
+ ImmutableMap.<String, String>builder().put("a", "b").put("c", "d").build();
private static final long TIMESTAMP = 1234L;
private static final String TIMESTAMP_LABEL = "timestamp";
private static final String ID_LABEL = "id";
@@ -69,11 +76,12 @@ public class PubsubUnboundedSinkTest {
}
@Rule
- public TestPipeline p = TestPipeline.create();
+ public transient TestPipeline p = TestPipeline.create();
@Test
public void saneCoder() throws Exception {
- OutgoingMessage message = new OutgoingMessage(DATA.getBytes(), TIMESTAMP, getRecordId(DATA));
+ OutgoingMessage message = new OutgoingMessage(
+ DATA.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(DATA));
CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message);
CoderProperties.coderSerializable(PubsubUnboundedSink.CODER);
}
@@ -82,7 +90,10 @@ public class PubsubUnboundedSinkTest {
@Category(NeedsRunner.class)
public void sendOneMessage() throws IOException {
List<OutgoingMessage> outgoing =
- ImmutableList.of(new OutgoingMessage(DATA.getBytes(), TIMESTAMP, getRecordId(DATA)));
+ ImmutableList.of(new OutgoingMessage(
+ DATA.getBytes(),
+ ATTRIBUTES,
+ TIMESTAMP, getRecordId(DATA)));
int batchSize = 1;
int batchBytes = 1;
try (PubsubTestClientFactory factory =
@@ -91,8 +102,14 @@ public class PubsubUnboundedSinkTest {
PubsubUnboundedSink<String> sink =
new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
- Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC);
-
+ Duration.standardSeconds(2),
+ new SimpleFunction<String, PubsubIO.PubsubMessage>() {
+ @Override
+ public PubsubIO.PubsubMessage apply(String input) {
+ return new PubsubIO.PubsubMessage(input.getBytes(), ATTRIBUTES);
+ }
+ },
+ RecordIdMethod.DETERMINISTIC);
p.apply(Create.of(ImmutableList.of(DATA)))
.apply(ParDo.of(new Stamp()))
.apply(sink);
@@ -111,7 +128,8 @@ public class PubsubUnboundedSinkTest {
int batchBytes = 1000;
for (int i = 0; i < batchSize * 10; i++) {
String str = String.valueOf(i);
- outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP, getRecordId(str)));
+ outgoing.add(new OutgoingMessage(
+ str.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(str)));
data.add(str);
}
try (PubsubTestClientFactory factory =
@@ -120,8 +138,7 @@ public class PubsubUnboundedSinkTest {
PubsubUnboundedSink<String> sink =
new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
- Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC);
-
+ Duration.standardSeconds(2), null, RecordIdMethod.DETERMINISTIC);
p.apply(Create.of(data))
.apply(ParDo.of(new Stamp()))
.apply(sink);
@@ -145,7 +162,8 @@ public class PubsubUnboundedSinkTest {
sb.append(String.valueOf(n));
}
String str = sb.toString();
- outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP, getRecordId(str)));
+ outgoing.add(new OutgoingMessage(
+ str.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(str)));
data.add(str);
n += str.length();
}
@@ -156,8 +174,7 @@ public class PubsubUnboundedSinkTest {
new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC),
StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
- RecordIdMethod.DETERMINISTIC);
-
+ null, RecordIdMethod.DETERMINISTIC);
p.apply(Create.of(data))
.apply(ParDo.of(new Stamp()))
.apply(sink);
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
index 601e2c8..478ecd1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
@@ -98,13 +98,13 @@ public class PubsubUnboundedSourceTest {
PubsubUnboundedSource<String> source =
new PubsubUnboundedSource<>(
clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION),
- StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL);
+ StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, null);
primSource = new PubsubSource<>(source);
}
private void setupOneMessage() {
setupOneMessage(ImmutableList.of(
- new IncomingMessage(DATA.getBytes(), TIMESTAMP, 0, ACK_ID, RECORD_ID)));
+ new IncomingMessage(DATA.getBytes(), null, TIMESTAMP, 0, ACK_ID, RECORD_ID)));
}
@After
@@ -217,7 +217,7 @@ public class PubsubUnboundedSourceTest {
for (int i = 0; i < 2; i++) {
String data = String.format("data_%d", i);
String ackid = String.format("ackid_%d", i);
- incoming.add(new IncomingMessage(data.getBytes(), TIMESTAMP, 0, ackid, RECORD_ID));
+ incoming.add(new IncomingMessage(data.getBytes(), null, TIMESTAMP, 0, ackid, RECORD_ID));
}
setupOneMessage(incoming);
PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
@@ -275,7 +275,7 @@ public class PubsubUnboundedSourceTest {
dataToMessageNum.put(data, messageNum);
String recid = String.format("recordid_%d", messageNum);
String ackId = String.format("ackid_%d", messageNum);
- incoming.add(new IncomingMessage(data.getBytes(), messageNumToTimestamp(messageNum), 0,
+ incoming.add(new IncomingMessage(data.getBytes(), null, messageNumToTimestamp(messageNum), 0,
ackId, recid));
}
setupOneMessage(incoming);
@@ -337,6 +337,7 @@ public class PubsubUnboundedSourceTest {
null,
StringUtf8Coder.of(),
null,
+ null,
null);
assertThat(source.getSubscription(), nullValue());
@@ -367,6 +368,7 @@ public class PubsubUnboundedSourceTest {
null,
StringUtf8Coder.of(),
null,
+ null,
null);
assertThat(source.getSubscription(), nullValue());
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
index 9767cde..6d4cf4e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
@@ -42,6 +42,7 @@ import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
@@ -76,6 +77,8 @@ public class PubsubGrpcClientTest {
private static final String DATA = "testData";
private static final String RECORD_ID = "testRecordId";
private static final String ACK_ID = "testAckId";
+ private static final Map<String, String> ATTRIBUTES =
+ ImmutableMap.<String, String>builder().put("a", "b").put("c", "d").build();
@Before
public void setup() {
@@ -111,6 +114,7 @@ public class PubsubGrpcClientTest {
.setData(
ByteString.copyFrom(DATA.getBytes()))
.setPublishTime(timestamp)
+ .putAllAttributes(ATTRIBUTES)
.putAllAttributes(
ImmutableMap.of(TIMESTAMP_LABEL,
String.valueOf(MESSAGE_TIME),
@@ -160,6 +164,7 @@ public class PubsubGrpcClientTest {
PubsubMessage expectedPubsubMessage =
PubsubMessage.newBuilder()
.setData(ByteString.copyFrom(DATA.getBytes()))
+ .putAllAttributes(ATTRIBUTES)
.putAllAttributes(
ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
ID_LABEL, RECORD_ID))
@@ -190,7 +195,8 @@ public class PubsubGrpcClientTest {
.build()
.start();
try {
- OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID);
+ OutgoingMessage actualMessage = new OutgoingMessage(
+ DATA.getBytes(), ATTRIBUTES, MESSAGE_TIME, RECORD_ID);
int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
assertEquals(1, n);
assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived));
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
index 72fb9a2..17e1870 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
@@ -125,7 +125,8 @@ public class PubsubJsonClientTest {
.publish(expectedTopic, expectedRequest)
.execute()))
.thenReturn(expectedResponse);
- OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID);
+ OutgoingMessage actualMessage = new OutgoingMessage(
+ DATA.getBytes(), null, MESSAGE_TIME, RECORD_ID);
int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
assertEquals(1, n);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
index b9b1d3f..a1b7daf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
@@ -61,7 +61,7 @@ public class PubsubTestClientTest {
}
};
IncomingMessage expectedIncomingMessage =
- new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID);
+ new IncomingMessage(DATA.getBytes(), null, MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID);
try (PubsubTestClientFactory factory =
PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S,
Lists.newArrayList(expectedIncomingMessage))) {
@@ -100,7 +100,7 @@ public class PubsubTestClientTest {
@Test
public void publishOneMessage() throws IOException {
OutgoingMessage expectedOutgoingMessage =
- new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, MESSAGE_ID);
+ new OutgoingMessage(DATA.getBytes(), null, MESSAGE_TIME, MESSAGE_ID);
try (PubsubTestClientFactory factory =
PubsubTestClient.createFactoryForPublish(
TOPIC,
[3/3] beam git commit: Closes #1358
Posted by dh...@apache.org.
Closes #1358
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c6e46b65
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c6e46b65
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c6e46b65
Branch: refs/heads/master
Commit: c6e46b655f3a3a1c6003e1bccdb52ae50efa2882
Parents: a681037 f032fac
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jan 20 14:57:57 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jan 20 14:57:57 2017 -0800
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 5 +-
.../examples/complete/game/LeaderBoard.java | 5 +-
.../beam/runners/dataflow/DataflowRunner.java | 30 +-
.../java/org/apache/beam/sdk/io/PubsubIO.java | 783 +++++++++----------
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 72 +-
.../beam/sdk/io/PubsubUnboundedSource.java | 67 +-
.../apache/beam/sdk/transforms/GroupByKey.java | 2 +-
.../org/apache/beam/sdk/util/PropertyNames.java | 1 +
.../org/apache/beam/sdk/util/PubsubClient.java | 28 +-
.../apache/beam/sdk/util/PubsubGrpcClient.java | 6 +-
.../apache/beam/sdk/util/PubsubJsonClient.java | 4 +-
.../apache/beam/sdk/util/PubsubTestClient.java | 6 +-
.../org/apache/beam/sdk/io/PubsubIOTest.java | 41 +-
.../beam/sdk/io/PubsubUnboundedSinkTest.java | 41 +-
.../beam/sdk/io/PubsubUnboundedSourceTest.java | 10 +-
.../beam/sdk/util/PubsubGrpcClientTest.java | 8 +-
.../beam/sdk/util/PubsubJsonClientTest.java | 3 +-
.../beam/sdk/util/PubsubTestClientTest.java | 4 +-
18 files changed, 643 insertions(+), 473 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Add PubSub attributes support to PubsubIO.
Posted by dh...@apache.org.
Add PubSub attributes support to PubsubIO.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f032facb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f032facb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f032facb
Branch: refs/heads/master
Commit: f032facb4889f9183f8c6bfeda9fee9c0e4b7979
Parents: a681037
Author: Reuven Lax <re...@relax-macbookpro.roam.corp.google.com>
Authored: Sun Nov 13 20:29:31 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jan 20 14:57:56 2017 -0800
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 5 +-
.../examples/complete/game/LeaderBoard.java | 5 +-
.../beam/runners/dataflow/DataflowRunner.java | 30 +-
.../java/org/apache/beam/sdk/io/PubsubIO.java | 783 +++++++++----------
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 72 +-
.../beam/sdk/io/PubsubUnboundedSource.java | 67 +-
.../apache/beam/sdk/transforms/GroupByKey.java | 2 +-
.../org/apache/beam/sdk/util/PropertyNames.java | 1 +
.../org/apache/beam/sdk/util/PubsubClient.java | 28 +-
.../apache/beam/sdk/util/PubsubGrpcClient.java | 6 +-
.../apache/beam/sdk/util/PubsubJsonClient.java | 4 +-
.../apache/beam/sdk/util/PubsubTestClient.java | 6 +-
.../org/apache/beam/sdk/io/PubsubIOTest.java | 41 +-
.../beam/sdk/io/PubsubUnboundedSinkTest.java | 41 +-
.../beam/sdk/io/PubsubUnboundedSourceTest.java | 10 +-
.../beam/sdk/util/PubsubGrpcClientTest.java | 8 +-
.../beam/sdk/util/PubsubJsonClientTest.java | 3 +-
.../beam/sdk/util/PubsubTestClientTest.java | 4 +-
18 files changed, 643 insertions(+), 473 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 74f1b30..c880061 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -24,6 +24,7 @@ import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -252,7 +253,9 @@ public class GameStats extends LeaderBoard {
// Read Events from Pub/Sub using custom timestamps
PCollection<GameActionInfo> rawEvents = pipeline
- .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
+ .apply(PubsubIO.<String>read()
+ .timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())
+ .withCoder(StringUtf8Coder.of()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
// Extract username/score pairs from the event stream
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 519bd5f..35b586b 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -27,6 +27,7 @@ import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -190,7 +191,9 @@ public class LeaderBoard extends HourlyTeamScore {
// Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
// data elements, and parse the data.
PCollection<GameActionInfo> gameEvents = pipeline
- .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
+ .apply(PubsubIO.<String>read()
+ .timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())
+ .withCoder(StringUtf8Coder.of()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
gameEvents.apply("CalculateTeamScores",
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d21da59..5fdbc83 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -21,6 +21,8 @@ import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
+import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -99,6 +101,7 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.PubsubIO;
@@ -336,8 +339,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
builder.put(Window.Bound.class, AssignWindows.class);
// In streaming mode must use either the custom Pubsub unbounded source/sink or
// defer to Windmill's built-in implementation.
- builder.put(PubsubIO.Read.Bound.PubsubBoundedReader.class, UnsupportedIO.class);
- builder.put(PubsubIO.Write.Bound.PubsubBoundedWriter.class, UnsupportedIO.class);
+ builder.put(PubsubIO.Read.PubsubBoundedReader.class, UnsupportedIO.class);
+ builder.put(PubsubIO.Write.PubsubBoundedWriter.class, UnsupportedIO.class);
if (options.getExperiments() == null
|| !options.getExperiments().contains("enable_custom_pubsub_source")) {
builder.put(PubsubUnboundedSource.class, StreamingPubsubIORead.class);
@@ -2149,6 +2152,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
if (overriddenTransform.getIdLabel() != null) {
stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
}
+ if (overriddenTransform.getWithAttributesParseFn() != null) {
+ stepContext.addInput(
+ PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
+ byteArrayToJsonString(
+ serializeToByteArray(overriddenTransform.getWithAttributesParseFn())));
+ }
stepContext.addOutput(context.getOutput(transform));
}
}
@@ -2218,8 +2227,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
if (overriddenTransform.getIdLabel() != null) {
stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
}
- stepContext.addEncodingInput(
- WindowedValue.getValueOnlyCoder(overriddenTransform.getElementCoder()));
+ if (overriddenTransform.getFormatFn() != null) {
+ stepContext.addInput(
+ PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
+ byteArrayToJsonString(serializeToByteArray(overriddenTransform.getFormatFn())));
+ // No coder is needed in this case since the formatFn formats directly into a byte[],
+ // however the Dataflow backend requires a coder to be set.
+ stepContext.addEncodingInput(WindowedValue.getValueOnlyCoder(VoidCoder.of()));
+ } else if (overriddenTransform.getElementCoder() != null) {
+ stepContext.addEncodingInput(WindowedValue.getValueOnlyCoder(
+ overriddenTransform.getElementCoder()));
+ }
stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
}
}
@@ -2738,7 +2756,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
*/
@SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
public UnsupportedIO(DataflowRunner runner,
- PubsubIO.Read.Bound<?>.PubsubBoundedReader doFn) {
+ PubsubIO.Read<?>.PubsubBoundedReader doFn) {
this.doFn = doFn;
}
@@ -2747,7 +2765,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
*/
@SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
public UnsupportedIO(DataflowRunner runner,
- PubsubIO.Write.Bound<?>.PubsubBoundedWriter doFn) {
+ PubsubIO.Write<?>.PubsubBoundedWriter doFn) {
this.doFn = doFn;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 9b5edd1..2802871 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Strings;
@@ -24,11 +25,11 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.options.ValueProvider;
@@ -40,6 +41,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.util.CoderUtils;
@@ -74,9 +76,6 @@ public class PubsubIO {
/** Factory for creating pubsub client to manage transport. */
private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY;
- /** The default {@link Coder} used to translate to/from Cloud Pub/Sub messages. */
- public static final Coder<String> DEFAULT_PUBSUB_CODER = StringUtf8Coder.of();
-
/**
* Project IDs must contain 6-63 lowercase letters, digits, or dashes.
* IDs must start with a letter and may not end with a dash.
@@ -154,6 +153,43 @@ public class PubsubIO {
}
/**
+ * Class representing a Pub/Sub message. Each message contains a single message payload and
+ * a map of attached attributes.
+ */
+ public static class PubsubMessage {
+ private byte[] message;
+ private Map<String, String> attributes;
+
+ public PubsubMessage(byte[] message, Map<String, String> attributes) {
+ this.message = message;
+ this.attributes = attributes;
+ }
+
+ /**
+ * Returns the main PubSub message.
+ */
+ public byte[] getMessage() {
+ return message;
+ }
+
+ /**
+ * Returns the given attribute value. If no such attribute exists, returns null.
+ */
+ @Nullable
+ public String getAttribute(String attribute) {
+ checkNotNull(attribute, "attribute");
+ return attributes.get(attribute);
+ }
+
+ /**
+ * Returns the full map of attributes. This is an unmodifiable map.
+ */
+ public Map<String, String> getAttributeMap() {
+ return attributes;
+ }
+ }
+
+ /**
* Class representing a Cloud Pub/Sub Subscription.
*/
public static class PubsubSubscription implements Serializable {
@@ -417,6 +453,14 @@ public class PubsubIO {
}
}
+ public static <T> Read<T> read() {
+ return new Read<>();
+ }
+
+ public static <T> Write<T> write() {
+ return new Write<>();
+ }
+
/**
* A {@link PTransform} that continuously reads from a Cloud Pub/Sub stream and
* returns a {@link PCollection} of {@link String Strings} containing the items from
@@ -424,140 +468,10 @@ public class PubsubIO {
*
* <p>When running with a {@link PipelineRunner} that only supports bounded
* {@link PCollection PCollections}, only a bounded portion of the input Pub/Sub stream
- * can be processed. As such, either {@link Bound#maxNumRecords(int)} or
- * {@link Bound#maxReadTime(Duration)} must be set.
+ * can be processed. As such, either {@link PubsubIO.Read#maxNumRecords(int)} or
+ * {@link PubsubIO.Read#maxReadTime(Duration)} must be set.
*/
- public static class Read {
-
- /**
- * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
- * with {@link #subscription(String)}.
- *
- * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
- * of the {@code topic} string.
- *
- * <p>The Beam runner will start reading data published on this topic from the time the pipeline
- * is started. Any data published on the topic before the pipeline is started will not be read
- * by the runner.
- */
- public static Bound<String> topic(String topic) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).topic(StaticValueProvider.of(topic));
- }
-
- /**
- * Like {@code topic()} but with a {@link ValueProvider}.
- */
- public static Bound<String> topic(ValueProvider<String> topic) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic);
- }
-
- /**
- * Creates and returns a transform for reading from a specific Cloud Pub/Sub subscription.
- * Mutually exclusive with {@link #topic(String)}.
- *
- * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
- * of the {@code subscription} string.
- */
- public static Bound<String> subscription(String subscription) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).subscription(StaticValueProvider.of(subscription));
- }
-
- /**
- * Like {@code topic()} but with a {@link ValueProvider}.
- */
- public static Bound<String> subscription(ValueProvider<String> subscription) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).subscription(subscription);
- }
-
- /**
- * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
- * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
- * parameter specifies the name of the attribute that contains the timestamp.
- *
- * <p>The timestamp value is expected to be represented in the attribute as either:
- *
- * <ul>
- * <li>a numerical value representing the number of milliseconds since the Unix epoch. For
- * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct
- * value for this attribute.
- * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. The
- * sub-second component of the timestamp is optional, and digits beyond the first three
- * (i.e., time units smaller than milliseconds) will be ignored.
- * </ul>
- *
- * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
- * the first time it sees each record. All windowing will be done relative to these timestamps.
- *
- * <p>By default, windows are emitted based on an estimate of when this source is likely
- * done producing data for a given timestamp (referred to as the Watermark; see
- * {@link AfterWatermark} for more details). Any late data will be handled by the trigger
- * specified with the windowing strategy – by default it will be output immediately.
- *
- * <p>Note that the system can guarantee that no late data will ever be seen when it assigns
- * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
- *
- * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
- */
- public static Bound<String> timestampLabel(String timestampLabel) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).timestampLabel(timestampLabel);
- }
-
- /**
- * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
- * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
- * parameter specifies the attribute name. The value of the attribute can be any string
- * that uniquely identifies this record.
- *
- * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
- * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
- * be delivered, and deduplication of the stream will be strictly best effort.
- */
- public static Bound<String> idLabel(String idLabel) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).idLabel(idLabel);
- }
-
- /**
- * Creates and returns a transform for reading from Cloud Pub/Sub that uses the given
- * {@link Coder} to decode Pub/Sub messages into a value of type {@code T}.
- *
- * <p>By default, uses {@link StringUtf8Coder}, which just
- * returns the text lines as Java strings.
- *
- * @param <T> the type of the decoded elements, and the elements
- * of the resulting PCollection.
- */
- public static <T> Bound<T> withCoder(Coder<T> coder) {
- return new Bound<>(coder);
- }
-
- /**
- * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
- * records that will be read. The transform produces a <i>bounded</i> {@link PCollection}.
- *
- * <p>Either this option or {@link #maxReadTime(Duration)} must be set in order to create a
- * bounded source.
- */
- public static Bound<String> maxNumRecords(int maxNumRecords) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).maxNumRecords(maxNumRecords);
- }
-
- /**
- * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
- * duration during which records will be read. The transform produces a <i>bounded</i>
- * {@link PCollection}.
- *
- * <p>Either this option or {@link #maxNumRecords(int)} must be set in order to create a bounded
- * source.
- */
- public static Bound<String> maxReadTime(Duration maxReadTime) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).maxReadTime(maxReadTime);
- }
-
- /**
- * A {@link PTransform} that reads from a Cloud Pub/Sub source and returns
- * a unbounded {@link PCollection} containing the items from the stream.
- */
- public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
+ public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
/** The Cloud Pub/Sub topic to read from. */
@Nullable private final ValueProvider<PubsubTopic> topic;
@@ -579,13 +493,17 @@ public class PubsubIO {
/** Stop after reading for this much time. */
@Nullable private final Duration maxReadTime;
- private Bound(Coder<T> coder) {
- this(null, null, null, null, coder, null, 0, null);
+ /** Optional user function that parses each received PubsubMessage into an element of type T. */
+ SimpleFunction<PubsubMessage, T> parseFn;
+
+ private Read() {
+ this(null, null, null, null, null, null, 0, null, null);
}
- private Bound(String name, ValueProvider<PubsubSubscription> subscription,
+ private Read(String name, ValueProvider<PubsubSubscription> subscription,
ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
- String idLabel, int maxNumRecords, Duration maxReadTime) {
+ String idLabel, int maxNumRecords, Duration maxReadTime,
+ SimpleFunction<PubsubMessage, T> parseFn) {
super(name);
this.subscription = subscription;
this.topic = topic;
@@ -594,6 +512,7 @@ public class PubsubIO {
this.idLabel = idLabel;
this.maxNumRecords = maxNumRecords;
this.maxReadTime = maxReadTime;
+ this.parseFn = parseFn;
}
/**
@@ -609,104 +528,152 @@ public class PubsubIO {
*
* <p>Does not modify this object.
*/
- public Bound<T> subscription(String subscription) {
+ public Read<T> subscription(String subscription) {
return subscription(StaticValueProvider.of(subscription));
}
/**
* Like {@code subscription()} but with a {@link ValueProvider}.
*/
- public Bound<T> subscription(ValueProvider<String> subscription) {
+ public Read<T> subscription(ValueProvider<String> subscription) {
if (subscription.isAccessible()) {
// Validate.
PubsubSubscription.fromPath(subscription.get());
}
- return new Bound<>(name,
+ return new Read<>(name,
NestedValueProvider.of(subscription, new SubscriptionTranslator()),
- topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
+ topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
}
/**
- * Returns a transform that's like this one but that reads from the specified topic.
+ * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
+ * with {@link #subscription(String)}.
*
- * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the
- * format of the {@code topic} string.
+ * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
+ * of the {@code topic} string.
*
- * <p>Does not modify this object.
+ * <p>The Beam runner will start reading data published on this topic from the time the
+ * pipeline is started. Any data published on the topic before the pipeline is started will
+ * not be read by the runner.
*/
- public Bound<T> topic(String topic) {
+ public Read<T> topic(String topic) {
return topic(StaticValueProvider.of(topic));
}
/**
* Like {@code topic()} but with a {@link ValueProvider}.
*/
- public Bound<T> topic(ValueProvider<String> topic) {
+ public Read<T> topic(ValueProvider<String> topic) {
if (topic.isAccessible()) {
// Validate.
- PubsubTopic.fromPath(topic.get());
+ PubsubTopic.fromPath(topic.get());
}
- return new Bound<>(name, subscription,
+ return new Read<>(name, subscription,
NestedValueProvider.of(topic, new TopicTranslator()),
- timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
+ timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
}
/**
- * Returns a transform that's like this one but that reads message timestamps
- * from the given message attribute. See {@link PubsubIO.Read#timestampLabel(String)} for
- * more details on the format of the timestamp attribute.
+ * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
+ * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
+ * parameter specifies the name of the attribute that contains the timestamp.
*
- * <p>Does not modify this object.
+ * <p>The timestamp value is expected to be represented in the attribute as either:
+ *
+ * <ul>
+ * <li>a numerical value representing the number of milliseconds since the Unix epoch. For
+ * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct
+ * value for this attribute.
+ * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. The
+ * sub-second component of the timestamp is optional, and digits beyond the first three
+ * (i.e., time units smaller than milliseconds) will be ignored.
+ * </ul>
+ *
+ * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
+ * the first time it sees each record. All windowing will be done relative to these
+ * timestamps.
+ *
+ * <p>By default, windows are emitted based on an estimate of when this source is likely
+ * done producing data for a given timestamp (referred to as the Watermark; see
+ * {@link AfterWatermark} for more details). Any late data will be handled by the trigger
+ * specified with the windowing strategy – by default it will be output immediately.
+ *
+ * <p>Note that the system can guarantee that no late data will ever be seen when it assigns
+ * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
+ *
+ * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
*/
- public Bound<T> timestampLabel(String timestampLabel) {
- return new Bound<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
+ public Read<T> timestampLabel(String timestampLabel) {
+ return new Read<>(
+ name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+ parseFn);
}
- /**
- * Returns a transform that's like this one but that reads unique message IDs
- * from the given message attribute. See {@link PubsubIO.Read#idLabel(String)} for more
- * details on the format of the ID attribute.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> idLabel(String idLabel) {
- return new Bound<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
+ /**
+ * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
+ * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
+ * parameter specifies the attribute name. The value of the attribute can be any string
+ * that uniquely identifies this record.
+ *
+ * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
+ * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
+ * be delivered, and deduplication of the stream will be strictly best effort.
+ */
+ public Read<T> idLabel(String idLabel) {
+ return new Read<>(
+ name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+ parseFn);
}
/**
* Returns a transform that's like this one but that uses the given
- * {@link Coder} to decode each record into a value of type {@code X}.
+ * {@link Coder} to decode each record into a value of type {@code T}.
*
* <p>Does not modify this object.
- *
- * @param <X> the type of the decoded elements, and the
- * elements of the resulting PCollection.
*/
- public <X> Bound<X> withCoder(Coder<X> coder) {
- return new Bound<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
+ public Read<T> withCoder(Coder<T> coder) {
+ return new Read<>(
+ name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+ parseFn);
}
/**
- * Returns a transform that's like this one but will only read up to the specified
- * maximum number of records from Cloud Pub/Sub. The transform produces a <i>bounded</i>
- * {@link PCollection}. See {@link PubsubIO.Read#maxNumRecords(int)} for more details.
+ * Makes this transform pass each received PubsubMessage, including its Pubsub attributes, to
+ * the supplied parsing function, which transforms it into an element of the output type T.
+ * A Coder for T is still required: applying this transform without a coder set via
+ * {@link #withCoder(Coder)} fails with an {@link IllegalStateException}.
*/
- public Bound<T> maxNumRecords(int maxNumRecords) {
- return new Bound<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
+ public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
+ return new Read<T>(
+ name, subscription, topic, timestampLabel, coder, idLabel,
+ maxNumRecords, maxReadTime, parseFn);
}
- /**
- * Returns a transform that's like this one but will only read during the specified
- * duration from Cloud Pub/Sub. The transform produces a <i>bounded</i> {@link PCollection}.
- * See {@link PubsubIO.Read#maxReadTime(Duration)} for more details.
- */
- public Bound<T> maxReadTime(Duration maxReadTime) {
- return new Bound<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
+ /**
+ * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
+ * records that will be read. The transform produces a <i>bounded</i> {@link PCollection}.
+ *
+ * <p>Either this option or {@link #maxReadTime(Duration)} must be set in order to create a
+ * bounded source.
+ */
+ public Read<T> maxNumRecords(int maxNumRecords) {
+ return new Read<>(
+ name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+ parseFn);
+ }
+
+ /**
+ * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
+ * duration during which records will be read. The transform produces a <i>bounded</i>
+ * {@link PCollection}.
+ *
+ * <p>Either this option or {@link #maxNumRecords(int)} must be set in order to create a
+ * bounded source.
+ */
+ public Read<T> maxReadTime(Duration maxReadTime) {
+ return new Read<>(
+ name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+ parseFn);
}
@Override
@@ -719,6 +686,10 @@ public class PubsubIO {
throw new IllegalStateException("Can't set both the topic and the subscription for "
+ "a PubsubIO.Read transform");
}
+ if (coder == null) {
+ throw new IllegalStateException("PubsubIO.Read requires that a coder be set using "
+ + "the withCoder method.");
+ }
boolean boundedOutput = getMaxNumRecords() > 0 || getMaxReadTime() != null;
@@ -739,7 +710,7 @@ public class PubsubIO {
return input.getPipeline().begin()
.apply(new PubsubUnboundedSource<T>(
FACTORY, projectPath, topicPath, subscriptionPath,
- coder, timestampLabel, idLabel));
+ coder, timestampLabel, idLabel, parseFn));
}
}
@@ -767,43 +738,78 @@ public class PubsubIO {
return coder;
}
+ /**
+ * Get the topic being read from.
+ */
public PubsubTopic getTopic() {
return topic == null ? null : topic.get();
}
+ /**
+ * Get the {@link ValueProvider} for the topic being read from.
+ */
public ValueProvider<PubsubTopic> getTopicProvider() {
return topic;
}
+ /**
+ * Get the subscription being read from.
+ */
public PubsubSubscription getSubscription() {
return subscription == null ? null : subscription.get();
}
+ /**
+ * Get the {@link ValueProvider} for the subscription being read from.
+ */
public ValueProvider<PubsubSubscription> getSubscriptionProvider() {
return subscription;
}
+ /**
+ * Get the timestamp label.
+ */
public String getTimestampLabel() {
return timestampLabel;
}
- public Coder<T> getCoder() {
- return coder;
- }
-
+ /**
+ * Get the id label.
+ */
public String getIdLabel() {
return idLabel;
}
+
+ /**
+ * Get the {@link Coder} used for the transform's output.
+ */
+ public Coder<T> getCoder() {
+ return coder;
+ }
+
+ /**
+ * Get the maximum number of records to read.
+ */
public int getMaxNumRecords() {
return maxNumRecords;
}
+ /**
+ * Get the maximum read time.
+ */
public Duration getMaxReadTime() {
return maxReadTime;
}
/**
+ * Get the function used to parse PubsubMessages (payload and attributes), or null if unset.
+ */
+ public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
+ return parseFn;
+ }
+
+ /**
* Default reader when Pubsub subscription has some form of upper bound.
*
* <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top
@@ -891,51 +897,79 @@ public class PubsubIO {
}
for (IncomingMessage message : messages) {
- c.outputWithTimestamp(
- CoderUtils.decodeFromByteArray(getCoder(), message.elementBytes),
- new Instant(message.timestampMsSinceEpoch));
+ T element = null;
+ if (parseFn != null) {
+ element = parseFn.apply(new PubsubMessage(
+ message.elementBytes, message.attributes));
+ } else {
+ element = CoderUtils.decodeFromByteArray(getCoder(), message.elementBytes);
+ }
+ c.outputWithTimestamp(element, new Instant(message.timestampMsSinceEpoch));
}
}
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(Bound.this);
+ builder.delegate(Read.this);
}
}
}
- /** Disallow construction of utility class. */
- private Read() {}
- }
-
/////////////////////////////////////////////////////////////////////////////
/** Disallow construction of utility class. */
private PubsubIO() {}
+
/**
- * A {@link PTransform} that continuously writes a
- * {@link PCollection} of {@link String Strings} to a Cloud Pub/Sub stream.
+ * A {@link PTransform} that writes a bounded or unbounded {@link PCollection} of elements of
+ * type {@code T} to a Cloud Pub/Sub topic.
*/
- // TODO: Support non-String encodings.
- public static class Write {
+ public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+ /** The Cloud Pub/Sub topic to publish to. */
+ @Nullable private final ValueProvider<PubsubTopic> topic;
+ /** The name of the message attribute to publish message timestamps in. */
+ @Nullable private final String timestampLabel;
+ /** The name of the message attribute to publish unique message IDs in. */
+ @Nullable private final String idLabel;
+ /** The input type Coder. */
+ private final Coder<T> coder;
+ /** Optional function that formats each input element into a PubsubMessage. */
+ SimpleFunction<T, PubsubMessage> formatFn;
+
+ private Write() {
+ this(null, null, null, null, null, null);
+ }
+
+ private Write(
+ String name, ValueProvider<PubsubTopic> topic, String timestampLabel,
+ String idLabel, Coder<T> coder, SimpleFunction<T, PubsubMessage> formatFn) {
+ super(name);
+ this.topic = topic;
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ this.coder = coder;
+ this.formatFn = formatFn;
+ }
+
/**
* Creates a transform that publishes to the specified topic.
*
* <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
* {@code topic} string.
*/
- public static Bound<String> topic(String topic) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).topic(StaticValueProvider.of(topic));
+ public Write<T> topic(String topic) {
+ return topic(StaticValueProvider.of(topic));
}
/**
* Like {@code topic()} but with a {@link ValueProvider}.
*/
- public static Bound<String> topic(ValueProvider<String> topic) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic);
+ public Write<T> topic(ValueProvider<String> topic) {
+ return new Write<>(name, NestedValueProvider.of(topic, new TopicTranslator()),
+ timestampLabel, idLabel, coder, formatFn);
}
/**
@@ -948,8 +982,8 @@ public class PubsubIO {
* {@link PubsubIO.Read#timestampLabel(String)} can be used to ensure the other source reads
* these timestamps from the appropriate attribute.
*/
- public static Bound<String> timestampLabel(String timestampLabel) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).timestampLabel(timestampLabel);
+ public Write<T> timestampLabel(String timestampLabel) {
+ return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
}
/**
@@ -961,221 +995,172 @@ public class PubsubIO {
* {@link PubsubIO.Read#idLabel(String)} can be used to ensure that* the other source reads
* these unique identifiers from the appropriate attribute.
*/
- public static Bound<String> idLabel(String idLabel) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).idLabel(idLabel);
+ public Write<T> idLabel(String idLabel) {
+ return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
}
/**
- * Creates a transform that uses the given {@link Coder} to encode each of the
- * elements of the input collection into an output message.
- *
- * <p>By default, uses {@link StringUtf8Coder}, which writes input Java strings directly as
- * records.
+ * Returns a new transform that's like this one
+ * but that uses the given {@link Coder} to encode each of
+ * the elements of the input {@link PCollection} into an
+ * output record.
*
- * @param <T> the type of the elements of the input PCollection
+ * <p>Does not modify this object.
*/
- public static <T> Bound<T> withCoder(Coder<T> coder) {
- return new Bound<>(coder);
+ public Write<T> withCoder(Coder<T> coder) {
+ return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
}
/**
- * A {@link PTransform} that writes an unbounded {@link PCollection} of {@link String Strings}
- * to a Cloud Pub/Sub stream.
+ * Used to write a PubSub message together with PubSub attributes. The user-supplied format
+ * function translates the input type T to a PubsubMessage object, which is used by the sink
+ * to separately set the PubSub message's payload and attributes.
*/
- public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
- /** The Cloud Pub/Sub topic to publish to. */
- @Nullable private final ValueProvider<PubsubTopic> topic;
- /** The name of the message attribute to publish message timestamps in. */
- @Nullable private final String timestampLabel;
- /** The name of the message attribute to publish unique message IDs in. */
- @Nullable private final String idLabel;
- private final Coder<T> coder;
-
- private Bound(Coder<T> coder) {
- this(null, null, null, null, coder);
- }
-
- private Bound(
- String name, ValueProvider<PubsubTopic> topic, String timestampLabel,
- String idLabel, Coder<T> coder) {
- super(name);
- this.topic = topic;
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
- this.coder = coder;
- }
-
- /**
- * Returns a new transform that's like this one but that writes to the specified
- * topic.
- *
- * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
- * {@code topic} string.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> topic(String topic) {
- return topic(StaticValueProvider.of(topic));
- }
-
- /**
- * Like {@code topic()} but with a {@link ValueProvider}.
- */
- public Bound<T> topic(ValueProvider<String> topic) {
- return new Bound<>(name, NestedValueProvider.of(topic, new TopicTranslator()),
- timestampLabel, idLabel, coder);
- }
+ public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) {
+ return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn);
+ }
- /**
- * Returns a new transform that's like this one but that publishes record timestamps
- * to a message attribute with the specified name. See
- * {@link PubsubIO.Write#timestampLabel(String)} for more details.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> timestampLabel(String timestampLabel) {
- return new Bound<>(name, topic, timestampLabel, idLabel, coder);
+ @Override
+ public PDone expand(PCollection<T> input) {
+ if (topic == null) {
+ throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
}
-
- /**
- * Returns a new transform that's like this one but that publishes unique record IDs
- * to a message attribute with the specified name. See {@link PubsubIO.Write#idLabel(String)}
- * for more details.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> idLabel(String idLabel) {
- return new Bound<>(name, topic, timestampLabel, idLabel, coder);
+ switch (input.isBounded()) {
+ case BOUNDED:
+ input.apply(ParDo.of(new PubsubBoundedWriter()));
+ return PDone.in(input.getPipeline());
+ case UNBOUNDED:
+ return input.apply(new PubsubUnboundedSink<T>(
+ FACTORY,
+ NestedValueProvider.of(topic, new TopicPathTranslator()),
+ coder,
+ timestampLabel,
+ idLabel,
+ formatFn,
+ 100 /* numShards */));
}
+ throw new RuntimeException(); // cases are exhaustive.
+ }
- /**
- * Returns a new transform that's like this one
- * but that uses the given {@link Coder} to encode each of
- * the elements of the input {@link PCollection} into an
- * output record.
- *
- * <p>Does not modify this object.
- *
- * @param <X> the type of the elements of the input {@link PCollection}
- */
- public <X> Bound<X> withCoder(Coder<X> coder) {
- return new Bound<>(name, topic, timestampLabel, idLabel, coder);
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+ }
- @Override
- public PDone expand(PCollection<T> input) {
- if (topic == null) {
- throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
- }
- switch (input.isBounded()) {
- case BOUNDED:
- input.apply(ParDo.of(new PubsubBoundedWriter()));
- return PDone.in(input.getPipeline());
- case UNBOUNDED:
- return input.apply(new PubsubUnboundedSink<T>(
- FACTORY,
- NestedValueProvider.of(topic, new TopicPathTranslator()),
- coder,
- timestampLabel,
- idLabel,
- 100 /* numShards */));
- }
- throw new RuntimeException(); // cases are exhaustive.
- }
+ @Override
+ protected Coder<Void> getDefaultOutputCoder() {
+ return VoidCoder.of();
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
- }
+ /**
+ * Returns the PubSub topic being written to.
+ */
+ public PubsubTopic getTopic() {
+ return topic.get();
+ }
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
+ /**
+ * Returns the {@link ValueProvider} for the topic being written to.
+ */
+ public ValueProvider<PubsubTopic> getTopicProvider() {
+ return topic;
+ }
- public PubsubTopic getTopic() {
- return topic.get();
- }
+ /**
+ * Returns the timestamp label.
+ */
+ public String getTimestampLabel() {
+ return timestampLabel;
+ }
- public ValueProvider<PubsubTopic> getTopicProvider() {
- return topic;
- }
+ /**
+ * Returns the id label.
+ */
+ public String getIdLabel() {
+ return idLabel;
+ }
- public String getTimestampLabel() {
- return timestampLabel;
- }
+ /**
+ * Returns the coder used to encode input elements when no format function is set.
+ */
+ public Coder<T> getCoder() {
+ return coder;
+ }
- public String getIdLabel() {
- return idLabel;
- }
+ /**
+ * Returns the formatting function used if publishing attributes.
+ */
+ public SimpleFunction<T, PubsubMessage> getFormatFn() {
+ return formatFn;
+ }
- public Coder<T> getCoder() {
- return coder;
+ /**
+ * Writer to Pubsub which batches messages from bounded collections.
+ *
+ * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow
+ * service in streaming mode.
+ *
+ * <p>Public so can be suppressed by runners.
+ */
+ public class PubsubBoundedWriter extends DoFn<T, Void> {
+ private static final int MAX_PUBLISH_BATCH_SIZE = 100;
+ private transient List<OutgoingMessage> output;
+ private transient PubsubClient pubsubClient;
+
+ @StartBundle
+ public void startBundle(Context c) throws IOException {
+ this.output = new ArrayList<>();
+ // NOTE: idLabel is ignored.
+ this.pubsubClient =
+ FACTORY.newClient(timestampLabel, null,
+ c.getPipelineOptions().as(PubsubOptions.class));
}
- /**
- * Writer to Pubsub which batches messages from bounded collections.
- *
- * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow
- * service in streaming mode.
- *
- * <p>Public so can be suppressed by runners.
- */
- public class PubsubBoundedWriter extends DoFn<T, Void> {
- private static final int MAX_PUBLISH_BATCH_SIZE = 100;
- private transient List<OutgoingMessage> output;
- private transient PubsubClient pubsubClient;
-
- @StartBundle
- public void startBundle(Context c) throws IOException {
- this.output = new ArrayList<>();
- // NOTE: idLabel is ignored.
- this.pubsubClient =
- FACTORY.newClient(timestampLabel, null,
- c.getPipelineOptions().as(PubsubOptions.class));
+ @ProcessElement
+ public void processElement(ProcessContext c) throws IOException {
+ byte[] payload = null;
+ Map<String, String> attributes = null;
+ if (formatFn != null) {
+ PubsubMessage message = formatFn.apply(c.element());
+ payload = message.getMessage();
+ attributes = message.getAttributeMap();
+ } else {
+ payload = CoderUtils.encodeToByteArray(getCoder(), c.element());
}
+ // NOTE: The record id is always null.
+ OutgoingMessage message =
+ new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null);
+ output.add(message);
- @ProcessElement
- public void processElement(ProcessContext c) throws IOException {
- // NOTE: The record id is always null.
- OutgoingMessage message =
- new OutgoingMessage(CoderUtils.encodeToByteArray(getCoder(), c.element()),
- c.timestamp().getMillis(), null);
- output.add(message);
-
- if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
- publish();
- }
+ if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
+ publish();
}
+ }
- @FinishBundle
- public void finishBundle(Context c) throws IOException {
- if (!output.isEmpty()) {
- publish();
- }
- output = null;
- pubsubClient.close();
- pubsubClient = null;
+ @FinishBundle
+ public void finishBundle(Context c) throws IOException {
+ if (!output.isEmpty()) {
+ publish();
}
+ output = null;
+ pubsubClient.close();
+ pubsubClient = null;
+ }
- private void publish() throws IOException {
- int n = pubsubClient.publish(
- PubsubClient.topicPathFromName(getTopic().project, getTopic().topic),
- output);
- checkState(n == output.size());
- output.clear();
- }
+ private void publish() throws IOException {
+ int n = pubsubClient.publish(
+ PubsubClient.topicPathFromName(getTopic().project, getTopic().topic),
+ output);
+ checkState(n == output.size());
+ output.clear();
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder.delegate(Bound.this);
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.delegate(Write.this);
}
}
-
- /** Disallow construction of utility class. */
- private Write() {}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 75f6b7d..c726fd7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -21,24 +21,30 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
import com.google.common.hash.Hashing;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
+
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.PubsubIO.PubsubMessage;
import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Aggregator;
@@ -46,6 +52,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -105,23 +112,27 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
private static class OutgoingMessageCoder extends AtomicCoder<OutgoingMessage> {
private static final NullableCoder<String> RECORD_ID_CODER =
NullableCoder.of(StringUtf8Coder.of());
+ private static final NullableCoder<Map<String, String>> ATTRIBUTES_CODER =
+ NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
@Override
public void encode(
OutgoingMessage value, OutputStream outStream, Context context)
throws CoderException, IOException {
ByteArrayCoder.of().encode(value.elementBytes, outStream, context.nested());
+ ATTRIBUTES_CODER.encode(value.attributes, outStream, context.nested());
BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, context.nested());
- RECORD_ID_CODER.encode(value.recordId, outStream, context);
+ RECORD_ID_CODER.encode(value.recordId, outStream, context.nested());
}
@Override
public OutgoingMessage decode(
InputStream inStream, Context context) throws CoderException, IOException {
byte[] elementBytes = ByteArrayCoder.of().decode(inStream, context.nested());
+ Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context.nested());
long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, context.nested());
- @Nullable String recordId = RECORD_ID_CODER.decode(inStream, context);
- return new OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId);
+ @Nullable String recordId = RECORD_ID_CODER.decode(inStream, context.nested());
+ return new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId);
}
}
@@ -158,17 +169,29 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
private final Coder<T> elementCoder;
private final int numShards;
private final RecordIdMethod recordIdMethod;
+ private final SimpleFunction<T, PubsubMessage> formatFn;
- ShardFn(Coder<T> elementCoder, int numShards, RecordIdMethod recordIdMethod) {
+ ShardFn(Coder<T> elementCoder, int numShards,
+ SimpleFunction<T, PubsubIO.PubsubMessage> formatFn, RecordIdMethod recordIdMethod) {
this.elementCoder = elementCoder;
this.numShards = numShards;
+ this.formatFn = formatFn;
this.recordIdMethod = recordIdMethod;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
elementCounter.addValue(1L);
- byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
+ byte[] elementBytes = null;
+ Map<String, String> attributes = ImmutableMap.<String, String>of();
+ if (formatFn != null) {
+ PubsubIO.PubsubMessage message = formatFn.apply(c.element());
+ elementBytes = message.getMessage();
+ attributes = message.getAttributeMap();
+ } else {
+ elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
+ }
+
long timestampMsSinceEpoch = c.timestamp().getMillis();
@Nullable String recordId = null;
switch (recordIdMethod) {
@@ -186,7 +209,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
break;
}
c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards),
- new OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId)));
+ new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+ recordId)));
}
@Override
@@ -365,6 +389,12 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
*/
private final RecordIdMethod recordIdMethod;
+ /**
+ * In order to publish attributes, a formatting function is used to format the output into
+ * a {@link PubsubIO.PubsubMessage}.
+ */
+ private final SimpleFunction<T, PubsubIO.PubsubMessage> formatFn;
+
@VisibleForTesting
PubsubUnboundedSink(
PubsubClientFactory pubsubFactory,
@@ -376,6 +406,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
int publishBatchSize,
int publishBatchBytes,
Duration maxLatency,
+ SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
RecordIdMethod recordIdMethod) {
this.pubsubFactory = pubsubFactory;
this.topic = topic;
@@ -386,6 +417,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
this.publishBatchSize = publishBatchSize;
this.publishBatchBytes = publishBatchBytes;
this.maxLatency = maxLatency;
+ this.formatFn = formatFn;
this.recordIdMethod = idLabel == null ? RecordIdMethod.NONE : recordIdMethod;
}
@@ -395,30 +427,54 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
Coder<T> elementCoder,
String timestampLabel,
String idLabel,
+ SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
int numShards) {
this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards,
DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY,
- RecordIdMethod.RANDOM);
+ formatFn, RecordIdMethod.RANDOM);
}
+ /**
+ * Get the topic being written to.
+ */
public TopicPath getTopic() {
return topic.get();
}
+ /**
+ * Get the {@link ValueProvider} for the topic being written to.
+ */
public ValueProvider<TopicPath> getTopicProvider() {
return topic;
}
+ /**
+ * Get the timestamp label.
+ */
@Nullable
public String getTimestampLabel() {
return timestampLabel;
}
+ /**
+ * Get the id label.
+ */
@Nullable
public String getIdLabel() {
return idLabel;
}
+ /**
+ * Get the format function used for PubSub attributes.
+ */
+ @Nullable
+ public SimpleFunction<T, PubsubIO.PubsubMessage> getFormatFn() {
+ return formatFn;
+ }
+
+ /**
+ * Get the Coder used to encode output elements.
+ */
public Coder<T> getElementCoder() {
return elementCoder;
}
@@ -433,7 +489,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
.plusDelayOf(maxLatency))))
.discardingFiredPanes())
.apply("PubsubUnboundedSink.Shard",
- ParDo.of(new ShardFn<T>(elementCoder, numShards, recordIdMethod)))
+ ParDo.of(new ShardFn<T>(elementCoder, numShards, formatFn, recordIdMethod)))
.setCoder(KvCoder.of(VarIntCoder.of(), CODER))
.apply(GroupByKey.<Integer, OutgoingMessage>create())
.apply("PubsubUnboundedSink.Writer",
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index 4b3792d..c1f8720 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -50,6 +50,7 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.PubsubIO.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.options.ValueProvider;
@@ -58,6 +59,7 @@ import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -394,6 +396,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@VisibleForTesting
final SubscriptionPath subscription;
+ @Nullable private final SimpleFunction<PubsubIO.PubsubMessage, T> parseFn;
+
/**
* Client on which to talk to Pubsub. Null if closed.
*/
@@ -580,10 +584,12 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
/**
* Construct a reader.
*/
- public PubsubReader(PubsubOptions options, PubsubSource<T> outer, SubscriptionPath subscription)
+ public PubsubReader(PubsubOptions options, PubsubSource<T> outer, SubscriptionPath subscription,
+ SimpleFunction<PubsubIO.PubsubMessage, T> parseFn)
throws IOException, GeneralSecurityException {
this.outer = outer;
this.subscription = subscription;
+ this.parseFn = parseFn;
pubsubClient =
outer.outer.pubsubFactory.newClient(outer.outer.timestampLabel, outer.outer.idLabel,
options);
@@ -959,7 +965,12 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
throw new NoSuchElementException();
}
try {
- return CoderUtils.decodeFromByteArray(outer.outer.elementCoder, current.elementBytes);
+ if (parseFn != null) {
+ return parseFn.apply(new PubsubIO.PubsubMessage(
+ current.elementBytes, current.attributes));
+ } else {
+ return CoderUtils.decodeFromByteArray(outer.outer.elementCoder, current.elementBytes);
+ }
} catch (CoderException e) {
throw new RuntimeException("Unable to decode element from Pubsub message: ", e);
}
@@ -1110,7 +1121,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
}
try {
- reader = new PubsubReader<>(options.as(PubsubOptions.class), this, subscription);
+ reader = new PubsubReader<>(options.as(PubsubOptions.class), this, subscription,
+ outer.parseFn);
} catch (GeneralSecurityException | IOException e) {
throw new RuntimeException("Unable to subscribe to " + subscriptionPath + ": ", e);
}
@@ -1260,6 +1272,13 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@Nullable
private final String idLabel;
+ /**
+ * If not {@literal null}, the user is asking for PubSub attributes. This parse function will be
+ * used to parse {@link PubsubIO.PubsubMessage}s containing a payload and attributes.
+ */
+ @Nullable
+ SimpleFunction<PubsubMessage, T> parseFn;
+
@VisibleForTesting
PubsubUnboundedSource(
Clock clock,
@@ -1269,7 +1288,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@Nullable ValueProvider<SubscriptionPath> subscription,
Coder<T> elementCoder,
@Nullable String timestampLabel,
- @Nullable String idLabel) {
+ @Nullable String idLabel,
+ @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
checkArgument((topic == null) != (subscription == null),
"Exactly one of topic and subscription must be given");
checkArgument((topic == null) == (project == null),
@@ -1282,6 +1302,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
this.elementCoder = checkNotNull(elementCoder);
this.timestampLabel = timestampLabel;
this.idLabel = idLabel;
+ this.parseFn = parseFn;
}
/**
@@ -1294,49 +1315,83 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@Nullable ValueProvider<SubscriptionPath> subscription,
Coder<T> elementCoder,
@Nullable String timestampLabel,
- @Nullable String idLabel) {
- this(null, pubsubFactory, project, topic, subscription, elementCoder, timestampLabel, idLabel);
+ @Nullable String idLabel,
+ @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
+ this(null, pubsubFactory, project, topic, subscription, elementCoder, timestampLabel, idLabel,
+ parseFn);
}
+ /**
+ * Get the coder used for elements.
+ */
public Coder<T> getElementCoder() {
return elementCoder;
}
+ /**
+ * Get the project path.
+ */
@Nullable
public ProjectPath getProject() {
return project == null ? null : project.get();
}
+ /**
+ * Get the topic being read from.
+ */
@Nullable
public TopicPath getTopic() {
return topic == null ? null : topic.get();
}
+ /**
+ * Get the {@link ValueProvider} for the topic being read from.
+ */
@Nullable
public ValueProvider<TopicPath> getTopicProvider() {
return topic;
}
+ /**
+ * Get the subscription being read from.
+ */
@Nullable
public SubscriptionPath getSubscription() {
return subscription == null ? null : subscription.get();
}
+ /**
+ * Get the {@link ValueProvider} for the subscription being read from.
+ */
@Nullable
public ValueProvider<SubscriptionPath> getSubscriptionProvider() {
return subscription;
}
+ /**
+ * Get the timestamp label.
+ */
@Nullable
public String getTimestampLabel() {
return timestampLabel;
}
+ /**
+ * Get the id label.
+ */
@Nullable
public String getIdLabel() {
return idLabel;
}
+ /**
+ * Get the parsing function for PubSub attributes.
+ */
+ @Nullable
+ public SimpleFunction<PubsubIO.PubsubMessage, T> getWithAttributesParseFn() {
+ return parseFn;
+ }
+
@Override
public PCollection<T> expand(PBegin input) {
return input.getPipeline().begin()
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index a339af7..1541059 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -102,7 +102,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
* as the input.
*
* <p>If the input {@code PCollection} contains late data (see
- * {@link org.apache.beam.sdk.io.PubsubIO.Read.Bound#timestampLabel}
+ * {@link org.apache.beam.sdk.io.PubsubIO.Read#timestampLabel}
* for an example of how this can occur) or the
* {@link Window#triggering requested TriggerFn} can fire before
* the watermark, then there may be multiple elements
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
index 49a2b87..ee25448 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
@@ -83,6 +83,7 @@ public class PropertyNames {
public static final String PARALLEL_INPUT = "parallel_input";
public static final String PHASE = "phase";
public static final String PUBSUB_ID_LABEL = "pubsub_id_label";
+ public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = "pubsub_serialized_attributes_fn";
public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override";
public static final String PUBSUB_TIMESTAMP_LABEL = "pubsub_timestamp_label";
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
index 06b776b..fc84057 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
@@ -310,6 +310,8 @@ public abstract class PubsubClient implements Closeable {
*/
public final byte[] elementBytes;
+ public final Map<String, String> attributes;
+
/**
* Timestamp for element (ms since epoch).
*/
@@ -322,9 +324,10 @@ public abstract class PubsubClient implements Closeable {
@Nullable
public final String recordId;
- public OutgoingMessage(
- byte[] elementBytes, long timestampMsSinceEpoch, @Nullable String recordId) {
+ public OutgoingMessage(byte[] elementBytes, Map<String, String> attributes,
+ long timestampMsSinceEpoch, @Nullable String recordId) {
this.elementBytes = elementBytes;
+ this.attributes = attributes;
this.timestampMsSinceEpoch = timestampMsSinceEpoch;
this.recordId = recordId;
}
@@ -347,13 +350,15 @@ public abstract class PubsubClient implements Closeable {
OutgoingMessage that = (OutgoingMessage) o;
return timestampMsSinceEpoch == that.timestampMsSinceEpoch
- && Arrays.equals(elementBytes, that.elementBytes)
- && Objects.equal(recordId, that.recordId);
+ && Arrays.equals(elementBytes, that.elementBytes)
+ && Objects.equal(attributes, that.attributes)
+ && Objects.equal(recordId, that.recordId);
}
@Override
public int hashCode() {
- return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch, recordId);
+ return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch,
+ recordId);
}
}
@@ -369,6 +374,8 @@ public abstract class PubsubClient implements Closeable {
*/
public final byte[] elementBytes;
+ public final Map<String, String> attributes;
+
/**
* Timestamp for element (ms since epoch). Either Pubsub's processing time,
* or the custom timestamp associated with the message.
@@ -392,11 +399,13 @@ public abstract class PubsubClient implements Closeable {
public IncomingMessage(
byte[] elementBytes,
+ Map<String, String> attributes,
long timestampMsSinceEpoch,
long requestTimeMsSinceEpoch,
String ackId,
String recordId) {
this.elementBytes = elementBytes;
+ this.attributes = attributes;
this.timestampMsSinceEpoch = timestampMsSinceEpoch;
this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
this.ackId = ackId;
@@ -404,8 +413,8 @@ public abstract class PubsubClient implements Closeable {
}
public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) {
- return new IncomingMessage(elementBytes, timestampMsSinceEpoch, requestTimeMsSinceEpoch,
- ackId, recordId);
+ return new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+ requestTimeMsSinceEpoch, ackId, recordId);
}
@Override
@@ -429,12 +438,13 @@ public abstract class PubsubClient implements Closeable {
&& requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch
&& ackId.equals(that.ackId)
&& recordId.equals(that.recordId)
- && Arrays.equals(elementBytes, that.elementBytes);
+ && Arrays.equals(elementBytes, that.elementBytes)
+ && Objects.equal(attributes, that.attributes);
}
@Override
public int hashCode() {
- return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch,
+ return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch,
requestTimeMsSinceEpoch,
ackId, recordId);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
index 201877c..4a6ddac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
@@ -223,6 +223,10 @@ public class PubsubGrpcClient extends PubsubClient {
PubsubMessage.newBuilder()
.setData(ByteString.copyFrom(outgoingMessage.elementBytes));
+ if (outgoingMessage.attributes != null) {
+ message.putAllAttributes(outgoingMessage.attributes);
+ }
+
if (timestampLabel != null) {
message.getMutableAttributes()
.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
@@ -286,7 +290,7 @@ public class PubsubGrpcClient extends PubsubClient {
recordId = pubsubMessage.getMessageId();
}
- incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
+ incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
requestTimeMsSinceEpoch, ackId, recordId));
}
return incomingMessages;
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
index 215a136..6bc104f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
@@ -138,6 +138,8 @@ public class PubsubJsonClient extends PubsubClient {
Map<String, String> attributes = pubsubMessage.getAttributes();
if ((timestampLabel != null || idLabel != null) && attributes == null) {
attributes = new TreeMap<>();
+ }
+ if (attributes != null) {
pubsubMessage.setAttributes(attributes);
}
@@ -201,7 +203,7 @@ public class PubsubJsonClient extends PubsubClient {
recordId = pubsubMessage.getMessageId();
}
- incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
+ incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
requestTimeMsSinceEpoch, ackId, recordId));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
index 45477fc..61479f9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -39,7 +40,7 @@ import org.apache.beam.sdk.options.PubsubOptions;
* testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
* methods. Relies on statics to mimic the Pubsub service, though we try to hide that.
*/
-public class PubsubTestClient extends PubsubClient {
+public class PubsubTestClient extends PubsubClient implements Serializable {
/**
* Mimic the state of the simulated Pubsub 'service'.
*
@@ -113,7 +114,8 @@ public class PubsubTestClient extends PubsubClient {
private static final State STATE = new State();
/** Closing the factory will validate all expected messages were processed. */
- public interface PubsubTestClientFactory extends PubsubClientFactory, Closeable {
+ public interface PubsubTestClientFactory
+ extends PubsubClientFactory, Closeable, Serializable {
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index f1bf788..e15562e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import java.util.Set;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -48,19 +50,20 @@ public class PubsubIOTest {
@Test
public void testPubsubIOGetName() {
assertEquals("PubsubIO.Read",
- PubsubIO.Read.topic("projects/myproject/topics/mytopic").getName());
+ PubsubIO.<String>read().topic("projects/myproject/topics/mytopic").getName());
assertEquals("PubsubIO.Write",
- PubsubIO.Write.topic("projects/myproject/topics/mytopic").getName());
+ PubsubIO.<String>write().topic("projects/myproject/topics/mytopic").getName());
}
@Test
public void testTopicValidationSuccess() throws Exception {
- PubsubIO.Read.topic("projects/my-project/topics/abc");
- PubsubIO.Read.topic("projects/my-project/topics/ABC");
- PubsubIO.Read.topic("projects/my-project/topics/AbC-DeF");
- PubsubIO.Read.topic("projects/my-project/topics/AbC-1234");
- PubsubIO.Read.topic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
- PubsubIO.Read.topic(new StringBuilder().append("projects/my-project/topics/A-really-long-one-")
+ PubsubIO.<String>read().topic("projects/my-project/topics/abc");
+ PubsubIO.<String>read().topic("projects/my-project/topics/ABC");
+ PubsubIO.<String>read().topic("projects/my-project/topics/AbC-DeF");
+ PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234");
+ PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
+ PubsubIO.<String>read().topic(new StringBuilder()
+ .append("projects/my-project/topics/A-really-long-one-")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
.append("11111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -70,13 +73,14 @@ public class PubsubIOTest {
@Test
public void testTopicValidationBadCharacter() throws Exception {
thrown.expect(IllegalArgumentException.class);
- PubsubIO.Read.topic("projects/my-project/topics/abc-*-abc");
+ PubsubIO.<String>read().topic("projects/my-project/topics/abc-*-abc");
}
@Test
public void testTopicValidationTooLong() throws Exception {
thrown.expect(IllegalArgumentException.class);
- PubsubIO.Read.topic(new StringBuilder().append("projects/my-project/topics/A-really-long-one-")
+ PubsubIO.<String>read().topic(new StringBuilder()
+ .append("projects/my-project/topics/A-really-long-one-")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
.append("1111111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -88,7 +92,7 @@ public class PubsubIOTest {
String topic = "projects/project/topics/topic";
String subscription = "projects/project/subscriptions/subscription";
Duration maxReadTime = Duration.standardMinutes(5);
- PubsubIO.Read.Bound<String> read = PubsubIO.Read
+ PubsubIO.Read<String> read = PubsubIO.<String>read()
.topic(StaticValueProvider.of(topic))
.subscription(StaticValueProvider.of(subscription))
.timestampLabel("myTimestamp")
@@ -109,7 +113,7 @@ public class PubsubIOTest {
@Test
public void testNullTopic() {
String subscription = "projects/project/subscriptions/subscription";
- PubsubIO.Read.Bound<String> read = PubsubIO.Read
+ PubsubIO.Read<String> read = PubsubIO.<String>read()
.subscription(StaticValueProvider.of(subscription));
assertNull(read.getTopic());
assertNotNull(read.getSubscription());
@@ -119,7 +123,7 @@ public class PubsubIOTest {
@Test
public void testNullSubscription() {
String topic = "projects/project/topics/topic";
- PubsubIO.Read.Bound<String> read = PubsubIO.Read
+ PubsubIO.Read<String> read = PubsubIO.<String>read()
.topic(StaticValueProvider.of(topic));
assertNotNull(read.getTopic());
assertNull(read.getSubscription());
@@ -130,9 +134,10 @@ public class PubsubIOTest {
@Category(RunnableOnService.class)
public void testPrimitiveReadDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PubsubIO.Read.Bound<String> read =
- PubsubIO.Read.subscription("projects/project/subscriptions/subscription")
- .maxNumRecords(1);
+ PubsubIO.Read<String> read =
+ PubsubIO.<String>read().subscription("projects/project/subscriptions/subscription")
+ .maxNumRecords(1)
+ .withCoder(StringUtf8Coder.of());
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat("PubsubIO.Read should include the subscription in its primitive display data",
@@ -142,7 +147,7 @@ public class PubsubIOTest {
@Test
public void testWriteDisplayData() {
String topic = "projects/project/topics/topic";
- PubsubIO.Write.Bound<?> write = PubsubIO.Write
+ PubsubIO.Write<?> write = PubsubIO.<String>write()
.topic(topic)
.timestampLabel("myTimestamp")
.idLabel("myId");
@@ -158,7 +163,7 @@ public class PubsubIOTest {
@Category(RunnableOnService.class)
public void testPrimitiveWriteDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PubsubIO.Write.Bound<?> write = PubsubIO.Write.topic("projects/project/topics/topic");
+ PubsubIO.Write<?> write = PubsubIO.<String>write().topic("projects/project/topics/topic");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
assertThat("PubsubIO.Write should include the topic in its primitive display data",