You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/29 22:31:53 UTC
[1/9] beam git commit: Renames {id,timestamp}Label to {id,timestamp}Attribute throughout SDK
Repository: beam
Updated Branches:
refs/heads/master f5e3f5230 -> 14d60b26e
Renames {id,timestamp}Label to {id,timestamp}Attribute throughout SDK
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8853d53d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8853d53d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8853d53d
Branch: refs/heads/master
Commit: 8853d53d9ffdf6e68c80880f6dd5f2d11a6e451e
Parents: f065114
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 27 17:19:14 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 2 +-
.../examples/complete/game/LeaderBoard.java | 2 +-
.../beam/runners/dataflow/DataflowRunner.java | 18 ++---
.../org/apache/beam/sdk/util/PropertyNames.java | 4 +-
.../beam/sdk/io/gcp/pubsub/PubsubClient.java | 42 ++++++-----
.../sdk/io/gcp/pubsub/PubsubGrpcClient.java | 36 +++++-----
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 74 ++++++++++----------
.../sdk/io/gcp/pubsub/PubsubJsonClient.java | 36 +++++-----
.../sdk/io/gcp/pubsub/PubsubTestClient.java | 6 +-
.../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 58 ++++++++-------
.../io/gcp/pubsub/PubsubUnboundedSource.java | 61 +++++++++-------
.../sdk/io/gcp/pubsub/PubsubClientTest.java | 50 ++++++-------
.../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java | 16 +++--
.../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 24 +++----
.../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 14 ++--
.../io/gcp/pubsub/PubsubUnboundedSinkTest.java | 10 +--
.../gcp/pubsub/PubsubUnboundedSourceTest.java | 6 +-
17 files changed, 238 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index d628497..a46d3c5 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -252,7 +252,7 @@ public class GameStats extends LeaderBoard {
// Read Events from Pub/Sub using custom timestamps
PCollection<GameActionInfo> rawEvents = pipeline
.apply(PubsubIO.readStrings()
- .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
+ .withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
// Extract username/score pairs from the event stream
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index fbffac6..9af34c5 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -191,7 +191,7 @@ public class LeaderBoard extends HourlyTeamScore {
// data elements, and parse the data.
PCollection<GameActionInfo> gameEvents = pipeline
.apply(PubsubIO.readStrings()
- .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
+ .withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
gameEvents.apply("CalculateTeamScores",
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 63c2191..a61fe49 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -922,12 +922,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
((NestedValueProvider) overriddenTransform.getSubscriptionProvider()).propertyName());
}
}
- if (overriddenTransform.getTimestampLabel() != null) {
+ if (overriddenTransform.getTimestampAttribute() != null) {
stepContext.addInput(
- PropertyNames.PUBSUB_TIMESTAMP_LABEL, overriddenTransform.getTimestampLabel());
+ PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, overriddenTransform.getTimestampAttribute());
}
- if (overriddenTransform.getIdLabel() != null) {
- stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
+ if (overriddenTransform.getIdAttribute() != null) {
+ stepContext.addInput(
+ PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute());
}
if (overriddenTransform.getWithAttributesParseFn() != null) {
stepContext.addInput(
@@ -997,12 +998,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
PropertyNames.PUBSUB_TOPIC_OVERRIDE,
((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
}
- if (overriddenTransform.getTimestampLabel() != null) {
+ if (overriddenTransform.getTimestampAttribute() != null) {
stepContext.addInput(
- PropertyNames.PUBSUB_TIMESTAMP_LABEL, overriddenTransform.getTimestampLabel());
+ PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, overriddenTransform.getTimestampAttribute());
}
- if (overriddenTransform.getIdLabel() != null) {
- stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
+ if (overriddenTransform.getIdAttribute() != null) {
+ stepContext.addInput(
+ PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute());
}
if (overriddenTransform.getFormatFn() != null) {
stepContext.addInput(
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
index ee25448..aa5855b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
@@ -82,11 +82,11 @@ public class PropertyNames {
public static final String OUTPUT_NAME = "output_name";
public static final String PARALLEL_INPUT = "parallel_input";
public static final String PHASE = "phase";
- public static final String PUBSUB_ID_LABEL = "pubsub_id_label";
+ public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label";
public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = "pubsub_serialized_attributes_fn";
public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override";
- public static final String PUBSUB_TIMESTAMP_LABEL = "pubsub_timestamp_label";
+ public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = "pubsub_timestamp_label";
public static final String PUBSUB_TOPIC = "pubsub_topic";
public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override";
public static final String SCALAR_FIELD_NAME = "value";
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index 3a69799..cfe36ee 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -42,16 +42,15 @@ abstract class PubsubClient implements Closeable {
*/
public interface PubsubClientFactory extends Serializable {
/**
- * Construct a new Pubsub client. It should be closed via {@link #close} in order
- * to ensure tidy cleanup of underlying netty resources (or use the try-with-resources
- * construct). Uses {@code options} to derive pubsub endpoints and application credentials.
- * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel} to store custom
- * timestamps/ids within message metadata.
+ * Construct a new Pubsub client. It should be closed via {@link #close} in order to ensure tidy
+ * cleanup of underlying netty resources (or use the try-with-resources construct). Uses {@code
+ * options} to derive pubsub endpoints and application credentials. If non-{@literal null}, use
+ * {@code timestampAttribute} and {@code idAttribute} to store custom timestamps/ids within
+ * message metadata.
*/
PubsubClient newClient(
- @Nullable String timestampLabel,
- @Nullable String idLabel,
- PubsubOptions options) throws IOException;
+ @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
+ throws IOException;
/**
* Return the display name for this factory. Eg "Json", "gRPC".
@@ -86,33 +85,33 @@ abstract class PubsubClient implements Closeable {
* Return the timestamp (in ms since unix epoch) to use for a Pubsub message with {@code
* attributes} and {@code pubsubTimestamp}.
*
- * <p>If {@code timestampLabel} is non-{@literal null} then the message attributes must contain
- * that label, and the value of that label will be taken as the timestamp.
+ * <p>If {@code timestampAttribute} is non-{@literal null} then the message attributes must
+ * contain that attribute, and the value of that attribute will be taken as the timestamp.
* Otherwise the timestamp will be taken from the Pubsub publish timestamp {@code
* pubsubTimestamp}.
*
* @throws IllegalArgumentException if the timestamp cannot be recognized as a ms-since-unix-epoch
- * or RFC3339 time.
+ * or RFC3339 time.
*/
protected static long extractTimestamp(
- @Nullable String timestampLabel,
+ @Nullable String timestampAttribute,
@Nullable String pubsubTimestamp,
@Nullable Map<String, String> attributes) {
Long timestampMsSinceEpoch;
- if (Strings.isNullOrEmpty(timestampLabel)) {
+ if (Strings.isNullOrEmpty(timestampAttribute)) {
timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp);
checkArgument(timestampMsSinceEpoch != null,
"Cannot interpret PubSub publish timestamp: %s",
pubsubTimestamp);
} else {
- String value = attributes == null ? null : attributes.get(timestampLabel);
+ String value = attributes == null ? null : attributes.get(timestampAttribute);
checkArgument(value != null,
- "PubSub message is missing a value for timestamp label %s",
- timestampLabel);
+ "PubSub message is missing a value for timestamp attribute %s",
+ timestampAttribute);
timestampMsSinceEpoch = asMsSinceEpoch(value);
checkArgument(timestampMsSinceEpoch != null,
- "Cannot interpret value of label %s as timestamp: %s",
- timestampLabel, value);
+ "Cannot interpret value of attribute %s as timestamp: %s",
+ timestampAttribute, value);
}
return timestampMsSinceEpoch;
}
@@ -317,11 +316,10 @@ abstract class PubsubClient implements Closeable {
public final long timestampMsSinceEpoch;
/**
- * If using an id label, the record id to associate with this record's metadata so the receiver
- * can reject duplicates. Otherwise {@literal null}.
+ * If using an id attribute, the record id to associate with this record's metadata so the
+ * receiver can reject duplicates. Otherwise {@literal null}.
*/
- @Nullable
- public final String recordId;
+ @Nullable public final String recordId;
public OutgoingMessage(byte[] elementBytes, Map<String, String> attributes,
long timestampMsSinceEpoch, @Nullable String recordId) {
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
index 16de648..9778edf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
@@ -79,7 +79,7 @@ class PubsubGrpcClient extends PubsubClient {
private static class PubsubGrpcClientFactory implements PubsubClientFactory {
@Override
public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+ @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
throws IOException {
ManagedChannel channel = NettyChannelBuilder
.forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
@@ -87,8 +87,8 @@ class PubsubGrpcClient extends PubsubClient {
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
.build();
- return new PubsubGrpcClient(timestampLabel,
- idLabel,
+ return new PubsubGrpcClient(timestampAttribute,
+ idAttribute,
DEFAULT_TIMEOUT_S,
channel,
options.getGcpCredential());
@@ -122,17 +122,17 @@ class PubsubGrpcClient extends PubsubClient {
private final Credentials credentials;
/**
- * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
+ * Attribute to use for custom timestamps, or {@literal null} if should use Pubsub publish time
* instead.
*/
@Nullable
- private final String timestampLabel;
+ private final String timestampAttribute;
/**
- * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
+ * Attribute to use for custom ids, or {@literal null} if should use Pubsub provided ids.
*/
@Nullable
- private final String idLabel;
+ private final String idAttribute;
/**
@@ -144,13 +144,13 @@ class PubsubGrpcClient extends PubsubClient {
@VisibleForTesting
PubsubGrpcClient(
- @Nullable String timestampLabel,
- @Nullable String idLabel,
+ @Nullable String timestampAttribute,
+ @Nullable String idAttribute,
int timeoutSec,
ManagedChannel publisherChannel,
Credentials credentials) {
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
+ this.timestampAttribute = timestampAttribute;
+ this.idAttribute = idAttribute;
this.timeoutSec = timeoutSec;
this.publisherChannel = publisherChannel;
this.credentials = credentials;
@@ -226,13 +226,13 @@ class PubsubGrpcClient extends PubsubClient {
message.putAllAttributes(outgoingMessage.attributes);
}
- if (timestampLabel != null) {
+ if (timestampAttribute != null) {
message.getMutableAttributes()
- .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+ .put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
}
- if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
- message.getMutableAttributes().put(idLabel, outgoingMessage.recordId);
+ if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+ message.getMutableAttributes().put(idAttribute, outgoingMessage.recordId);
}
request.addMessages(message);
@@ -273,7 +273,7 @@ class PubsubGrpcClient extends PubsubClient {
+ timestampProto.getNanos() / 1000L);
}
long timestampMsSinceEpoch =
- extractTimestamp(timestampLabel, pubsubTimestampString, attributes);
+ extractTimestamp(timestampAttribute, pubsubTimestampString, attributes);
// Ack id.
String ackId = message.getAckId();
@@ -281,8 +281,8 @@ class PubsubGrpcClient extends PubsubClient {
// Record id, if any.
@Nullable String recordId = null;
- if (idLabel != null && attributes != null) {
- recordId = attributes.get(idLabel);
+ if (idAttribute != null && attributes != null) {
+ recordId = attributes.get(idAttribute);
}
if (Strings.isNullOrEmpty(recordId)) {
// Fall back to the Pubsub provided message id.
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 3a7522e..129a25f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -136,12 +136,12 @@ public class PubsubIO {
* Populate common {@link DisplayData} between Pubsub source and sink.
*/
private static void populateCommonDisplayData(DisplayData.Builder builder,
- String timestampLabel, String idLabel, ValueProvider<PubsubTopic> topic) {
+ String timestampAttribute, String idAttribute, ValueProvider<PubsubTopic> topic) {
builder
- .addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)
- .withLabel("Timestamp Label Attribute"))
- .addIfNotNull(DisplayData.item("idLabel", idLabel)
- .withLabel("ID Label Attribute"));
+ .addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute)
+ .withLabel("Timestamp Attribute"))
+ .addIfNotNull(DisplayData.item("idAttribute", idAttribute)
+ .withLabel("ID Attribute"));
if (topic != null) {
String topicString = topic.isAccessible() ? topic.get().asPath()
@@ -529,11 +529,11 @@ public class PubsubIO {
/** The name of the message attribute to read timestamps from. */
@Nullable
- abstract String getTimestampLabel();
+ abstract String getTimestampAttribute();
/** The name of the message attribute to read unique message IDs from. */
@Nullable
- abstract String getIdLabel();
+ abstract String getIdAttribute();
/** The coder used to decode each record. */
@Nullable
@@ -551,9 +551,9 @@ public class PubsubIO {
abstract Builder<T> setSubscriptionProvider(ValueProvider<PubsubSubscription> subscription);
- abstract Builder<T> setTimestampLabel(String timestampLabel);
+ abstract Builder<T> setTimestampAttribute(String timestampAttribute);
- abstract Builder<T> setIdLabel(String idLabel);
+ abstract Builder<T> setIdAttribute(String idAttribute);
abstract Builder<T> setCoder(Coder<T> coder);
@@ -633,7 +633,7 @@ public class PubsubIO {
* (i.e., time units smaller than milliseconds) will be ignored.
* </ul>
*
- * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
+ * <p>If {@code timestampAttribute} is not provided, the system will generate record timestamps
* the first time it sees each record. All windowing will be done relative to these
* timestamps.
*
@@ -643,12 +643,12 @@ public class PubsubIO {
* specified with the windowing strategy – by default it will be output immediately.
*
* <p>Note that the system can guarantee that no late data will ever be seen when it assigns
- * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
+ * timestamps by arrival time (i.e. {@code timestampAttribute} is not provided).
*
* @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
*/
- public Read<T> withTimestampLabel(String timestampLabel) {
- return toBuilder().setTimestampLabel(timestampLabel).build();
+ public Read<T> withTimestampAttribute(String timestampAttribute) {
+ return toBuilder().setTimestampAttribute(timestampAttribute).build();
}
/**
@@ -657,11 +657,11 @@ public class PubsubIO {
* The value of the attribute can be any string that uniquely identifies this record.
*
* <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
- * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
+ * If {@code idAttribute} is not provided, Beam cannot guarantee that no duplicate data will
* be delivered, and deduplication of the stream will be strictly best effort.
*/
- public Read<T> withIdLabel(String idLabel) {
- return toBuilder().setIdLabel(idLabel).build();
+ public Read<T> withIdAttribute(String idAttribute) {
+ return toBuilder().setIdAttribute(idAttribute).build();
}
/**
@@ -718,8 +718,8 @@ public class PubsubIO {
topicPath,
subscriptionPath,
getCoder(),
- getTimestampLabel(),
- getIdLabel(),
+ getTimestampAttribute(),
+ getIdAttribute(),
getParseFn());
return input.getPipeline().apply(source);
}
@@ -727,7 +727,8 @@ public class PubsubIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), getTopicProvider());
+ populateCommonDisplayData(
+ builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider());
if (getSubscriptionProvider() != null) {
String subscriptionString = getSubscriptionProvider().isAccessible()
@@ -757,11 +758,11 @@ public class PubsubIO {
/** The name of the message attribute to publish message timestamps in. */
@Nullable
- abstract String getTimestampLabel();
+ abstract String getTimestampAttribute();
/** The name of the message attribute to publish unique message IDs in. */
@Nullable
- abstract String getIdLabel();
+ abstract String getIdAttribute();
/** The input type Coder. */
@Nullable
@@ -777,9 +778,9 @@ public class PubsubIO {
abstract static class Builder<T> {
abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topicProvider);
- abstract Builder<T> setTimestampLabel(String timestampLabel);
+ abstract Builder<T> setTimestampAttribute(String timestampAttribute);
- abstract Builder<T> setIdLabel(String idLabel);
+ abstract Builder<T> setIdAttribute(String idAttribute);
abstract Builder<T> setCoder(Coder<T> coder);
@@ -814,23 +815,23 @@ public class PubsubIO {
* time classes, {@link Instant#Instant(long)} can be used to parse this value.
*
* <p>If the output from this sink is being read by another Beam pipeline, then
- * {@link PubsubIO.Read#withTimestampLabel(String)} can be used to ensure the other source reads
- * these timestamps from the appropriate attribute.
+ * {@link PubsubIO.Read#withTimestampAttribute(String)} can be used to ensure the other source
+ * reads these timestamps from the appropriate attribute.
*/
- public Write<T> withTimestampLabel(String timestampLabel) {
- return toBuilder().setTimestampLabel(timestampLabel).build();
+ public Write<T> withTimestampAttribute(String timestampAttribute) {
+ return toBuilder().setTimestampAttribute(timestampAttribute).build();
}
/**
* Writes to Pub/Sub, adding each record's unique identifier to the published messages in an
* attribute with the specified name. The value of the attribute is an opaque string.
*
- * <p>If the the output from this sink is being read by another Beam pipeline, then
- * {@link PubsubIO.Read#withIdLabel(String)} can be used to ensure that* the other source reads
+ * <p>If the the output from this sink is being read by another Beam pipeline, then {@link
+ * PubsubIO.Read#withIdAttribute(String)} can be used to ensure that* the other source reads
* these unique identifiers from the appropriate attribute.
*/
- public Write<T> withIdLabel(String idLabel) {
- return toBuilder().setIdLabel(idLabel).build();
+ public Write<T> withIdAttribute(String idAttribute) {
+ return toBuilder().setIdAttribute(idAttribute).build();
}
/**
@@ -864,8 +865,8 @@ public class PubsubIO {
FACTORY,
NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()),
getCoder(),
- getTimestampLabel(),
- getIdLabel(),
+ getTimestampAttribute(),
+ getIdAttribute(),
getFormatFn(),
100 /* numShards */));
}
@@ -875,7 +876,8 @@ public class PubsubIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), getTopicProvider());
+ populateCommonDisplayData(
+ builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider());
}
@Override
@@ -897,9 +899,9 @@ public class PubsubIO {
@StartBundle
public void startBundle(Context c) throws IOException {
this.output = new ArrayList<>();
- // NOTE: idLabel is ignored.
+ // NOTE: idAttribute is ignored.
this.pubsubClient =
- FACTORY.newClient(getTimestampLabel(), null,
+ FACTORY.newClient(getTimestampAttribute(), null,
c.getPipelineOptions().as(PubsubOptions.class));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index 39184fb..b745422 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -69,7 +69,7 @@ class PubsubJsonClient extends PubsubClient {
@Override
public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+ @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
throws IOException {
Pubsub pubsub = new Builder(
Transport.getTransport(),
@@ -82,7 +82,7 @@ class PubsubJsonClient extends PubsubClient {
.setApplicationName(options.getAppName())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace())
.build();
- return new PubsubJsonClient(timestampLabel, idLabel, pubsub);
+ return new PubsubJsonClient(timestampAttribute, idAttribute, pubsub);
}
@Override
@@ -97,17 +97,17 @@ class PubsubJsonClient extends PubsubClient {
public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory();
/**
- * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
+ * Attribute to use for custom timestamps, or {@literal null} if should use Pubsub publish time
* instead.
*/
@Nullable
- private final String timestampLabel;
+ private final String timestampAttribute;
/**
- * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
+ * Attribute to use for custom ids, or {@literal null} if should use Pubsub provided ids.
*/
@Nullable
- private final String idLabel;
+ private final String idAttribute;
/**
* Underlying JSON transport.
@@ -116,11 +116,11 @@ class PubsubJsonClient extends PubsubClient {
@VisibleForTesting
PubsubJsonClient(
- @Nullable String timestampLabel,
- @Nullable String idLabel,
+ @Nullable String timestampAttribute,
+ @Nullable String idAttribute,
Pubsub pubsub) {
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
+ this.timestampAttribute = timestampAttribute;
+ this.idAttribute = idAttribute;
this.pubsub = pubsub;
}
@@ -137,19 +137,19 @@ class PubsubJsonClient extends PubsubClient {
PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
Map<String, String> attributes = outgoingMessage.attributes;
- if ((timestampLabel != null || idLabel != null) && attributes == null) {
+ if ((timestampAttribute != null || idAttribute != null) && attributes == null) {
attributes = new TreeMap<>();
}
if (attributes != null) {
pubsubMessage.setAttributes(attributes);
}
- if (timestampLabel != null) {
- attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+ if (timestampAttribute != null) {
+ attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
}
- if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
- attributes.put(idLabel, outgoingMessage.recordId);
+ if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+ attributes.put(idAttribute, outgoingMessage.recordId);
}
pubsubMessages.add(pubsubMessage);
@@ -188,7 +188,7 @@ class PubsubJsonClient extends PubsubClient {
// Timestamp.
long timestampMsSinceEpoch =
- extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes);
+ extractTimestamp(timestampAttribute, message.getMessage().getPublishTime(), attributes);
// Ack id.
String ackId = message.getAckId();
@@ -196,8 +196,8 @@ class PubsubJsonClient extends PubsubClient {
// Record id, if any.
@Nullable String recordId = null;
- if (idLabel != null && attributes != null) {
- recordId = attributes.get(idLabel);
+ if (idAttribute != null && attributes != null) {
+ recordId = attributes.get(idAttribute);
}
if (Strings.isNullOrEmpty(recordId)) {
// Fall back to the Pubsub provided message id.
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
index 9d40e41..df90597 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
@@ -136,7 +136,7 @@ class PubsubTestClient extends PubsubClient implements Serializable {
return new PubsubTestClientFactory() {
@Override
public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+ @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
throws IOException {
return new PubsubTestClient();
}
@@ -182,7 +182,7 @@ class PubsubTestClient extends PubsubClient implements Serializable {
return new PubsubTestClientFactory() {
@Override
public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+ @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
throws IOException {
return new PubsubTestClient();
}
@@ -226,7 +226,7 @@ class PubsubTestClient extends PubsubClient implements Serializable {
@Override
public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+ @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
throws IOException {
return new PubsubTestClient() {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 002e979..8d273ba 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -224,8 +224,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
private final PubsubClientFactory pubsubFactory;
private final ValueProvider<TopicPath> topic;
- private final String timestampLabel;
- private final String idLabel;
+ private final String timestampAttribute;
+ private final String idAttribute;
private final int publishBatchSize;
private final int publishBatchBytes;
@@ -240,12 +240,16 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
private final Counter byteCounter = SinkMetrics.bytesWritten();
WriterFn(
- PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic,
- String timestampLabel, String idLabel, int publishBatchSize, int publishBatchBytes) {
+ PubsubClientFactory pubsubFactory,
+ ValueProvider<TopicPath> topic,
+ String timestampAttribute,
+ String idAttribute,
+ int publishBatchSize,
+ int publishBatchBytes) {
this.pubsubFactory = pubsubFactory;
this.topic = topic;
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
+ this.timestampAttribute = timestampAttribute;
+ this.idAttribute = idAttribute;
this.publishBatchSize = publishBatchSize;
this.publishBatchBytes = publishBatchBytes;
}
@@ -267,7 +271,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
@StartBundle
public void startBundle(Context c) throws Exception {
checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
- pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel,
+ pubsubClient = pubsubFactory.newClient(timestampAttribute, idAttribute,
c.getPipelineOptions().as(PubsubOptions.class));
}
@@ -311,8 +315,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
: topic.toString();
builder.add(DisplayData.item("topic", topicString));
builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
- builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
- builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+ builder.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute));
+ builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
}
}
@@ -341,14 +345,14 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
* Pubsub message publish timestamp instead.
*/
@Nullable
- private final String timestampLabel;
+ private final String timestampAttribute;
/**
* Pubsub metadata field holding id for each element, or {@literal null} if need to generate
* a unique id ourselves.
*/
@Nullable
- private final String idLabel;
+ private final String idAttribute;
/**
* Number of 'shards' to use so that latency in Pubsub publish can be hidden. Generally this
@@ -374,7 +378,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
private final Duration maxLatency;
/**
- * How record ids should be generated for each record (if {@link #idLabel} is non-{@literal
+ * How record ids should be generated for each record (if {@link #idAttribute} is non-{@literal
* null}).
*/
private final RecordIdMethod recordIdMethod;
@@ -390,8 +394,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
PubsubClientFactory pubsubFactory,
ValueProvider<TopicPath> topic,
Coder<T> elementCoder,
- String timestampLabel,
- String idLabel,
+ String timestampAttribute,
+ String idAttribute,
int numShards,
int publishBatchSize,
int publishBatchBytes,
@@ -401,25 +405,25 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
this.pubsubFactory = pubsubFactory;
this.topic = topic;
this.elementCoder = elementCoder;
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
+ this.timestampAttribute = timestampAttribute;
+ this.idAttribute = idAttribute;
this.numShards = numShards;
this.publishBatchSize = publishBatchSize;
this.publishBatchBytes = publishBatchBytes;
this.maxLatency = maxLatency;
this.formatFn = formatFn;
- this.recordIdMethod = idLabel == null ? RecordIdMethod.NONE : recordIdMethod;
+ this.recordIdMethod = idAttribute == null ? RecordIdMethod.NONE : recordIdMethod;
}
public PubsubUnboundedSink(
PubsubClientFactory pubsubFactory,
ValueProvider<TopicPath> topic,
Coder<T> elementCoder,
- String timestampLabel,
- String idLabel,
+ String timestampAttribute,
+ String idAttribute,
SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
int numShards) {
- this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards,
+ this(pubsubFactory, topic, elementCoder, timestampAttribute, idAttribute, numShards,
DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY,
formatFn, RecordIdMethod.RANDOM);
}
@@ -439,19 +443,19 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
}
/**
- * Get the timestamp label.
+ * Get the timestamp attribute.
*/
@Nullable
- public String getTimestampLabel() {
- return timestampLabel;
+ public String getTimestampAttribute() {
+ return timestampAttribute;
}
/**
- * Get the id label.
+ * Get the id attribute.
*/
@Nullable
- public String getIdLabel() {
- return idLabel;
+ public String getIdAttribute() {
+ return idAttribute;
}
/**
@@ -483,7 +487,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
.setCoder(KvCoder.of(VarIntCoder.of(), CODER))
.apply(GroupByKey.<Integer, OutgoingMessage>create())
.apply("PubsubUnboundedSink.Writer",
- ParDo.of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel,
+ ParDo.of(new WriterFn(pubsubFactory, topic, timestampAttribute, idAttribute,
publishBatchSize, publishBatchBytes)));
return PDone.in(input.getPipeline());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 6392fd2..903ae41 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -602,7 +602,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
pubsubClient =
new AtomicReference<>(
outer.outer.pubsubFactory.newClient(
- outer.outer.timestampLabel, outer.outer.idLabel, options));
+ outer.outer.timestampAttribute, outer.outer.idAttribute, options));
ackTimeoutMs = -1;
safeToAckIds = new HashSet<>();
notYetRead = new ArrayDeque<>();
@@ -1207,22 +1207,22 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@Nullable
private final ValueProvider<TopicPath> topic;
@Nullable
- private final String timestampLabel;
+ private final String timestampAttribute;
@Nullable
- private final String idLabel;
+ private final String idAttribute;
public StatsFn(
PubsubClientFactory pubsubFactory,
@Nullable ValueProvider<SubscriptionPath> subscription,
@Nullable ValueProvider<TopicPath> topic,
- @Nullable String timestampLabel,
- @Nullable String idLabel) {
+ @Nullable String timestampAttribute,
+ @Nullable String idAttribute) {
checkArgument(pubsubFactory != null, "pubsubFactory should not be null");
this.pubsubFactory = pubsubFactory;
this.subscription = subscription;
this.topic = topic;
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
+ this.timestampAttribute = timestampAttribute;
+ this.idAttribute = idAttribute;
}
@ProcessElement
@@ -1247,8 +1247,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
builder.add(DisplayData.item("topic", topicString));
}
builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
- builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
- builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+ builder.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute));
+ builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
}
}
@@ -1303,14 +1303,14 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
* Pubsub message publish timestamp instead.
*/
@Nullable
- private final String timestampLabel;
+ private final String timestampAttribute;
/**
* Pubsub metadata field holding id for each element, or {@literal null} if need to generate
* a unique id ourselves.
*/
@Nullable
- private final String idLabel;
+ private final String idAttribute;
/**
* If not {@literal null}, the user is asking for PubSub attributes. This parse function will be
@@ -1327,8 +1327,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@Nullable ValueProvider<TopicPath> topic,
@Nullable ValueProvider<SubscriptionPath> subscription,
Coder<T> elementCoder,
- @Nullable String timestampLabel,
- @Nullable String idLabel,
+ @Nullable String timestampAttribute,
+ @Nullable String idAttribute,
@Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
checkArgument((topic == null) != (subscription == null),
"Exactly one of topic and subscription must be given");
@@ -1340,8 +1340,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
this.topic = topic;
this.subscription = subscription;
this.elementCoder = checkNotNull(elementCoder);
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
+ this.timestampAttribute = timestampAttribute;
+ this.idAttribute = idAttribute;
this.parseFn = parseFn;
}
@@ -1354,10 +1354,18 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@Nullable ValueProvider<TopicPath> topic,
@Nullable ValueProvider<SubscriptionPath> subscription,
Coder<T> elementCoder,
- @Nullable String timestampLabel,
- @Nullable String idLabel,
+ @Nullable String timestampAttribute,
+ @Nullable String idAttribute,
@Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
- this(null, pubsubFactory, project, topic, subscription, elementCoder, timestampLabel, idLabel,
+ this(
+ null,
+ pubsubFactory,
+ project,
+ topic,
+ subscription,
+ elementCoder,
+ timestampAttribute,
+ idAttribute,
parseFn);
}
@@ -1409,19 +1417,19 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
/**
- * Get the timestamp label.
+ * Get the timestamp attribute.
*/
@Nullable
- public String getTimestampLabel() {
- return timestampLabel;
+ public String getTimestampAttribute() {
+ return timestampAttribute;
}
/**
- * Get the id label.
+ * Get the id attribute.
*/
@Nullable
- public String getIdLabel() {
- return idLabel;
+ public String getIdAttribute() {
+ return idAttribute;
}
/**
@@ -1438,13 +1446,14 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
.apply(Read.from(new PubsubSource<T>(this)))
.apply("PubsubUnboundedSource.Stats",
ParDo.of(new StatsFn<T>(
- pubsubFactory, subscription, topic, timestampLabel, idLabel)));
+ pubsubFactory, subscription, topic, timestampAttribute, idAttribute)));
}
private SubscriptionPath createRandomSubscription(PipelineOptions options) {
try {
try (PubsubClient pubsubClient =
- pubsubFactory.newClient(timestampLabel, idLabel, options.as(PubsubOptions.class))) {
+ pubsubFactory.newClient(
+ timestampAttribute, idAttribute, options.as(PubsubOptions.class))) {
checkState(project.isAccessible(), "createRandomSubscription must be called at runtime.");
checkState(topic.isAccessible(), "createRandomSubscription must be called at runtime.");
SubscriptionPath subscriptionPath =
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
index 14c36f9..d37235f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
@@ -45,8 +45,8 @@ public class PubsubClientTest {
//
private long parse(String timestamp) {
- Map<String, String> map = ImmutableMap.of("myLabel", timestamp);
- return PubsubClient.extractTimestamp("myLabel", null, map);
+ Map<String, String> map = ImmutableMap.of("myAttribute", timestamp);
+ return PubsubClient.extractTimestamp("myAttribute", null, map);
}
private void roundTripRfc339(String timestamp) {
@@ -58,106 +58,106 @@ public class PubsubClientTest {
}
@Test
- public void noTimestampLabelReturnsPubsubPublish() {
+ public void noTimestampAttributeReturnsPubsubPublish() {
final long time = 987654321L;
long timestamp = PubsubClient.extractTimestamp(null, String.valueOf(time), null);
assertEquals(time, timestamp);
}
@Test
- public void noTimestampLabelAndInvalidPubsubPublishThrowsError() {
+ public void noTimestampAttributeAndInvalidPubsubPublishThrowsError() {
thrown.expect(NumberFormatException.class);
PubsubClient.extractTimestamp(null, "not-a-date", null);
}
@Test
- public void timestampLabelWithNullAttributesThrowsError() {
+ public void timestampAttributeWithNullAttributesThrowsError() {
thrown.expect(RuntimeException.class);
- thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
- PubsubClient.extractTimestamp("myLabel", null, null);
+ thrown.expectMessage("PubSub message is missing a value for timestamp attribute myAttribute");
+ PubsubClient.extractTimestamp("myAttribute", null, null);
}
@Test
- public void timestampLabelSetWithMissingAttributeThrowsError() {
+ public void timestampAttributeSetWithMissingAttributeThrowsError() {
thrown.expect(RuntimeException.class);
- thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
+ thrown.expectMessage("PubSub message is missing a value for timestamp attribute myAttribute");
Map<String, String> map = ImmutableMap.of("otherLabel", "whatever");
- PubsubClient.extractTimestamp("myLabel", null, map);
+ PubsubClient.extractTimestamp("myAttribute", null, map);
}
@Test
- public void timestampLabelParsesMillisecondsSinceEpoch() {
+ public void timestampAttributeParsesMillisecondsSinceEpoch() {
long time = 1446162101123L;
- Map<String, String> map = ImmutableMap.of("myLabel", String.valueOf(time));
- long timestamp = PubsubClient.extractTimestamp("myLabel", null, map);
+ Map<String, String> map = ImmutableMap.of("myAttribute", String.valueOf(time));
+ long timestamp = PubsubClient.extractTimestamp("myAttribute", null, map);
assertEquals(time, timestamp);
}
@Test
- public void timestampLabelParsesRfc3339Seconds() {
+ public void timestampAttributeParsesRfc3339Seconds() {
roundTripRfc339("2015-10-29T23:41:41Z");
}
@Test
- public void timestampLabelParsesRfc3339Tenths() {
+ public void timestampAttributeParsesRfc3339Tenths() {
roundTripRfc339("2015-10-29T23:41:41.1Z");
}
@Test
- public void timestampLabelParsesRfc3339Hundredths() {
+ public void timestampAttributeParsesRfc3339Hundredths() {
roundTripRfc339("2015-10-29T23:41:41.12Z");
}
@Test
- public void timestampLabelParsesRfc3339Millis() {
+ public void timestampAttributeParsesRfc3339Millis() {
roundTripRfc339("2015-10-29T23:41:41.123Z");
}
@Test
- public void timestampLabelParsesRfc3339Micros() {
+ public void timestampAttributeParsesRfc3339Micros() {
// Note: micros part 456/1000 is dropped.
truncatedRfc339("2015-10-29T23:41:41.123456Z", "2015-10-29T23:41:41.123Z");
}
@Test
- public void timestampLabelParsesRfc3339MicrosRounding() {
+ public void timestampAttributeParsesRfc3339MicrosRounding() {
// Note: micros part 999/1000 is dropped, not rounded up.
truncatedRfc339("2015-10-29T23:41:41.123999Z", "2015-10-29T23:41:41.123Z");
}
@Test
- public void timestampLabelWithInvalidFormatThrowsError() {
+ public void timestampAttributeWithInvalidFormatThrowsError() {
thrown.expect(NumberFormatException.class);
parse("not-a-timestamp");
}
@Test
- public void timestampLabelWithInvalidFormat2ThrowsError() {
+ public void timestampAttributeWithInvalidFormat2ThrowsError() {
thrown.expect(NumberFormatException.class);
parse("null");
}
@Test
- public void timestampLabelWithInvalidFormat3ThrowsError() {
+ public void timestampAttributeWithInvalidFormat3ThrowsError() {
thrown.expect(NumberFormatException.class);
parse("2015-10");
}
@Test
- public void timestampLabelParsesRfc3339WithSmallYear() {
+ public void timestampAttributeParsesRfc3339WithSmallYear() {
// Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted
// This is therefore a "small year" until this difference is reconciled.
roundTripRfc339("1582-10-15T01:23:45.123Z");
}
@Test
- public void timestampLabelParsesRfc3339WithLargeYear() {
+ public void timestampAttributeParsesRfc3339WithLargeYear() {
// Year 9999 in range.
roundTripRfc339("9999-10-29T23:41:41.123999Z");
}
@Test
- public void timestampLabelRfc3339WithTooLargeYearThrowsError() {
+ public void timestampAttributeRfc3339WithTooLargeYearThrowsError() {
thrown.expect(NumberFormatException.class);
// Year 10000 out of range.
parse("10000-10-29T23:41:41.123999Z");
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
index 63721dc..87d6029 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
@@ -72,8 +72,8 @@ public class PubsubGrpcClientTest {
private static final long REQ_TIME = 1234L;
private static final long PUB_TIME = 3456L;
private static final long MESSAGE_TIME = 6789L;
- private static final String TIMESTAMP_LABEL = "timestamp";
- private static final String ID_LABEL = "id";
+ private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+ private static final String ID_ATTRIBUTE = "id";
private static final String MESSAGE_ID = "testMessageId";
private static final String DATA = "testData";
private static final String RECORD_ID = "testRecordId";
@@ -87,7 +87,9 @@ public class PubsubGrpcClientTest {
PubsubGrpcClientTest.class.getName(), ThreadLocalRandom.current().nextInt());
inProcessChannel = InProcessChannelBuilder.forName(channelName).directExecutor().build();
testCredentials = new TestCredential();
- client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 10, inProcessChannel, testCredentials);
+ client =
+ new PubsubGrpcClient(
+ TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 10, inProcessChannel, testCredentials);
}
@After
@@ -117,9 +119,9 @@ public class PubsubGrpcClientTest {
.setPublishTime(timestamp)
.putAllAttributes(ATTRIBUTES)
.putAllAttributes(
- ImmutableMap.of(TIMESTAMP_LABEL,
+ ImmutableMap.of(TIMESTAMP_ATTRIBUTE,
String.valueOf(MESSAGE_TIME),
- ID_LABEL, RECORD_ID))
+ ID_ATTRIBUTE, RECORD_ID))
.build();
ReceivedMessage expectedReceivedMessage =
ReceivedMessage.newBuilder()
@@ -167,8 +169,8 @@ public class PubsubGrpcClientTest {
.setData(ByteString.copyFrom(DATA.getBytes()))
.putAllAttributes(ATTRIBUTES)
.putAllAttributes(
- ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
- ID_LABEL, RECORD_ID))
+ ImmutableMap.of(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME),
+ ID_ATTRIBUTE, RECORD_ID))
.build();
final PublishRequest expectedRequest =
PublishRequest.newBuilder()
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 20039d4..5f06b88 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -93,14 +93,14 @@ public class PubsubIOTest {
Duration maxReadTime = Duration.standardMinutes(5);
PubsubIO.Read<String> read = PubsubIO.<String>read()
.fromTopic(StaticValueProvider.of(topic))
- .withTimestampLabel("myTimestamp")
- .withIdLabel("myId");
+ .withTimestampAttribute("myTimestamp")
+ .withIdAttribute("myId");
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("topic", topic));
- assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
- assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+ assertThat(displayData, hasDisplayItem("timestampAttribute", "myTimestamp"));
+ assertThat(displayData, hasDisplayItem("idAttribute", "myId"));
}
@Test
@@ -110,14 +110,14 @@ public class PubsubIOTest {
Duration maxReadTime = Duration.standardMinutes(5);
PubsubIO.Read<String> read = PubsubIO.<String>read()
.fromSubscription(StaticValueProvider.of(subscription))
- .withTimestampLabel("myTimestamp")
- .withIdLabel("myId");
+ .withTimestampAttribute("myTimestamp")
+ .withIdAttribute("myId");
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("subscription", subscription));
- assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
- assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+ assertThat(displayData, hasDisplayItem("timestampAttribute", "myTimestamp"));
+ assertThat(displayData, hasDisplayItem("idAttribute", "myId"));
}
@Test
@@ -168,14 +168,14 @@ public class PubsubIOTest {
String topic = "projects/project/topics/topic";
PubsubIO.Write<?> write = PubsubIO.<String>write()
.to(topic)
- .withTimestampLabel("myTimestamp")
- .withIdLabel("myId");
+ .withTimestampAttribute("myTimestamp")
+ .withIdAttribute("myId");
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("topic", topic));
- assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
- assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+ assertThat(displayData, hasDisplayItem("timestampAttribute", "myTimestamp"));
+ assertThat(displayData, hasDisplayItem("idAttribute", "myId"));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
index d290994..578f814 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -58,8 +58,8 @@ public class PubsubJsonClientTest {
private static final long REQ_TIME = 1234L;
private static final long PUB_TIME = 3456L;
private static final long MESSAGE_TIME = 6789L;
- private static final String TIMESTAMP_LABEL = "timestamp";
- private static final String ID_LABEL = "id";
+ private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+ private static final String ID_ATTRIBUTE = "id";
private static final String MESSAGE_ID = "testMessageId";
private static final String DATA = "testData";
private static final String RECORD_ID = "testRecordId";
@@ -68,7 +68,7 @@ public class PubsubJsonClientTest {
@Before
public void setup() throws IOException {
mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS);
- client = new PubsubJsonClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub);
+ client = new PubsubJsonClient(TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, mockPubsub);
}
@After
@@ -88,8 +88,8 @@ public class PubsubJsonClientTest {
.encodeData(DATA.getBytes())
.setPublishTime(String.valueOf(PUB_TIME))
.setAttributes(
- ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
- ID_LABEL, RECORD_ID));
+ ImmutableMap.of(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME),
+ ID_ATTRIBUTE, RECORD_ID));
ReceivedMessage expectedReceivedMessage =
new ReceivedMessage().setMessage(expectedPubsubMessage)
.setAckId(ACK_ID);
@@ -117,8 +117,8 @@ public class PubsubJsonClientTest {
.encodeData(DATA.getBytes())
.setAttributes(
ImmutableMap.<String, String> builder()
- .put(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME))
- .put(ID_LABEL, RECORD_ID)
+ .put(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME))
+ .put(ID_ATTRIBUTE, RECORD_ID)
.put("k", "v").build());
PublishRequest expectedRequest = new PublishRequest()
.setMessages(ImmutableList.of(expectedPubsubMessage));
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
index be425d4..580ada9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
@@ -58,8 +58,8 @@ public class PubsubUnboundedSinkTest implements Serializable {
private static final Map<String, String> ATTRIBUTES =
ImmutableMap.<String, String>builder().put("a", "b").put("c", "d").build();
private static final long TIMESTAMP = 1234L;
- private static final String TIMESTAMP_LABEL = "timestamp";
- private static final String ID_LABEL = "id";
+ private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+ private static final String ID_ATTRIBUTE = "id";
private static final int NUM_SHARDS = 10;
private static class Stamp extends DoFn<String, String> {
@@ -99,7 +99,7 @@ public class PubsubUnboundedSinkTest implements Serializable {
ImmutableList.<OutgoingMessage>of())) {
PubsubUnboundedSink<String> sink =
new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
- TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
+ TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, batchSize, batchBytes,
Duration.standardSeconds(2),
new SimpleFunction<String, PubsubIO.PubsubMessage>() {
@Override
@@ -135,7 +135,7 @@ public class PubsubUnboundedSinkTest implements Serializable {
ImmutableList.<OutgoingMessage>of())) {
PubsubUnboundedSink<String> sink =
new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
- TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
+ TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, batchSize, batchBytes,
Duration.standardSeconds(2), null, RecordIdMethod.DETERMINISTIC);
p.apply(Create.of(data))
.apply(ParDo.of(new Stamp()))
@@ -170,7 +170,7 @@ public class PubsubUnboundedSinkTest implements Serializable {
ImmutableList.<OutgoingMessage>of())) {
PubsubUnboundedSink<String> sink =
new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC),
- StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+ StringUtf8Coder.of(), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE,
NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
null, RecordIdMethod.DETERMINISTIC);
p.apply(Create.of(data))
http://git-wip-us.apache.org/repos/asf/beam/blob/8853d53d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
index 949ba4f..dc66ea1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -70,8 +70,8 @@ public class PubsubUnboundedSourceTest {
private static final String DATA = "testData";
private static final long TIMESTAMP = 1234L;
private static final long REQ_TIME = 6373L;
- private static final String TIMESTAMP_LABEL = "timestamp";
- private static final String ID_LABEL = "id";
+ private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
+ private static final String ID_ATTRIBUTE = "id";
private static final String ACK_ID = "testAckId";
private static final String RECORD_ID = "testRecordId";
private static final int ACK_TIMEOUT_S = 60;
@@ -96,7 +96,7 @@ public class PubsubUnboundedSourceTest {
PubsubUnboundedSource<String> source =
new PubsubUnboundedSource<>(
clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION),
- StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, null);
+ StringUtf8Coder.of(), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, null);
primSource = new PubsubSource<>(source);
}
[5/9] beam git commit: Converts PubsubIO.Write to AutoValue
Posted by jk...@apache.org.
Converts PubsubIO.Write to AutoValue
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/df6ef969
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/df6ef969
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/df6ef969
Branch: refs/heads/master
Commit: df6ef969d6df5c42d091cc00997b0ed7680315fb
Parents: 9e81548
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:34:11 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 166 +++++++------------
1 file changed, 61 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/df6ef969/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 69a5bd6..5702af1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -461,8 +461,9 @@ public class PubsubIO {
return new AutoValue_PubsubIO_Read.Builder<T>().build();
}
+ /** Returns a {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */
public static <T> Write<T> write() {
- return new Write<>();
+ return new AutoValue_PubsubIO_Write.Builder<T>().build();
}
/** Implementation of {@link #read}. */
@@ -696,43 +697,47 @@ public class PubsubIO {
private PubsubIO() {}
- /**
- * A {@link PTransform} that writes an unbounded {@link PCollection} of {@link String Strings}
- * to a Cloud Pub/Sub stream.
- */
- public static class Write<T> extends PTransform<PCollection<T>, PDone> {
-
- /** The Cloud Pub/Sub topic to publish to. */
+ /** Implementation of {@link #write}. */
+ @AutoValue
+ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
@Nullable
- private final ValueProvider<PubsubTopic> topic;
+ abstract ValueProvider<PubsubTopic> getTopicProvider();
+
/** The name of the message attribute to publish message timestamps in. */
@Nullable
- private final String timestampLabel;
+ abstract String getTimestampLabel();
+
/** The name of the message attribute to publish unique message IDs in. */
@Nullable
- private final String idLabel;
+ abstract String getIdLabel();
+
/** The input type Coder. */
- private final Coder<T> coder;
+ @Nullable
+ abstract Coder<T> getCoder();
+
/** The format function for input PubsubMessage objects. */
- SimpleFunction<T, PubsubMessage> formatFn;
+ @Nullable
+ abstract SimpleFunction<T, PubsubMessage> getFormatFn();
- private Write() {
- this(null, null, null, null, null, null);
- }
+ abstract Builder<T> toBuilder();
- private Write(
- String name, ValueProvider<PubsubTopic> topic, String timestampLabel,
- String idLabel, Coder<T> coder, SimpleFunction<T, PubsubMessage> formatFn) {
- super(name);
- this.topic = topic;
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
- this.coder = coder;
- this.formatFn = formatFn;
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topicProvider);
+
+ abstract Builder<T> setTimestampLabel(String timestampLabel);
+
+ abstract Builder<T> setIdLabel(String idLabel);
+
+ abstract Builder<T> setCoder(Coder<T> coder);
+
+ abstract Builder<T> setFormatFn(SimpleFunction<T, PubsubMessage> formatFn);
+
+ abstract Write<T> build();
}
/**
- * Creates a transform that publishes to the specified topic.
+ * Publishes to the specified topic.
*
* <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
* {@code topic} string.
@@ -745,14 +750,15 @@ public class PubsubIO {
* Like {@code topic()} but with a {@link ValueProvider}.
*/
public Write<T> topic(ValueProvider<String> topic) {
- return new Write<>(name, NestedValueProvider.of(topic, new TopicTranslator()),
- timestampLabel, idLabel, coder, formatFn);
+ return toBuilder()
+ .setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()))
+ .build();
}
/**
- * Creates a transform that writes to Pub/Sub, adds each record's timestamp to the published
- * messages in an attribute with the specified name. The value of the attribute will be a number
- * representing the number of milliseconds since the Unix epoch. For example, if using the Joda
+ * Writes to Pub/Sub and adds each record's timestamp to the published messages in an attribute
+ * with the specified name. The value of the attribute will be a number representing the number
+ * of milliseconds since the Unix epoch. For example, if using the Joda
* time classes, {@link Instant#Instant(long)} can be used to parse this value.
*
* <p>If the output from this sink is being read by another Beam pipeline, then
@@ -760,32 +766,27 @@ public class PubsubIO {
* these timestamps from the appropriate attribute.
*/
public Write<T> timestampLabel(String timestampLabel) {
- return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+ return toBuilder().setTimestampLabel(timestampLabel).build();
}
/**
- * Creates a transform that writes to Pub/Sub, adding each record's unique identifier to the
- * published messages in an attribute with the specified name. The value of the attribute is an
- * opaque string.
+ * Writes to Pub/Sub, adding each record's unique identifier to the published messages in an
+ * attribute with the specified name. The value of the attribute is an opaque string.
*
* <p>If the output from this sink is being read by another Beam pipeline, then
* {@link PubsubIO.Read#withIdLabel(String)} can be used to ensure that the other source reads
* these unique identifiers from the appropriate attribute.
*/
public Write<T> idLabel(String idLabel) {
- return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+ return toBuilder().setIdLabel(idLabel).build();
}
/**
- * Returns a new transform that's like this one
- * but that uses the given {@link Coder} to encode each of
- * the elements of the input {@link PCollection} into an
- * output record.
- *
- * <p>Does not modify this object.
+ * Uses the given {@link Coder} to encode each of the elements of the input {@link PCollection}
+ * into an output record.
*/
public Write<T> withCoder(Coder<T> coder) {
- return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+ return toBuilder().setCoder(coder).build();
}
/**
@@ -794,12 +795,12 @@ public class PubsubIO {
* to separately set the PubSub message's payload and attributes.
*/
public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) {
- return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn);
+ return toBuilder().setFormatFn(formatFn).build();
}
@Override
public PDone expand(PCollection<T> input) {
- if (topic == null) {
+ if (getTopicProvider() == null) {
throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
}
switch (input.isBounded()) {
@@ -809,11 +810,11 @@ public class PubsubIO {
case UNBOUNDED:
return input.apply(new PubsubUnboundedSink<T>(
FACTORY,
- NestedValueProvider.of(topic, new TopicPathTranslator()),
- coder,
- timestampLabel,
- idLabel,
- formatFn,
+ NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()),
+ getCoder(),
+ getTimestampLabel(),
+ getIdLabel(),
+ getFormatFn(),
100 /* numShards */));
}
throw new RuntimeException(); // cases are exhaustive.
@@ -822,7 +823,7 @@ public class PubsubIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+ populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), getTopicProvider());
}
@Override
@@ -831,54 +832,6 @@ public class PubsubIO {
}
/**
- * Returns the PubSub topic being written to.
- */
- @Nullable
- public PubsubTopic getTopic() {
- return (topic == null) ? null : topic.get();
- }
-
- /**
- * Returns the {@link ValueProvider} for the topic being written to.
- */
- @Nullable
- public ValueProvider<PubsubTopic> getTopicProvider() {
- return topic;
- }
-
- /**
- * Returns the timestamp label.
- */
- @Nullable
- public String getTimestampLabel() {
- return timestampLabel;
- }
-
- /**
- * Returns the id label.
- */
- @Nullable
- public String getIdLabel() {
- return idLabel;
- }
-
- /**
- * Returns the output coder.
- */
- @Nullable
- public Coder<T> getCoder() {
- return coder;
- }
-
- /**
- * Returns the formatting function used if publishing attributes.
- */
- @Nullable
- public SimpleFunction<T, PubsubMessage> getFormatFn() {
- return formatFn;
- }
-
- /**
* Writer to Pubsub which batches messages from bounded collections.
*
* <p>Public so can be suppressed by runners.
@@ -894,7 +847,7 @@ public class PubsubIO {
this.output = new ArrayList<>();
// NOTE: idLabel is ignored.
this.pubsubClient =
- FACTORY.newClient(timestampLabel, null,
+ FACTORY.newClient(getTimestampLabel(), null,
c.getPipelineOptions().as(PubsubOptions.class));
}
@@ -902,8 +855,8 @@ public class PubsubIO {
public void processElement(ProcessContext c) throws IOException {
byte[] payload = null;
Map<String, String> attributes = null;
- if (formatFn != null) {
- PubsubMessage message = formatFn.apply(c.element());
+ if (getFormatFn() != null) {
+ PubsubMessage message = getFormatFn().apply(c.element());
payload = message.getMessage();
attributes = message.getAttributeMap();
} else {
@@ -930,9 +883,12 @@ public class PubsubIO {
}
private void publish() throws IOException {
- int n = pubsubClient.publish(
- PubsubClient.topicPathFromName(getTopic().project, getTopic().topic),
- output);
+ PubsubTopic topic = getTopicProvider().get();
+ int n =
+ pubsubClient.publish(
+ PubsubClient.topicPathFromName(
+ topic.project, topic.topic),
+ output);
checkState(n == output.size());
output.clear();
}
[3/9] beam git commit: Remove override of topic by subscription and
vice versa
Posted by jk...@apache.org.
Remove override of topic by subscription and vice versa
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e815485
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e815485
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e815485
Branch: refs/heads/master
Commit: 9e815485b979b99b190c4acf1098ab054492ae9e
Parents: 5d8fbc4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 27 17:04:58 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 4 ----
.../apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 13 ++++++++-----
2 files changed, 8 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9e815485/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 20aed6d..69a5bd6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -534,8 +534,6 @@ public class PubsubIO {
return toBuilder()
.setSubscriptionProvider(
NestedValueProvider.of(subscription, new SubscriptionTranslator()))
- /* reset topic to null */
- .setTopicProvider(null)
.build();
}
@@ -564,8 +562,6 @@ public class PubsubIO {
}
return toBuilder()
.setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()))
- /* reset subscription to null */
- .setSubscriptionProvider(null)
.build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9e815485/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index f44fffc..69d989f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -146,16 +146,19 @@ public class PubsubIOTest {
public void testPrimitiveReadDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
Set<DisplayData> displayData;
- PubsubIO.Read<String> read = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
+ PubsubIO.Read<String> baseRead = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
// Reading from a subscription.
- read = read.fromSubscription("projects/project/subscriptions/subscription");
+ PubsubIO.Read<String> read =
+ baseRead.fromSubscription("projects/project/subscriptions/subscription");
displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
- assertThat("PubsubIO.Read should include the subscription in its primitive display data",
- displayData, hasItem(hasDisplayItem("subscription")));
+ assertThat(
+ "PubsubIO.Read should include the subscription in its primitive display data",
+ displayData,
+ hasItem(hasDisplayItem("subscription")));
// Reading from a topic.
- read = read.fromTopic("projects/project/topics/topic");
+ read = baseRead.fromTopic("projects/project/topics/topic");
displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat("PubsubIO.Read should include the topic in its primitive display data",
displayData, hasItem(hasDisplayItem("topic")));
[2/9] beam git commit: Adds PubsubIO.writeStrings(), writeProtos(),
writeAvros()
Posted by jk...@apache.org.
Adds PubsubIO.writeStrings(), writeProtos(), writeAvros()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f0651145
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f0651145
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f0651145
Branch: refs/heads/master
Commit: f0651145fea31854ab83fc064a3c7866251cc0a4
Parents: 079353d
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:54:03 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 28 ++++++++++++++++++--
1 file changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f0651145/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 9604864..3a7522e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -474,8 +474,8 @@ public class PubsubIO {
}
/**
- * Returns A {@link PTransform} that continuously reads binary encoded protos of the given type
- * from a Google Cloud Pub/Sub stream.
+ * Returns A {@link PTransform} that continuously reads binary encoded protobuf messages of the
+ * given type from a Google Cloud Pub/Sub stream.
*/
public static <T extends Message> Read<T> readProtos(Class<T> messageClass) {
return PubsubIO.<T>read().withCoder(ProtoCoder.of(messageClass));
@@ -494,6 +494,30 @@ public class PubsubIO {
return new AutoValue_PubsubIO_Write.Builder<T>().build();
}
+ /**
+ * Returns A {@link PTransform} that writes UTF-8 encoded strings to a Google Cloud Pub/Sub
+ * stream.
+ */
+ public static Write<String> writeStrings() {
+ return PubsubIO.<String>write().withCoder(StringUtf8Coder.of());
+ }
+
+ /**
+ * Returns A {@link PTransform} that writes binary encoded protobuf messages of a given type
+ * to a Google Cloud Pub/Sub stream.
+ */
+ public static <T extends Message> Write<T> writeProtos(Class<T> messageClass) {
+ return PubsubIO.<T>write().withCoder(ProtoCoder.of(messageClass));
+ }
+
+ /**
+ * Returns A {@link PTransform} that writes binary encoded Avro messages of a given type
+ * to a Google Cloud Pub/Sub stream.
+ */
+ public static <T extends Message> Write<T> writeAvros(Class<T> clazz) {
+ return PubsubIO.<T>write().withCoder(AvroCoder.of(clazz));
+ }
+
/** Implementation of {@link #read}. */
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
[4/9] beam git commit: Converts PubsubIO.Read to AutoValue
Posted by jk...@apache.org.
Converts PubsubIO.Read to AutoValue
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f4d04606
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f4d04606
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f4d04606
Branch: refs/heads/master
Commit: f4d04606c105ca45a7754516781cb72b4c818baf
Parents: f5e3f52
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:14:08 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 244 +++++++------------
.../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 8 +-
2 files changed, 95 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f4d04606/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index f0926d4..3c76942 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.pubsub;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -455,64 +456,61 @@ public class PubsubIO {
}
}
+ /** Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. */
public static <T> Read<T> read() {
- return new Read<>();
+ return new AutoValue_PubsubIO_Read.Builder<T>().build();
}
public static <T> Write<T> write() {
return new Write<>();
}
- /**
- * A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream and
- * returns a {@link PCollection} of {@link String Strings} containing the items from
- * the stream.
- */
- public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
-
- /** The Cloud Pub/Sub topic to read from. */
+ /** Implementation of {@link #read}. */
+ @AutoValue
+ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@Nullable
- private final ValueProvider<PubsubTopic> topic;
+ abstract ValueProvider<PubsubTopic> getTopicProvider();
- /** The Cloud Pub/Sub subscription to read from. */
@Nullable
- private final ValueProvider<PubsubSubscription> subscription;
+ abstract ValueProvider<PubsubSubscription> getSubscriptionProvider();
/** The name of the message attribute to read timestamps from. */
@Nullable
- private final String timestampLabel;
+ abstract String getTimestampLabel();
/** The name of the message attribute to read unique message IDs from. */
@Nullable
- private final String idLabel;
+ abstract String getIdLabel();
/** The coder used to decode each record. */
@Nullable
- private final Coder<T> coder;
+ abstract Coder<T> getCoder();
/** User function for parsing PubsubMessage object. */
- SimpleFunction<PubsubMessage, T> parseFn;
+ @Nullable
+ abstract SimpleFunction<PubsubMessage, T> getParseFn();
- private Read() {
- this(null, null, null, null, null, null, null);
- }
+ abstract Builder<T> toBuilder();
- private Read(String name, ValueProvider<PubsubSubscription> subscription,
- ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
- String idLabel,
- SimpleFunction<PubsubMessage, T> parseFn) {
- super(name);
- this.subscription = subscription;
- this.topic = topic;
- this.timestampLabel = timestampLabel;
- this.coder = coder;
- this.idLabel = idLabel;
- this.parseFn = parseFn;
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topic);
+
+ abstract Builder<T> setSubscriptionProvider(ValueProvider<PubsubSubscription> subscription);
+
+ abstract Builder<T> setTimestampLabel(String timestampLabel);
+
+ abstract Builder<T> setIdLabel(String idLabel);
+
+ abstract Builder<T> setCoder(Coder<T> coder);
+
+ abstract Builder<T> setParseFn(SimpleFunction<PubsubMessage, T> parseFn);
+
+ abstract Read<T> build();
}
/**
- * Returns a transform that's like this one but reading from the
- * given subscription.
+ * Reads from the given subscription.
*
* <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
* of the {@code subscription} string.
@@ -520,8 +518,6 @@ public class PubsubIO {
* <p>Multiple readers reading from the same subscription will each receive
* some arbitrary portion of the data. Most likely, separate readers should
* use their own subscriptions.
- *
- * <p>Does not modify this object.
*/
public Read<T> subscription(String subscription) {
return subscription(StaticValueProvider.of(subscription));
@@ -535,9 +531,12 @@ public class PubsubIO {
// Validate.
PubsubSubscription.fromPath(subscription.get());
}
- return new Read<>(
- name, NestedValueProvider.of(subscription, new SubscriptionTranslator()),
- null /* reset topic to null */, timestampLabel, coder, idLabel, parseFn);
+ return toBuilder()
+ .setSubscriptionProvider(
+ NestedValueProvider.of(subscription, new SubscriptionTranslator()))
+ /* reset topic to null */
+ .setTopicProvider(null)
+ .build();
}
/**
@@ -563,15 +562,16 @@ public class PubsubIO {
// Validate.
PubsubTopic.fromPath(topic.get());
}
- return new Read<>(name, null /* reset subscription to null */,
- NestedValueProvider.of(topic, new TopicTranslator()),
- timestampLabel, coder, idLabel, parseFn);
+ return toBuilder()
+ .setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()))
+ /* reset subscription to null */
+ .setSubscriptionProvider(null)
+ .build();
}
/**
- * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
- * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
- * parameter specifies the name of the attribute that contains the timestamp.
+ * When reading from Cloud Pub/Sub where record timestamps are provided as Pub/Sub message
+ * attributes, specifies the name of the attribute that contains the timestamp.
*
* <p>The timestamp value is expected to be represented in the attribute as either:
*
@@ -599,88 +599,90 @@ public class PubsubIO {
* @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
*/
public Read<T> timestampLabel(String timestampLabel) {
- return new Read<>(
- name, subscription, topic, timestampLabel, coder, idLabel,
- parseFn);
+ return toBuilder().setTimestampLabel(timestampLabel).build();
}
/**
- * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
- * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
- * parameter specifies the attribute name. The value of the attribute can be any string
- * that uniquely identifies this record.
+ * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub
+ * message attributes, specifies the name of the attribute containing the unique identifier.
+ * The value of the attribute can be any string that uniquely identifies this record.
*
* <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
* If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
* be delivered, and deduplication of the stream will be strictly best effort.
*/
public Read<T> idLabel(String idLabel) {
- return new Read<>(
- name, subscription, topic, timestampLabel, coder, idLabel,
- parseFn);
+ return toBuilder().setIdLabel(idLabel).build();
}
/**
- * Returns a transform that's like this one but that uses the given
- * {@link Coder} to decode each record into a value of type {@code T}.
- *
- * <p>Does not modify this object.
+ * Uses the given {@link Coder} to decode each record into a value of type {@code T}.
*/
public Read<T> withCoder(Coder<T> coder) {
- return new Read<>(
- name, subscription, topic, timestampLabel, coder, idLabel,
- parseFn);
+ return toBuilder().setCoder(coder).build();
}
/**
- * Causes the source to return a PubsubMessage that includes Pubsub attributes.
- * The user must supply a parsing function to transform the PubsubMessage into an output type.
+ * Causes the source to return a PubsubMessage that includes Pubsub attributes, and uses the
+ * given parsing function to transform the PubsubMessage into an output type.
* A Coder for the output type T must be registered or set on the output via
* {@link PCollection#setCoder(Coder)}.
*/
public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
- return new Read<T>(
- name, subscription, topic, timestampLabel, coder, idLabel,
- parseFn);
+ return toBuilder().setParseFn(parseFn).build();
}
@Override
public PCollection<T> expand(PBegin input) {
- if (topic == null && subscription == null) {
- throw new IllegalStateException("Need to set either the topic or the subscription for "
- + "a PubsubIO.Read transform");
+ if (getTopicProvider() == null && getSubscriptionProvider() == null) {
+ throw new IllegalStateException(
+ "Need to set either the topic or the subscription for " + "a PubsubIO.Read transform");
}
- if (topic != null && subscription != null) {
- throw new IllegalStateException("Can't set both the topic and the subscription for "
- + "a PubsubIO.Read transform");
+ if (getTopicProvider() != null && getSubscriptionProvider() != null) {
+ throw new IllegalStateException(
+ "Can't set both the topic and the subscription for " + "a PubsubIO.Read transform");
}
- if (coder == null) {
- throw new IllegalStateException("PubsubIO.Read requires that a coder be set using "
- + "the withCoder method.");
+ if (getCoder() == null) {
+ throw new IllegalStateException(
+ "PubsubIO.Read requires that a coder be set using " + "the withCoder method.");
}
- @Nullable ValueProvider<ProjectPath> projectPath =
- topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator());
- @Nullable ValueProvider<TopicPath> topicPath =
- topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator());
- @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
- subscription == null
+ @Nullable
+ ValueProvider<ProjectPath> projectPath =
+ getTopicProvider() == null
? null
- : NestedValueProvider.of(subscription, new SubscriptionPathTranslator());
- PubsubUnboundedSource<T> source = new PubsubUnboundedSource<T>(
- FACTORY, projectPath, topicPath, subscriptionPath,
- coder, timestampLabel, idLabel, parseFn);
+ : NestedValueProvider.of(getTopicProvider(), new ProjectPathTranslator());
+ @Nullable
+ ValueProvider<TopicPath> topicPath =
+ getTopicProvider() == null
+ ? null
+ : NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator());
+ @Nullable
+ ValueProvider<SubscriptionPath> subscriptionPath =
+ getSubscriptionProvider() == null
+ ? null
+ : NestedValueProvider.of(getSubscriptionProvider(), new SubscriptionPathTranslator());
+ PubsubUnboundedSource<T> source =
+ new PubsubUnboundedSource<T>(
+ FACTORY,
+ projectPath,
+ topicPath,
+ subscriptionPath,
+ getCoder(),
+ getTimestampLabel(),
+ getIdLabel(),
+ getParseFn());
return input.getPipeline().apply(source);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+ populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), getTopicProvider());
- if (subscription != null) {
- String subscriptionString = subscription.isAccessible()
- ? subscription.get().asPath() : subscription.toString();
+ if (getSubscriptionProvider() != null) {
+ String subscriptionString = getSubscriptionProvider().isAccessible()
+ ? getSubscriptionProvider().get().asPath() : getSubscriptionProvider().toString();
builder.add(DisplayData.item("subscription", subscriptionString)
.withLabel("Pubsub Subscription"));
}
@@ -688,72 +690,8 @@ public class PubsubIO {
@Override
protected Coder<T> getDefaultOutputCoder() {
- return coder;
+ return getCoder();
}
-
- /**
- * Get the topic being read from.
- */
- @Nullable
- public PubsubTopic getTopic() {
- return topic == null ? null : topic.get();
- }
-
- /**
- * Get the {@link ValueProvider} for the topic being read from.
- */
- public ValueProvider<PubsubTopic> getTopicProvider() {
- return topic;
- }
-
- /**
- * Get the subscription being read from.
- */
- @Nullable
- public PubsubSubscription getSubscription() {
- return subscription == null ? null : subscription.get();
- }
-
- /**
- * Get the {@link ValueProvider} for the subscription being read from.
- */
- public ValueProvider<PubsubSubscription> getSubscriptionProvider() {
- return subscription;
- }
-
- /**
- * Get the timestamp label.
- */
- @Nullable
- public String getTimestampLabel() {
- return timestampLabel;
- }
-
- /**
- * Get the id label.
- */
- @Nullable
- public String getIdLabel() {
- return idLabel;
- }
-
-
- /**
- * Get the {@link Coder} used for the transform's output.
- */
- @Nullable
- public Coder<T> getCoder() {
- return coder;
- }
-
- /**
- * Get the parse function used for PubSub attributes.
- */
- @Nullable
- public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
- return parseFn;
- }
-
}
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/f4d04606/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 6e9922c..7fe6e26 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -126,8 +126,8 @@ public class PubsubIOTest {
String subscription = "projects/project/subscriptions/subscription";
PubsubIO.Read<String> read = PubsubIO.<String>read()
.subscription(StaticValueProvider.of(subscription));
- assertNull(read.getTopic());
- assertNotNull(read.getSubscription());
+ assertNull(read.getTopicProvider());
+ assertNotNull(read.getSubscriptionProvider());
assertNotNull(DisplayData.from(read));
}
@@ -136,8 +136,8 @@ public class PubsubIOTest {
String topic = "projects/project/topics/topic";
PubsubIO.Read<String> read = PubsubIO.<String>read()
.topic(StaticValueProvider.of(topic));
- assertNotNull(read.getTopic());
- assertNull(read.getSubscription());
+ assertNotNull(read.getTopicProvider());
+ assertNull(read.getSubscriptionProvider());
assertNotNull(DisplayData.from(read));
}
[9/9] beam git commit: This closes #2750
Posted by jk...@apache.org.
This closes #2750
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/14d60b26
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/14d60b26
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/14d60b26
Branch: refs/heads/master
Commit: 14d60b26e7e9b8a578038599341e66ccd99d012b
Parents: f5e3f52 8853d53
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sat Apr 29 15:17:48 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 15:17:48 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 6 +-
.../examples/complete/game/LeaderBoard.java | 6 +-
.../beam/runners/dataflow/DataflowRunner.java | 18 +-
.../org/apache/beam/sdk/util/PropertyNames.java | 4 +-
.../beam/sdk/io/gcp/pubsub/PubsubClient.java | 42 +-
.../sdk/io/gcp/pubsub/PubsubGrpcClient.java | 36 +-
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 508 +++++++++----------
.../sdk/io/gcp/pubsub/PubsubJsonClient.java | 36 +-
.../sdk/io/gcp/pubsub/PubsubTestClient.java | 6 +-
.../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 58 ++-
.../io/gcp/pubsub/PubsubUnboundedSource.java | 65 ++-
.../sdk/io/gcp/pubsub/PubsubClientTest.java | 50 +-
.../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java | 16 +-
.../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 78 +--
.../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 14 +-
.../io/gcp/pubsub/PubsubUnboundedSinkTest.java | 10 +-
.../gcp/pubsub/PubsubUnboundedSourceTest.java | 6 +-
17 files changed, 459 insertions(+), 500 deletions(-)
----------------------------------------------------------------------
[8/9] beam git commit: Renames PubsubIO.Write builder methods to be
style guide compliant
Posted by jk...@apache.org.
Renames PubsubIO.Write builder methods to be style guide compliant
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/42c975ee
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/42c975ee
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/42c975ee
Branch: refs/heads/master
Commit: 42c975ee533a63be750da2e8de1925b42efd2cad
Parents: df6ef96
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:41:48 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 12 ++++++------
.../org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 10 +++++-----
2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/42c975ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 5702af1..99df3c6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -742,14 +742,14 @@ public class PubsubIO {
* <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
* {@code topic} string.
*/
- public Write<T> topic(String topic) {
- return topic(StaticValueProvider.of(topic));
+ public Write<T> to(String topic) {
+ return to(StaticValueProvider.of(topic));
}
/**
* Like {@code topic()} but with a {@link ValueProvider}.
*/
- public Write<T> topic(ValueProvider<String> topic) {
+ public Write<T> to(ValueProvider<String> topic) {
return toBuilder()
.setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()))
.build();
@@ -765,7 +765,7 @@ public class PubsubIO {
* {@link PubsubIO.Read#withTimestampLabel(String)} can be used to ensure the other source reads
* these timestamps from the appropriate attribute.
*/
- public Write<T> timestampLabel(String timestampLabel) {
+ public Write<T> withTimestampLabel(String timestampLabel) {
return toBuilder().setTimestampLabel(timestampLabel).build();
}
@@ -777,7 +777,7 @@ public class PubsubIO {
* {@link PubsubIO.Read#withIdLabel(String)} can be used to ensure that* the other source reads
* these unique identifiers from the appropriate attribute.
*/
- public Write<T> idLabel(String idLabel) {
+ public Write<T> withIdLabel(String idLabel) {
return toBuilder().setIdLabel(idLabel).build();
}
@@ -794,7 +794,7 @@ public class PubsubIO {
* function translates the input type T to a PubsubMessage object, which is used by the sink
* to separately set the PubSub message's payload and attributes.
*/
- public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) {
+ public Write<T> withFormatFn(SimpleFunction<T, PubsubMessage> formatFn) {
return toBuilder().setFormatFn(formatFn).build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/42c975ee/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 69d989f..f896bfc 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -52,7 +52,7 @@ public class PubsubIOTest {
assertEquals("PubsubIO.Read",
PubsubIO.<String>read().fromTopic("projects/myproject/topics/mytopic").getName());
assertEquals("PubsubIO.Write",
- PubsubIO.<String>write().topic("projects/myproject/topics/mytopic").getName());
+ PubsubIO.<String>write().to("projects/myproject/topics/mytopic").getName());
}
@Test
@@ -168,9 +168,9 @@ public class PubsubIOTest {
public void testWriteDisplayData() {
String topic = "projects/project/topics/topic";
PubsubIO.Write<?> write = PubsubIO.<String>write()
- .topic(topic)
- .timestampLabel("myTimestamp")
- .idLabel("myId");
+ .to(topic)
+ .withTimestampLabel("myTimestamp")
+ .withIdLabel("myId");
DisplayData displayData = DisplayData.from(write);
@@ -183,7 +183,7 @@ public class PubsubIOTest {
@Category(ValidatesRunner.class)
public void testPrimitiveWriteDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PubsubIO.Write<?> write = PubsubIO.<String>write().topic("projects/project/topics/topic");
+ PubsubIO.Write<?> write = PubsubIO.<String>write().to("projects/project/topics/topic");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
assertThat("PubsubIO.Write should include the topic in its primitive display data",
[7/9] beam git commit: Renames PubsubIO.Read builder methods to be
style guide compliant
Posted by jk...@apache.org.
Renames PubsubIO.Read builder methods to be style guide compliant
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d8fbc4c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d8fbc4c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d8fbc4c
Branch: refs/heads/master
Commit: 5d8fbc4c4d87f75ea84a40c2ee36531eb0eda26f
Parents: f4d0460
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:19:37 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 2 +-
.../examples/complete/game/LeaderBoard.java | 2 +-
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 24 ++++++-------
.../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 38 ++++++++++----------
4 files changed, 33 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5d8fbc4c/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index e0048b7..d95eb06 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -253,7 +253,7 @@ public class GameStats extends LeaderBoard {
// Read Events from Pub/Sub using custom timestamps
PCollection<GameActionInfo> rawEvents = pipeline
.apply(PubsubIO.<String>read()
- .timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())
+ .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())
.withCoder(StringUtf8Coder.of()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
http://git-wip-us.apache.org/repos/asf/beam/blob/5d8fbc4c/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 96f4291..a87468a 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -192,7 +192,7 @@ public class LeaderBoard extends HourlyTeamScore {
// data elements, and parse the data.
PCollection<GameActionInfo> gameEvents = pipeline
.apply(PubsubIO.<String>read()
- .timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())
+ .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())
.withCoder(StringUtf8Coder.of()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
http://git-wip-us.apache.org/repos/asf/beam/blob/5d8fbc4c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 3c76942..20aed6d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -519,14 +519,14 @@ public class PubsubIO {
* some arbitrary portion of the data. Most likely, separate readers should
* use their own subscriptions.
*/
- public Read<T> subscription(String subscription) {
- return subscription(StaticValueProvider.of(subscription));
+ public Read<T> fromSubscription(String subscription) {
+ return fromSubscription(StaticValueProvider.of(subscription));
}
/**
* Like {@code subscription()} but with a {@link ValueProvider}.
*/
- public Read<T> subscription(ValueProvider<String> subscription) {
+ public Read<T> fromSubscription(ValueProvider<String> subscription) {
if (subscription.isAccessible()) {
// Validate.
PubsubSubscription.fromPath(subscription.get());
@@ -541,7 +541,7 @@ public class PubsubIO {
/**
* Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
- * with {@link #subscription(String)}.
+ * with {@link #fromSubscription(String)}.
*
* <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
* of the {@code topic} string.
@@ -550,14 +550,14 @@ public class PubsubIO {
* pipeline is started. Any data published on the topic before the pipeline is started will
* not be read by the runner.
*/
- public Read<T> topic(String topic) {
- return topic(StaticValueProvider.of(topic));
+ public Read<T> fromTopic(String topic) {
+ return fromTopic(StaticValueProvider.of(topic));
}
/**
* Like {@code topic()} but with a {@link ValueProvider}.
*/
- public Read<T> topic(ValueProvider<String> topic) {
+ public Read<T> fromTopic(ValueProvider<String> topic) {
if (topic.isAccessible()) {
// Validate.
PubsubTopic.fromPath(topic.get());
@@ -598,7 +598,7 @@ public class PubsubIO {
*
* @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
*/
- public Read<T> timestampLabel(String timestampLabel) {
+ public Read<T> withTimestampLabel(String timestampLabel) {
return toBuilder().setTimestampLabel(timestampLabel).build();
}
@@ -611,7 +611,7 @@ public class PubsubIO {
* If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
* be delivered, and deduplication of the stream will be strictly best effort.
*/
- public Read<T> idLabel(String idLabel) {
+ public Read<T> withIdLabel(String idLabel) {
return toBuilder().setIdLabel(idLabel).build();
}
@@ -628,7 +628,7 @@ public class PubsubIO {
* A Coder for the output type T must be registered or set on the output via
* {@link PCollection#setCoder(Coder)}.
*/
- public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
+ public Read<T> withParseFn(SimpleFunction<PubsubMessage, T> parseFn) {
return toBuilder().setParseFn(parseFn).build();
}
@@ -760,7 +760,7 @@ public class PubsubIO {
* time classes, {@link Instant#Instant(long)} can be used to parse this value.
*
* <p>If the output from this sink is being read by another Beam pipeline, then
- * {@link PubsubIO.Read#timestampLabel(String)} can be used to ensure the other source reads
+ * {@link PubsubIO.Read#withTimestampLabel(String)} can be used to ensure the other source reads
* these timestamps from the appropriate attribute.
*/
public Write<T> timestampLabel(String timestampLabel) {
@@ -773,7 +773,7 @@ public class PubsubIO {
* opaque string.
*
* <p>If the the output from this sink is being read by another Beam pipeline, then
- * {@link PubsubIO.Read#idLabel(String)} can be used to ensure that* the other source reads
+ * {@link PubsubIO.Read#withIdLabel(String)} can be used to ensure that* the other source reads
* these unique identifiers from the appropriate attribute.
*/
public Write<T> idLabel(String idLabel) {
http://git-wip-us.apache.org/repos/asf/beam/blob/5d8fbc4c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 7fe6e26..f44fffc 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -50,19 +50,19 @@ public class PubsubIOTest {
@Test
public void testPubsubIOGetName() {
assertEquals("PubsubIO.Read",
- PubsubIO.<String>read().topic("projects/myproject/topics/mytopic").getName());
+ PubsubIO.<String>read().fromTopic("projects/myproject/topics/mytopic").getName());
assertEquals("PubsubIO.Write",
PubsubIO.<String>write().topic("projects/myproject/topics/mytopic").getName());
}
@Test
public void testTopicValidationSuccess() throws Exception {
- PubsubIO.<String>read().topic("projects/my-project/topics/abc");
- PubsubIO.<String>read().topic("projects/my-project/topics/ABC");
- PubsubIO.<String>read().topic("projects/my-project/topics/AbC-DeF");
- PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234");
- PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
- PubsubIO.<String>read().topic(new StringBuilder()
+ PubsubIO.<String>read().fromTopic("projects/my-project/topics/abc");
+ PubsubIO.<String>read().fromTopic("projects/my-project/topics/ABC");
+ PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-DeF");
+ PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-1234");
+ PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
+ PubsubIO.<String>read().fromTopic(new StringBuilder()
.append("projects/my-project/topics/A-really-long-one-")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -73,13 +73,13 @@ public class PubsubIOTest {
@Test
public void testTopicValidationBadCharacter() throws Exception {
thrown.expect(IllegalArgumentException.class);
- PubsubIO.<String>read().topic("projects/my-project/topics/abc-*-abc");
+ PubsubIO.<String>read().fromTopic("projects/my-project/topics/abc-*-abc");
}
@Test
public void testTopicValidationTooLong() throws Exception {
thrown.expect(IllegalArgumentException.class);
- PubsubIO.<String>read().topic(new StringBuilder().append
+ PubsubIO.<String>read().fromTopic(new StringBuilder().append
("projects/my-project/topics/A-really-long-one-")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -93,9 +93,9 @@ public class PubsubIOTest {
String subscription = "projects/project/subscriptions/subscription";
Duration maxReadTime = Duration.standardMinutes(5);
PubsubIO.Read<String> read = PubsubIO.<String>read()
- .topic(StaticValueProvider.of(topic))
- .timestampLabel("myTimestamp")
- .idLabel("myId");
+ .fromTopic(StaticValueProvider.of(topic))
+ .withTimestampLabel("myTimestamp")
+ .withIdLabel("myId");
DisplayData displayData = DisplayData.from(read);
@@ -110,9 +110,9 @@ public class PubsubIOTest {
String subscription = "projects/project/subscriptions/subscription";
Duration maxReadTime = Duration.standardMinutes(5);
PubsubIO.Read<String> read = PubsubIO.<String>read()
- .subscription(StaticValueProvider.of(subscription))
- .timestampLabel("myTimestamp")
- .idLabel("myId");
+ .fromSubscription(StaticValueProvider.of(subscription))
+ .withTimestampLabel("myTimestamp")
+ .withIdLabel("myId");
DisplayData displayData = DisplayData.from(read);
@@ -125,7 +125,7 @@ public class PubsubIOTest {
public void testNullTopic() {
String subscription = "projects/project/subscriptions/subscription";
PubsubIO.Read<String> read = PubsubIO.<String>read()
- .subscription(StaticValueProvider.of(subscription));
+ .fromSubscription(StaticValueProvider.of(subscription));
assertNull(read.getTopicProvider());
assertNotNull(read.getSubscriptionProvider());
assertNotNull(DisplayData.from(read));
@@ -135,7 +135,7 @@ public class PubsubIOTest {
public void testNullSubscription() {
String topic = "projects/project/topics/topic";
PubsubIO.Read<String> read = PubsubIO.<String>read()
- .topic(StaticValueProvider.of(topic));
+ .fromTopic(StaticValueProvider.of(topic));
assertNotNull(read.getTopicProvider());
assertNull(read.getSubscriptionProvider());
assertNotNull(DisplayData.from(read));
@@ -149,13 +149,13 @@ public class PubsubIOTest {
PubsubIO.Read<String> read = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
// Reading from a subscription.
- read = read.subscription("projects/project/subscriptions/subscription");
+ read = read.fromSubscription("projects/project/subscriptions/subscription");
displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat("PubsubIO.Read should include the subscription in its primitive display data",
displayData, hasItem(hasDisplayItem("subscription")));
// Reading from a topic.
- read = read.topic("projects/project/topics/topic");
+ read = read.fromTopic("projects/project/topics/topic");
displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat("PubsubIO.Read should include the topic in its primitive display data",
displayData, hasItem(hasDisplayItem("topic")));
[6/9] beam git commit: Adds PubsubIO.readStrings(), readProtos(),
readAvros()
Posted by jk...@apache.org.
Adds PubsubIO.readStrings(), readProtos(), readAvros()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/079353d5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/079353d5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/079353d5
Branch: refs/heads/master
Commit: 079353d58c65141683e4640e425ee610001e7718
Parents: 42c975e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 17:50:43 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 29 13:15:48 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 6 ++---
.../examples/complete/game/LeaderBoard.java | 6 ++---
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 28 ++++++++++++++++++++
.../io/gcp/pubsub/PubsubUnboundedSource.java | 4 ++-
.../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 3 +--
5 files changed, 36 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index d95eb06..d628497 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -24,7 +24,6 @@ import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
@@ -252,9 +251,8 @@ public class GameStats extends LeaderBoard {
// Read Events from Pub/Sub using custom timestamps
PCollection<GameActionInfo> rawEvents = pipeline
- .apply(PubsubIO.<String>read()
- .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())
- .withCoder(StringUtf8Coder.of()))
+ .apply(PubsubIO.readStrings()
+ .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
// Extract username/score pairs from the event stream
http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index a87468a..fbffac6 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -27,7 +27,6 @@ import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -191,9 +190,8 @@ public class LeaderBoard extends HourlyTeamScore {
// Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
// data elements, and parse the data.
PCollection<GameActionInfo> gameEvents = pipeline
- .apply(PubsubIO.<String>read()
- .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())
- .withCoder(StringUtf8Coder.of()))
+ .apply(PubsubIO.readStrings()
+ .withTimestampLabel(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
gameEvents.apply("CalculateTeamScores",
http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 99df3c6..9604864 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
+import com.google.protobuf.Message;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -29,8 +30,11 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
@@ -461,6 +465,30 @@ public class PubsubIO {
return new AutoValue_PubsubIO_Read.Builder<T>().build();
}
+ /**
+ * Returns a {@link PTransform} that continuously reads UTF-8 encoded strings from a Google Cloud
+ * Pub/Sub stream; equivalent to {@link #read()} with a {@link StringUtf8Coder}.
+ */
+ public static Read<String> readStrings() {
+ return PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
+ }
+
+ /**
+ * Returns a {@link PTransform} that continuously reads binary encoded protos of the given type
+ * from a Google Cloud Pub/Sub stream; equivalent to {@link #read()} with a {@link ProtoCoder}.
+ */
+ public static <T extends Message> Read<T> readProtos(Class<T> messageClass) {
+ return PubsubIO.<T>read().withCoder(ProtoCoder.of(messageClass));
+ }
+
+ /**
+ * Returns a {@link PTransform} that continuously reads binary encoded Avro messages of the
+ * given type from a Google Cloud Pub/Sub stream. {@code T} need not be a protobuf message.
+ */
+ public static <T> Read<T> readAvros(Class<T> clazz) {
+ return PubsubIO.<T>read().withCoder(AvroCoder.of(clazz));
+ }
+
/** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */
public static <T> Write<T> write() {
return new AutoValue_PubsubIO_Write.Builder<T>().build();
http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index b16b665..6392fd2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -82,7 +82,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A PTransform which streams messages from Pubsub.
+ * Users should use {@link PubsubIO#read} instead.
+ *
+ * <p>A PTransform which streams messages from Pubsub.
* <ul>
* <li>The underlying implementation in an {@link UnboundedSource} which receives messages
* in batches and hands them out one at a time.
http://git-wip-us.apache.org/repos/asf/beam/blob/079353d5/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index f896bfc..20039d4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import java.util.Set;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
import org.apache.beam.sdk.testing.ValidatesRunner;
@@ -146,7 +145,7 @@ public class PubsubIOTest {
public void testPrimitiveReadDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
Set<DisplayData> displayData;
- PubsubIO.Read<String> baseRead = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
+ PubsubIO.Read<String> baseRead = PubsubIO.readStrings();
// Reading from a subscription.
PubsubIO.Read<String> read =