Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:01 UTC
[14/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
deleted file mode 100644
index deed9ab..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
+++ /dev/null
@@ -1,1048 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.common.base.MoreObjects.firstNonNull;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.client.util.Clock;
-import com.google.api.client.util.DateTime;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.AcknowledgeRequest;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.api.services.pubsub.model.PullRequest;
-import com.google.api.services.pubsub.model.PullResponse;
-import com.google.api.services.pubsub.model.ReceivedMessage;
-import com.google.api.services.pubsub.model.Subscription;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.options.PubsubOptions;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.util.Transport;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-/**
- * Read and Write {@link PTransform}s for Cloud Pub/Sub streams. These transforms create
- * and consume unbounded {@link PCollection PCollections}.
- *
- * <h3>Permissions</h3>
- * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- */
-public class PubsubIO {
- private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);
-
- /** The default {@link Coder} used to translate to/from Cloud Pub/Sub messages. */
- public static final Coder<String> DEFAULT_PUBSUB_CODER = StringUtf8Coder.of();
-
- /**
- * Project IDs must contain 6-63 lowercase letters, digits, or dashes.
- * IDs must start with a letter and may not end with a dash.
- * This regex isn't exact - this allows for patterns that would be rejected by
- * the service, but this is sufficient for basic validation of project names.
- */
- private static final Pattern PROJECT_ID_REGEXP =
- Pattern.compile("[a-z][-a-z0-9:.]{4,61}[a-z0-9]");
-
- private static final Pattern SUBSCRIPTION_REGEXP =
- Pattern.compile("projects/([^/]+)/subscriptions/(.+)");
-
- private static final Pattern TOPIC_REGEXP = Pattern.compile("projects/([^/]+)/topics/(.+)");
-
- private static final Pattern V1BETA1_SUBSCRIPTION_REGEXP =
- Pattern.compile("/subscriptions/([^/]+)/(.+)");
-
- private static final Pattern V1BETA1_TOPIC_REGEXP = Pattern.compile("/topics/([^/]+)/(.+)");
-
- private static final Pattern PUBSUB_NAME_REGEXP = Pattern.compile("[a-zA-Z][-._~%+a-zA-Z0-9]+");
-
- private static final int PUBSUB_NAME_MIN_LENGTH = 3;
- private static final int PUBSUB_NAME_MAX_LENGTH = 255;
-
- private static final String SUBSCRIPTION_RANDOM_TEST_PREFIX = "_random/";
- private static final String SUBSCRIPTION_STARTING_SIGNAL = "_starting_signal/";
- private static final String TOPIC_DEV_NULL_TEST_NAME = "/topics/dev/null";
-
- private static void validateProjectName(String project) {
- Matcher match = PROJECT_ID_REGEXP.matcher(project);
- if (!match.matches()) {
- throw new IllegalArgumentException(
- "Illegal project name specified in Pubsub subscription: " + project);
- }
- }
-
- private static void validatePubsubName(String name) {
- if (name.length() < PUBSUB_NAME_MIN_LENGTH) {
- throw new IllegalArgumentException(
- "Pubsub object name is shorter than 3 characters: " + name);
- }
- if (name.length() > PUBSUB_NAME_MAX_LENGTH) {
- throw new IllegalArgumentException(
- "Pubsub object name is longer than 255 characters: " + name);
- }
-
- if (name.startsWith("goog")) {
- throw new IllegalArgumentException("Pubsub object name cannot start with goog: " + name);
- }
-
- Matcher match = PUBSUB_NAME_REGEXP.matcher(name);
- if (!match.matches()) {
- throw new IllegalArgumentException("Illegal Pubsub object name specified: " + name
- + " Please see Javadoc for naming rules.");
- }
- }
-
- /**
- * Returns the {@link Instant} that corresponds to the timestamp in the supplied
- * {@link PubsubMessage} under the specified {@code label}. See
- * {@link PubsubIO.Read#timestampLabel(String)} for details about how these messages are
- * parsed.
- *
- * <p>The {@link Clock} parameter is used to virtualize time for testing.
- *
- * @throws IllegalArgumentException if the timestamp label is provided, but there is no
- * corresponding attribute in the message or the value provided is not a valid timestamp
- * string.
- * @see PubsubIO.Read#timestampLabel(String)
- */
- @VisibleForTesting
- protected static Instant assignMessageTimestamp(
- PubsubMessage message, @Nullable String label, Clock clock) {
- if (label == null) {
- return new Instant(clock.currentTimeMillis());
- }
-
- // Extract message attributes, defaulting to empty map if null.
- Map<String, String> attributes = firstNonNull(
- message.getAttributes(), ImmutableMap.<String, String>of());
-
- String timestampStr = attributes.get(label);
- checkArgument(timestampStr != null && !timestampStr.isEmpty(),
- "PubSub message is missing a timestamp in label: %s", label);
-
- long millisSinceEpoch;
- try {
- // Try parsing as milliseconds since epoch. Note there is no way to parse a string in
- // RFC 3339 format here.
- // Expected IllegalArgumentException if parsing fails; we use that to fall back to RFC 3339.
- millisSinceEpoch = Long.parseLong(timestampStr);
- } catch (IllegalArgumentException e) {
- // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an IllegalArgumentException
- // if parsing fails, and the caller should handle.
- millisSinceEpoch = DateTime.parseRfc3339(timestampStr).getValue();
- }
- return new Instant(millisSinceEpoch);
- }
-
- /**
- * Class representing a Cloud Pub/Sub Subscription.
- */
- public static class PubsubSubscription implements Serializable {
- private enum Type { NORMAL, FAKE }
-
- private final Type type;
- private final String project;
- private final String subscription;
-
- private PubsubSubscription(Type type, String project, String subscription) {
- this.type = type;
- this.project = project;
- this.subscription = subscription;
- }
-
- /**
- * Creates a class representing a Pub/Sub subscription from the specified subscription path.
- *
- * <p>Cloud Pub/Sub subscription names should be of the form
- * {@code projects/<project>/subscriptions/<subscription>}, where {@code <project>} is the name
- * of the project the subscription belongs to. The {@code <subscription>} component must comply
- * with the following requirements:
- *
- * <ul>
- * <li>Can only contain lowercase letters, numbers, dashes ('-'), underscores ('_') and periods
- * ('.').</li>
- * <li>Must be between 3 and 255 characters.</li>
- * <li>Must begin with a letter.</li>
- * <li>Must end with a letter or a number.</li>
- * <li>Cannot begin with {@code 'goog'} prefix.</li>
- * </ul>
- */
- public static PubsubSubscription fromPath(String path) {
- if (path.startsWith(SUBSCRIPTION_RANDOM_TEST_PREFIX)
- || path.startsWith(SUBSCRIPTION_STARTING_SIGNAL)) {
- return new PubsubSubscription(Type.FAKE, "", path);
- }
-
- String projectName, subscriptionName;
-
- Matcher v1beta1Match = V1BETA1_SUBSCRIPTION_REGEXP.matcher(path);
- if (v1beta1Match.matches()) {
- LOG.warn("Saw subscription in v1beta1 format. Subscriptions should be in the format "
- + "projects/<project_id>/subscriptions/<subscription_name>");
- projectName = v1beta1Match.group(1);
- subscriptionName = v1beta1Match.group(2);
- } else {
- Matcher match = SUBSCRIPTION_REGEXP.matcher(path);
- if (!match.matches()) {
- throw new IllegalArgumentException("Pubsub subscription is not in "
- + "projects/<project_id>/subscriptions/<subscription_name> format: " + path);
- }
- projectName = match.group(1);
- subscriptionName = match.group(2);
- }
-
- validateProjectName(projectName);
- validatePubsubName(subscriptionName);
- return new PubsubSubscription(Type.NORMAL, projectName, subscriptionName);
- }
-
- /**
- * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
- * v1beta1 API.
- *
- * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated.
- */
- @Deprecated
- public String asV1Beta1Path() {
- if (type == Type.NORMAL) {
- return "/subscriptions/" + project + "/" + subscription;
- } else {
- return subscription;
- }
- }
-
- /**
- * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
- * v1beta2 API.
- *
- * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated.
- */
- @Deprecated
- public String asV1Beta2Path() {
- if (type == Type.NORMAL) {
- return "projects/" + project + "/subscriptions/" + subscription;
- } else {
- return subscription;
- }
- }
-
- /**
- * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
- * API.
- */
- public String asPath() {
- if (type == Type.NORMAL) {
- return "projects/" + project + "/subscriptions/" + subscription;
- } else {
- return subscription;
- }
- }
- }
-
- /**
- * Class representing a Cloud Pub/Sub Topic.
- */
- public static class PubsubTopic implements Serializable {
- private enum Type { NORMAL, FAKE }
-
- private final Type type;
- private final String project;
- private final String topic;
-
- private PubsubTopic(Type type, String project, String topic) {
- this.type = type;
- this.project = project;
- this.topic = topic;
- }
-
- /**
- * Creates a class representing a Cloud Pub/Sub topic from the specified topic path.
- *
- * <p>Cloud Pub/Sub topic names should be of the form
- * {@code /topics/<project>/<topic>}, where {@code <project>} is the name of
- * the publishing project. The {@code <topic>} component must comply with
- * the following requirements:
- *
- * <ul>
- * <li>Can only contain lowercase letters, numbers, dashes ('-'), underscores ('_') and periods
- * ('.').</li>
- * <li>Must be between 3 and 255 characters.</li>
- * <li>Must begin with a letter.</li>
- * <li>Must end with a letter or a number.</li>
- * <li>Cannot begin with 'goog' prefix.</li>
- * </ul>
- */
- public static PubsubTopic fromPath(String path) {
- if (path.equals(TOPIC_DEV_NULL_TEST_NAME)) {
- return new PubsubTopic(Type.FAKE, "", path);
- }
-
- String projectName, topicName;
-
- Matcher v1beta1Match = V1BETA1_TOPIC_REGEXP.matcher(path);
- if (v1beta1Match.matches()) {
- LOG.warn("Saw topic in v1beta1 format. Topics should be in the format "
- + "projects/<project_id>/topics/<topic_name>");
- projectName = v1beta1Match.group(1);
- topicName = v1beta1Match.group(2);
- } else {
- Matcher match = TOPIC_REGEXP.matcher(path);
- if (!match.matches()) {
- throw new IllegalArgumentException(
- "Pubsub topic is not in projects/<project_id>/topics/<topic_name> format: " + path);
- }
- projectName = match.group(1);
- topicName = match.group(2);
- }
-
- validateProjectName(projectName);
- validatePubsubName(topicName);
- return new PubsubTopic(Type.NORMAL, projectName, topicName);
- }
-
- /**
- * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
- * v1beta1 API.
- *
- * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated.
- */
- @Deprecated
- public String asV1Beta1Path() {
- if (type == Type.NORMAL) {
- return "/topics/" + project + "/" + topic;
- } else {
- return topic;
- }
- }
-
- /**
- * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
- * v1beta2 API.
- *
- * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated.
- */
- @Deprecated
- public String asV1Beta2Path() {
- if (type == Type.NORMAL) {
- return "projects/" + project + "/topics/" + topic;
- } else {
- return topic;
- }
- }
-
- /**
- * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
- * API.
- */
- public String asPath() {
- if (type == Type.NORMAL) {
- return "projects/" + project + "/topics/" + topic;
- } else {
- return topic;
- }
- }
- }
-
- /**
- * A {@link PTransform} that continuously reads from a Cloud Pub/Sub stream and
- * returns a {@link PCollection} of {@link String Strings} containing the items from
- * the stream.
- *
- * <p>When running with a {@link PipelineRunner} that only supports bounded
- * {@link PCollection PCollections} (such as {@link DirectPipelineRunner}),
- * only a bounded portion of the input Pub/Sub stream can be processed. As such, either
- * {@link Bound#maxNumRecords(int)} or {@link Bound#maxReadTime(Duration)} must be set.
- */
- public static class Read {
- /**
- * Creates and returns a transform for reading from Cloud Pub/Sub with the specified transform
- * name.
- */
- public static Bound<String> named(String name) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).named(name);
- }
-
- /**
- * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
- * with {@link #subscription(String)}.
- *
- * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
- * of the {@code topic} string.
- *
- * <p>Dataflow will start reading data published on this topic from the time the pipeline is
- * started. Any data published on the topic before the pipeline is started will not be read by
- * Dataflow.
- */
- public static Bound<String> topic(String topic) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic);
- }
-
- /**
- * Creates and returns a transform for reading from a specific Cloud Pub/Sub subscription.
- * Mutually exclusive with {@link #topic(String)}.
- *
- * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
- * of the {@code subscription} string.
- */
- public static Bound<String> subscription(String subscription) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).subscription(subscription);
- }
-
- /**
- * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
- * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
- * parameter specifies the name of the attribute that contains the timestamp.
- *
- * <p>The timestamp value is expected to be represented in the attribute as either:
- *
- * <ul>
- * <li>a numerical value representing the number of milliseconds since the Unix epoch. For
- * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct
- * value for this attribute.
- * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. The
- * sub-second component of the timestamp is optional, and digits beyond the first three
- * (i.e., time units smaller than milliseconds) will be ignored.
- * </ul>
- *
- * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
- * the first time it sees each record. All windowing will be done relative to these timestamps.
- *
- * <p>By default, windows are emitted based on an estimate of when this source is likely
- * done producing data for a given timestamp (referred to as the Watermark; see
- * {@link AfterWatermark} for more details). Any late data will be handled by the trigger
- * specified with the windowing strategy – by default it will be output immediately.
- *
- * <p>Note that the system can guarantee that no late data will ever be seen when it assigns
- * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
- *
- * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
- */
- public static Bound<String> timestampLabel(String timestampLabel) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).timestampLabel(timestampLabel);
- }
-
- /**
- * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
- * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
- * parameter specifies the attribute name. The value of the attribute can be any string
- * that uniquely identifies this record.
- *
- * <p>If {@code idLabel} is not provided, Dataflow cannot guarantee that no duplicate data will
- * be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be
- * strictly best effort.
- */
- public static Bound<String> idLabel(String idLabel) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).idLabel(idLabel);
- }
-
- /**
- * Creates and returns a transform for reading from Cloud Pub/Sub that uses the given
- * {@link Coder} to decode Pub/Sub messages into a value of type {@code T}.
- *
- * <p>By default, uses {@link StringUtf8Coder}, which just
- * returns the text lines as Java strings.
- *
- * @param <T> the type of the decoded elements, and the elements
- * of the resulting PCollection.
- */
- public static <T> Bound<T> withCoder(Coder<T> coder) {
- return new Bound<>(coder);
- }
-
- /**
- * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
- * records that will be read. The transform produces a <i>bounded</i> {@link PCollection}.
- *
- * <p>Either this option or {@link #maxReadTime(Duration)} must be set in order to create a
- * bounded source.
- */
- public static Bound<String> maxNumRecords(int maxNumRecords) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).maxNumRecords(maxNumRecords);
- }
-
- /**
- * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum duration
- * during which records will be read. The transform produces a <i>bounded</i>
- * {@link PCollection}.
- *
- * <p>Either this option or {@link #maxNumRecords(int)} must be set in order to create a bounded
- * source.
- */
- public static Bound<String> maxReadTime(Duration maxReadTime) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).maxReadTime(maxReadTime);
- }
-
- /**
- * A {@link PTransform} that reads from a Cloud Pub/Sub source and returns
- * an unbounded {@link PCollection} containing the items from the stream.
- */
- public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
- /** The Cloud Pub/Sub topic to read from. */
- @Nullable private final PubsubTopic topic;
-
- /** The Cloud Pub/Sub subscription to read from. */
- @Nullable private final PubsubSubscription subscription;
-
- /** The name of the message attribute to read timestamps from. */
- @Nullable private final String timestampLabel;
-
- /** The name of the message attribute to read unique message IDs from. */
- @Nullable private final String idLabel;
-
- /** The coder used to decode each record. */
- @Nullable private final Coder<T> coder;
-
- /** Stop after reading this many records. */
- private final int maxNumRecords;
-
- /** Stop after reading for this much time. */
- @Nullable private final Duration maxReadTime;
-
- private Bound(Coder<T> coder) {
- this(null, null, null, null, coder, null, 0, null);
- }
-
- private Bound(String name, PubsubSubscription subscription, PubsubTopic topic,
- String timestampLabel, Coder<T> coder, String idLabel, int maxNumRecords,
- Duration maxReadTime) {
- super(name);
- this.subscription = subscription;
- this.topic = topic;
- this.timestampLabel = timestampLabel;
- this.coder = coder;
- this.idLabel = idLabel;
- this.maxNumRecords = maxNumRecords;
- this.maxReadTime = maxReadTime;
- }
-
- /**
- * Returns a transform that's like this one but with the given step name.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> named(String name) {
- return new Bound<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
- }
-
- /**
- * Returns a transform that's like this one but reading from the
- * given subscription.
- *
- * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
- * of the {@code subscription} string.
- *
- * <p>Multiple readers reading from the same subscription will each receive
- * some arbitrary portion of the data. Most likely, separate readers should
- * use their own subscriptions.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> subscription(String subscription) {
- return new Bound<>(name, PubsubSubscription.fromPath(subscription), topic, timestampLabel,
- coder, idLabel, maxNumRecords, maxReadTime);
- }
-
- /**
- * Returns a transform that's like this one but that reads from the specified topic.
- *
- * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the
- * format of the {@code topic} string.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> topic(String topic) {
- return new Bound<>(name, subscription, PubsubTopic.fromPath(topic), timestampLabel, coder,
- idLabel, maxNumRecords, maxReadTime);
- }
-
- /**
- * Returns a transform that's like this one but that reads message timestamps
- * from the given message attribute. See {@link PubsubIO.Read#timestampLabel(String)} for
- * more details on the format of the timestamp attribute.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> timestampLabel(String timestampLabel) {
- return new Bound<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
- }
-
- /**
- * Returns a transform that's like this one but that reads unique message IDs
- * from the given message attribute. See {@link PubsubIO.Read#idLabel(String)} for more
- * details on the format of the ID attribute.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> idLabel(String idLabel) {
- return new Bound<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
- }
-
- /**
- * Returns a transform that's like this one but that uses the given
- * {@link Coder} to decode each record into a value of type {@code X}.
- *
- * <p>Does not modify this object.
- *
- * @param <X> the type of the decoded elements, and the
- * elements of the resulting PCollection.
- */
- public <X> Bound<X> withCoder(Coder<X> coder) {
- return new Bound<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
- }
-
- /**
- * Returns a transform that's like this one but will only read up to the specified
- * maximum number of records from Cloud Pub/Sub. The transform produces a <i>bounded</i>
- * {@link PCollection}. See {@link PubsubIO.Read#maxNumRecords(int)} for more details.
- */
- public Bound<T> maxNumRecords(int maxNumRecords) {
- return new Bound<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
- }
-
- /**
- * Returns a transform that's like this one but will only read during the specified
- * duration from Cloud Pub/Sub. The transform produces a <i>bounded</i> {@link PCollection}.
- * See {@link PubsubIO.Read#maxReadTime(Duration)} for more details.
- */
- public Bound<T> maxReadTime(Duration maxReadTime) {
- return new Bound<>(
- name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
- }
-
- @Override
- public PCollection<T> apply(PInput input) {
- if (topic == null && subscription == null) {
- throw new IllegalStateException("need to set either the topic or the subscription for "
- + "a PubsubIO.Read transform");
- }
- if (topic != null && subscription != null) {
- throw new IllegalStateException("Can't set both the topic and the subscription for a "
- + "PubsubIO.Read transform");
- }
-
- boolean boundedOutput = getMaxNumRecords() > 0 || getMaxReadTime() != null;
-
- if (boundedOutput) {
- return input.getPipeline().begin()
- .apply(Create.of((Void) null)).setCoder(VoidCoder.of())
- .apply(ParDo.of(new PubsubReader())).setCoder(coder);
- } else {
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(coder);
- }
- }
-
- @Override
- protected Coder<T> getDefaultOutputCoder() {
- return coder;
- }
-
- public PubsubTopic getTopic() {
- return topic;
- }
-
- public PubsubSubscription getSubscription() {
- return subscription;
- }
-
- public String getTimestampLabel() {
- return timestampLabel;
- }
-
- public Coder<T> getCoder() {
- return coder;
- }
-
- public String getIdLabel() {
- return idLabel;
- }
-
- public int getMaxNumRecords() {
- return maxNumRecords;
- }
-
- public Duration getMaxReadTime() {
- return maxReadTime;
- }
-
- private class PubsubReader extends DoFn<Void, T> {
- private static final int DEFAULT_PULL_SIZE = 100;
-
- @Override
- public void processElement(ProcessContext c) throws IOException {
- Pubsub pubsubClient =
- Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class))
- .build();
-
- String subscription;
- if (getSubscription() == null) {
- String topic = getTopic().asPath();
- String[] split = topic.split("/");
- subscription =
- "projects/" + split[1] + "/subscriptions/" + split[3] + "_dataflow_"
- + new Random().nextLong();
- Subscription subInfo = new Subscription().setAckDeadlineSeconds(60).setTopic(topic);
- try {
- pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
- } catch (Exception e) {
- throw new RuntimeException("Failed to create subscription: ", e);
- }
- } else {
- subscription = getSubscription().asPath();
- }
-
- Instant endTime = (getMaxReadTime() == null)
- ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime());
-
- List<PubsubMessage> messages = new ArrayList<>();
-
- Throwable finallyBlockException = null;
- try {
- while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords())
- && Instant.now().isBefore(endTime)) {
- PullRequest pullRequest = new PullRequest().setReturnImmediately(false);
- if (getMaxNumRecords() > 0) {
- pullRequest.setMaxMessages(getMaxNumRecords() - messages.size());
- } else {
- pullRequest.setMaxMessages(DEFAULT_PULL_SIZE);
- }
-
- PullResponse pullResponse =
- pubsubClient.projects().subscriptions().pull(subscription, pullRequest).execute();
- List<String> ackIds = new ArrayList<>();
- if (pullResponse.getReceivedMessages() != null) {
- for (ReceivedMessage received : pullResponse.getReceivedMessages()) {
- messages.add(received.getMessage());
- ackIds.add(received.getAckId());
- }
- }
-
- if (ackIds.size() != 0) {
- AcknowledgeRequest ackRequest = new AcknowledgeRequest().setAckIds(ackIds);
- pubsubClient.projects()
- .subscriptions()
- .acknowledge(subscription, ackRequest)
- .execute();
- }
- }
- } catch (IOException e) {
- throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
- } finally {
- if (getTopic() != null) {
- try {
- pubsubClient.projects().subscriptions().delete(subscription).execute();
- } catch (IOException e) {
- finallyBlockException = new RuntimeException("Failed to delete subscription: ", e);
- LOG.error("Failed to delete subscription: ", e);
- }
- }
- }
- if (finallyBlockException != null) {
- Throwables.propagate(finallyBlockException);
- }
-
- for (PubsubMessage message : messages) {
- c.outputWithTimestamp(
- CoderUtils.decodeFromByteArray(getCoder(), message.decodeData()),
- assignMessageTimestamp(message, getTimestampLabel(), Clock.SYSTEM));
- }
- }
- }
- }
-
- /** Disallow construction of utility class. */
- private Read() {}
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
-
- /** Disallow construction of utility class. */
- private PubsubIO() {}
-
- /**
- * A {@link PTransform} that continuously writes a
- * {@link PCollection} of {@link String Strings} to a Cloud Pub/Sub stream.
- */
- // TODO: Support non-String encodings.
- public static class Write {
- /**
- * Creates a transform that writes to Pub/Sub with the given step name.
- */
- public static Bound<String> named(String name) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).named(name);
- }
-
- /**
- * Creates a transform that publishes to the specified topic.
- *
- * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
- * {@code topic} string.
- */
- public static Bound<String> topic(String topic) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic);
- }
-
- /**
- * Creates a transform that writes to Pub/Sub, adding each record's timestamp to the published
- * messages in an attribute with the specified name. The value of the attribute will be a number
- * representing the number of milliseconds since the Unix epoch. For example, if using the Joda
- * time classes, {@link Instant#Instant(long)} can be used to parse this value.
- *
- * <p>If the output from this sink is being read by another Dataflow source, then
- * {@link PubsubIO.Read#timestampLabel(String)} can be used to ensure the other source reads
- * these timestamps from the appropriate attribute.
- */
- public static Bound<String> timestampLabel(String timestampLabel) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).timestampLabel(timestampLabel);
- }
-
- /**
- * Creates a transform that writes to Pub/Sub, adding each record's unique identifier to the
- * published messages in an attribute with the specified name. The value of the attribute is an
- * opaque string.
- *
- * <p>If the output from this sink is being read by another Dataflow source, then
- * {@link PubsubIO.Read#idLabel(String)} can be used to ensure that the other source reads
- * these unique identifiers from the appropriate attribute.
- */
- public static Bound<String> idLabel(String idLabel) {
- return new Bound<>(DEFAULT_PUBSUB_CODER).idLabel(idLabel);
- }
-
- /**
- * Creates a transform that uses the given {@link Coder} to encode each of the
- * elements of the input collection into an output message.
- *
- * <p>By default, uses {@link StringUtf8Coder}, which writes input Java strings directly as
- * records.
- *
- * @param <T> the type of the elements of the input PCollection
- */
- public static <T> Bound<T> withCoder(Coder<T> coder) {
- return new Bound<>(coder);
- }
-
- /**
- * A {@link PTransform} that writes an unbounded {@link PCollection} of {@link String Strings}
- * to a Cloud Pub/Sub stream.
- */
- public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
- /** The Cloud Pub/Sub topic to publish to. */
- @Nullable private final PubsubTopic topic;
- /** The name of the message attribute to publish message timestamps in. */
- @Nullable private final String timestampLabel;
- /** The name of the message attribute to publish unique message IDs in. */
- @Nullable private final String idLabel;
- private final Coder<T> coder;
-
- private Bound(Coder<T> coder) {
- this(null, null, null, null, coder);
- }
-
- private Bound(
- String name, PubsubTopic topic, String timestampLabel, String idLabel, Coder<T> coder) {
- super(name);
- this.topic = topic;
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
- this.coder = coder;
- }
-
- /**
- * Returns a new transform that's like this one but with the specified step
- * name.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> named(String name) {
- return new Bound<>(name, topic, timestampLabel, idLabel, coder);
- }
-
- /**
- * Returns a new transform that's like this one but that writes to the specified
- * topic.
- *
- * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
- * {@code topic} string.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> topic(String topic) {
- return new Bound<>(name, PubsubTopic.fromPath(topic), timestampLabel, idLabel, coder);
- }
-
- /**
- * Returns a new transform that's like this one but that publishes record timestamps
- * to a message attribute with the specified name. See
- * {@link PubsubIO.Write#timestampLabel(String)} for more details.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> timestampLabel(String timestampLabel) {
- return new Bound<>(name, topic, timestampLabel, idLabel, coder);
- }
-
- /**
- * Returns a new transform that's like this one but that publishes unique record IDs
- * to a message attribute with the specified name. See {@link PubsubIO.Write#idLabel(String)}
- * for more details.
- *
- * <p>Does not modify this object.
- */
- public Bound<T> idLabel(String idLabel) {
- return new Bound<>(name, topic, timestampLabel, idLabel, coder);
- }
-
- /**
- * Returns a new transform that's like this one
- * but that uses the given {@link Coder} to encode each of
- * the elements of the input {@link PCollection} into an
- * output record.
- *
- * <p>Does not modify this object.
- *
- * @param <X> the type of the elements of the input {@link PCollection}
- */
- public <X> Bound<X> withCoder(Coder<X> coder) {
- return new Bound<>(name, topic, timestampLabel, idLabel, coder);
- }
-
- @Override
- public PDone apply(PCollection<T> input) {
- if (topic == null) {
- throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
- }
- input.apply(ParDo.of(new PubsubWriter()));
- return PDone.in(input.getPipeline());
- }
-
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
-
- public PubsubTopic getTopic() {
- return topic;
- }
-
- public String getTimestampLabel() {
- return timestampLabel;
- }
-
- public String getIdLabel() {
- return idLabel;
- }
-
- public Coder<T> getCoder() {
- return coder;
- }
-
- private class PubsubWriter extends DoFn<T, Void> {
- private static final int MAX_PUBLISH_BATCH_SIZE = 100;
- private transient List<PubsubMessage> output;
- private transient Pubsub pubsubClient;
-
- @Override
- public void startBundle(Context c) {
- this.output = new ArrayList<>();
- this.pubsubClient =
- Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class))
- .build();
- }
-
- @Override
- public void processElement(ProcessContext c) throws IOException {
- PubsubMessage message =
- new PubsubMessage().encodeData(CoderUtils.encodeToByteArray(getCoder(), c.element()));
- if (getTimestampLabel() != null) {
- Map<String, String> attributes = message.getAttributes();
- if (attributes == null) {
- attributes = new HashMap<>();
- message.setAttributes(attributes);
- }
- attributes.put(getTimestampLabel(), String.valueOf(c.timestamp().getMillis()));
- }
- output.add(message);
-
- if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
- publish();
- }
- }
-
- @Override
- public void finishBundle(Context c) throws IOException {
- if (!output.isEmpty()) {
- publish();
- }
- }
-
- private void publish() throws IOException {
- PublishRequest publishRequest = new PublishRequest().setMessages(output);
- pubsubClient.projects().topics()
- .publish(getTopic().asPath(), publishRequest)
- .execute();
- output.clear();
- }
- }
- }
-
- /** Disallow construction of utility class. */
- private Write() {}
- }
-}
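
For context, a minimal sketch (not part of this commit) of how the removed PubsubIO
transforms were typically wired into a pipeline under the old com.google.cloud.dataflow
package; the project, topic names, and the "ts" attribute below are hypothetical
placeholders:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class PubsubIOExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read an unbounded stream of UTF-8 strings from a topic, taking each element's
    // timestamp from the (hypothetical) "ts" message attribute.
    PCollection<String> messages = p.apply(
        PubsubIO.Read.named("ReadFromPubsub")
            .topic("projects/my-project/topics/input-topic")
            .timestampLabel("ts"));

    // Republish the same strings, propagating timestamps through the same attribute.
    messages.apply(
        PubsubIO.Write.named("WriteToPubsub")
            .topic("projects/my-project/topics/output-topic")
            .timestampLabel("ts"));

    p.run();
  }
}
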
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java
deleted file mode 100644
index fb103ee..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.cloud.dataflow.sdk.util.StringUtils.approximateSimpleName;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.cloud.dataflow.sdk.values.PInput;
-
-import org.joda.time.Duration;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link PTransform} for reading from a {@link Source}.
- *
- * <p>Usage example:
- * <pre>
- * Pipeline p = Pipeline.create();
- * p.apply(Read.from(new MySource().withFoo("foo").withBar("bar"))
- * .named("foobar"));
- * </pre>
- */
-public class Read {
- /**
- * Returns a new {@code Read} {@code PTransform} builder with the given name.
- */
- public static Builder named(String name) {
- return new Builder(name);
- }
-
- /**
- * Returns a new {@code Read.Bounded} {@code PTransform} reading from the given
- * {@code BoundedSource}.
- */
- public static <T> Bounded<T> from(BoundedSource<T> source) {
- return new Bounded<>(null, source);
- }
-
- /**
- * Returns a new {@code Read.Unbounded} {@code PTransform} reading from the given
- * {@code UnboundedSource}.
- */
- public static <T> Unbounded<T> from(UnboundedSource<T, ?> source) {
- return new Unbounded<>(null, source);
- }
-
- /**
- * Helper class for building {@code Read} transforms.
- */
- public static class Builder {
- private final String name;
-
- private Builder(String name) {
- this.name = name;
- }
-
- /**
- * Returns a new {@code Read.Bounded} {@code PTransform} reading from the given
- * {@code BoundedSource}.
- */
- public <T> Bounded<T> from(BoundedSource<T> source) {
- return new Bounded<>(name, source);
- }
-
- /**
- * Returns a new {@code Read.Unbounded} {@code PTransform} reading from the given
- * {@code UnboundedSource}.
- */
- public <T> Unbounded<T> from(UnboundedSource<T, ?> source) {
- return new Unbounded<>(name, source);
- }
- }
-
- /**
- * {@link PTransform} that reads from a {@link BoundedSource}.
- */
- public static class Bounded<T> extends PTransform<PInput, PCollection<T>> {
- private final BoundedSource<T> source;
-
- private Bounded(@Nullable String name, BoundedSource<T> source) {
- super(name);
- this.source = SerializableUtils.ensureSerializable(source);
- }
-
- /**
- * Returns a new {@code Bounded} {@code PTransform} that's like this one but
- * has the given name.
- *
- * <p>Does not modify this object.
- */
- public Bounded<T> named(String name) {
- return new Bounded<T>(name, source);
- }
-
- @Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
- }
-
- @Override
- public final PCollection<T> apply(PInput input) {
- source.validate();
-
- return PCollection.<T>createPrimitiveOutputInternal(input.getPipeline(),
- WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
- .setCoder(getDefaultOutputCoder());
- }
-
- /**
- * Returns the {@code BoundedSource} used to create this {@code Read} {@code PTransform}.
- */
- public BoundedSource<T> getSource() {
- return source;
- }
-
- @Override
- public String getKindString() {
- return "Read(" + approximateSimpleName(source.getClass()) + ")";
- }
-
- static {
- registerDefaultTransformEvaluator();
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private static void registerDefaultTransformEvaluator() {
- DirectPipelineRunner.registerDefaultTransformEvaluator(
- Bounded.class,
- new DirectPipelineRunner.TransformEvaluator<Bounded>() {
- @Override
- public void evaluate(
- Bounded transform, DirectPipelineRunner.EvaluationContext context) {
- evaluateReadHelper(transform, context);
- }
-
- private <T> void evaluateReadHelper(
- Read.Bounded<T> transform, DirectPipelineRunner.EvaluationContext context) {
- try {
- List<DirectPipelineRunner.ValueWithMetadata<T>> output = new ArrayList<>();
- BoundedSource<T> source = transform.getSource();
- try (BoundedSource.BoundedReader<T> reader =
- source.createReader(context.getPipelineOptions())) {
- for (boolean available = reader.start();
- available;
- available = reader.advance()) {
- output.add(
- DirectPipelineRunner.ValueWithMetadata.of(
- WindowedValue.timestampedValueInGlobalWindow(
- reader.getCurrent(), reader.getCurrentTimestamp())));
- }
- }
- context.setPCollectionValuesWithMetadata(context.getOutput(transform), output);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- });
- }
- }
-
- /**
- * {@link PTransform} that reads from a {@link UnboundedSource}.
- */
- public static class Unbounded<T> extends PTransform<PInput, PCollection<T>> {
- private final UnboundedSource<T, ?> source;
-
- private Unbounded(@Nullable String name, UnboundedSource<T, ?> source) {
- super(name);
- this.source = SerializableUtils.ensureSerializable(source);
- }
-
- /**
- * Returns a new {@code Unbounded} {@code PTransform} that's like this one but
- * has the given name.
- *
- * <p>Does not modify this object.
- */
- public Unbounded<T> named(String name) {
- return new Unbounded<T>(name, source);
- }
-
- /**
- * Returns a new {@link BoundedReadFromUnboundedSource} that reads a bounded amount
- * of data from the given {@link UnboundedSource}. The bound is specified as a number
- * of records to read.
- *
- * <p>This may take a long time to execute if the splits of this source are slow to read
- * records.
- */
- public BoundedReadFromUnboundedSource<T> withMaxNumRecords(long maxNumRecords) {
- return new BoundedReadFromUnboundedSource<T>(source, maxNumRecords, null);
- }
-
- /**
- * Returns a new {@link BoundedReadFromUnboundedSource} that reads a bounded amount
- * of data from the given {@link UnboundedSource}. The bound is specified as an amount
- * of time to read for. Each split of the source will read for this much time.
- */
- public BoundedReadFromUnboundedSource<T> withMaxReadTime(Duration maxReadTime) {
- return new BoundedReadFromUnboundedSource<T>(source, Long.MAX_VALUE, maxReadTime);
- }
-
- @Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
- }
-
- @Override
- public final PCollection<T> apply(PInput input) {
- source.validate();
-
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
- }
-
- /**
- * Returns the {@code UnboundedSource} used to create this {@code Read} {@code PTransform}.
- */
- public UnboundedSource<T, ?> getSource() {
- return source;
- }
-
- @Override
- public String getKindString() {
- return "Read(" + approximateSimpleName(source.getClass()) + ")";
- }
- }
-}
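
Similarly, a minimal sketch (not part of this commit) of applying the removed Read
transform; myBoundedSource and myUnboundedSource stand in for any hypothetical Source
implementations passed by the caller:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class ReadExample {
  // Sources are supplied by the caller; any BoundedSource/UnboundedSource pair works.
  static void readExamples(Pipeline p,
      BoundedSource<String> myBoundedSource,
      UnboundedSource<String, ?> myUnboundedSource) {
    // A bounded source yields a bounded PCollection.
    PCollection<String> bounded =
        p.apply(Read.named("ReadBounded").from(myBoundedSource));

    // An unbounded source can be capped by record count (or read time) to yield a
    // bounded PCollection, mirroring PubsubIO's maxNumRecords/maxReadTime options.
    PCollection<String> capped =
        p.apply(Read.from(myUnboundedSource).withMaxNumRecords(1000));
  }
}
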
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/ShardNameTemplate.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/ShardNameTemplate.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/ShardNameTemplate.java
deleted file mode 100644
index cc233f1..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/ShardNameTemplate.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-/**
- * Standard shard naming templates.
- *
- * <p>Shard naming templates are strings that may contain placeholders for
- * the shard number and shard count. When constructing a filename for a
- * particular shard number, the upper-case letters 'S' and 'N' are replaced
- * with the 0-padded shard number and shard count respectively.
- *
- * <p>Left-padding of the numbers enables lexicographical sorting of the
- * resulting filenames. If the shard number or count are too large for the
- * space provided in the template, then the result may no longer sort
- * lexicographically. For example, a shard template of "S-of-N", for 200
- * shards, will result in outputs named "0-of-200", ..., "10-of-200",
- * "100-of-200", etc.
- *
- * <p>Shard numbers start with 0, so the last shard number is the shard count
- * minus one. For example, the template "-SSSSS-of-NNNNN" will be
- * instantiated as "-00000-of-01000" for the first shard (shard 0) of a
- * 1000-way sharded output.
- *
- * <p>A shard name template is typically provided along with a name prefix
- * and suffix, which allows constructing complex paths that have embedded
- * shard information. For example, outputs in the form
- * "gs://bucket/path-01-of-99.txt" could be constructed by providing the
- * individual components:
- *
- * <pre>{@code
- * pipeline.apply(
- * TextIO.Write.to("gs://bucket/path")
- * .withShardNameTemplate("-SS-of-NN")
- * .withSuffix(".txt"))
- * }</pre>
- *
- * <p>In the example above, you could make parts of the output configurable
- * by users without the user having to specify all components of the output
- * name.
- *
- * <p>If a shard name template does not contain any repeating 'S', then
- * the output shard count must be 1, as otherwise the same filename would be
- * generated for multiple shards.
- */
-public class ShardNameTemplate {
- /**
- * Shard name containing the index and max.
- *
- * <p>Eg: [prefix]-00000-of-00100[suffix] and
- * [prefix]-00001-of-00100[suffix]
- */
- public static final String INDEX_OF_MAX = "-SSSSS-of-NNNNN";
-
- /**
- * Shard is a file within a directory.
- *
- * <p>Eg: [prefix]/part-00000[suffix] and [prefix]/part-00001[suffix]
- */
- public static final String DIRECTORY_CONTAINER = "/part-SSSSS";
-}
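
And a minimal sketch (not part of this commit, with a hypothetical output path) of
combining the removed ShardNameTemplate constants with TextIO.Write:

import com.google.cloud.dataflow.sdk.io.ShardNameTemplate;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class ShardNameTemplateExample {
  // Produces files like gs://my-bucket/output/part-00000.txt, part-00001.txt, ...
  static void writeSharded(PCollection<String> lines) {
    lines.apply(
        TextIO.Write.to("gs://my-bucket/output")   // hypothetical output prefix
            .withShardNameTemplate(ShardNameTemplate.DIRECTORY_CONTAINER)
            .withSuffix(".txt"));
  }
}
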
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java
deleted file mode 100644
index 7845f47..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.io.Serializable;
-
-/**
- * A {@code Sink} represents a resource that can be written to using the {@link Write} transform.
- *
- * <p>A parallel write to a {@code Sink} consists of three phases:
- * <ol>
- * <li>A sequential <i>initialization</i> phase (e.g., creating a temporary output directory, etc.)
- * <li>A <i>parallel write</i> phase where workers write bundles of records
- * <li>A sequential <i>finalization</i> phase (e.g., committing the writes, merging output files,
- * etc.)
- * </ol>
- *
- * <p>The {@link Write} transform can be used in a Dataflow pipeline to perform this write.
- * Specifically, a Write transform can be applied to a {@link PCollection} {@code p} by:
- *
- * <p>{@code p.apply(Write.to(new MySink()));}
- *
- * <p>Implementing a {@link Sink} and the corresponding write operations requires extending three
- * abstract classes:
- *
- * <ul>
- * <li>{@link Sink}: an immutable logical description of the location/resource to write to.
- * Depending on the type of sink, it may contain fields such as the path to an output directory
- * on a filesystem, a database table name, etc. Implementors of {@link Sink} must
- * implement two methods: {@link Sink#validate} and {@link Sink#createWriteOperation}.
- * {@link Sink#validate Validate} is called by the Write transform at pipeline creation, and should
- * validate that the Sink can be written to. The createWriteOperation method is also called at
- * pipeline creation, and should return a WriteOperation object that defines how to write to the
- * Sink. Note that implementations of Sink must be serializable and Sinks must be immutable.
- *
- * <li>{@link WriteOperation}: The WriteOperation implements the <i>initialization</i> and
- * <i>finalization</i> phases of a write. Implementors of {@link WriteOperation} must implement
- * corresponding {@link WriteOperation#initialize} and {@link WriteOperation#finalize} methods. A
- * WriteOperation must also implement {@link WriteOperation#createWriter} that creates Writers,
- * {@link WriteOperation#getWriterResultCoder} that returns a {@link Coder} for the result of a
- * parallel write, and a {@link WriteOperation#getSink} that returns the Sink that the write
- * operation corresponds to. See below for more information about these methods and restrictions on
- * their implementation.
- *
- * <li>{@link Writer}: A Writer writes a bundle of records. Writer defines four methods:
- * {@link Writer#open}, which is called once at the start of writing a bundle; {@link Writer#write},
- * which writes a single record from the bundle; {@link Writer#close}, which is called once at the
- * end of writing a bundle; and {@link Writer#getWriteOperation}, which returns the write operation
- * that the writer belongs to.
- * </ul>
- *
- * <h2>WriteOperation</h2>
- * <p>{@link WriteOperation#initialize} and {@link WriteOperation#finalize} are conceptually called
- * once: at the beginning and end of a Write transform. However, implementors must ensure that these
- * methods are idempotent, as they may be called multiple times on different machines in the case of
- * failure/retry or for redundancy.
- *
- * <p>The finalize method of WriteOperation is passed an Iterable of a writer result type. This
- * writer result type should encode the result of a write and, in most cases, some encoding of the
- * unique bundle id.
- *
- * <p>All implementations of {@link WriteOperation} must be serializable.
- *
- * <p>WriteOperation may have mutable state. For instance, {@link WriteOperation#initialize} may
- * mutate the object state. These mutations will be visible in {@link WriteOperation#createWriter}
- * and {@link WriteOperation#finalize} because the object will be serialized after initialize and
- * deserialized before these calls. However, it is not serialized again after createWriter is
- * called, as createWriter will be called within workers to create Writers for the bundles that are
- * distributed to these workers. Therefore, newWriter should not mutate the WriteOperation state (as
- * these mutations will not be visible in finalize).
- *
- * <h2>Bundle Ids:</h2>
- * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
- * event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the WriteOperation's finalize method. Each call to {@link Writer#open} is passed
- * a unique <i>bundle id</i> when it is called by the Write transform, so even redundant or retried
- * bundles will have a unique way of identifying their output.
- *
- * <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
- * guarantee is important; if a bundle is to be output to a file, for example, the name of the file
- * must be unique to avoid conflicts with other Writers. The bundle id should be encoded in the
- * writer result returned by the Writer and subsequently used by the WriteOperation's finalize
- * method to identify the results of successful writes.
- *
- * <p>For example, consider the scenario where a Writer writes files containing serialized records
- * and the WriteOperation's finalization step is to merge or rename these output files. In this
- * case, a Writer may use its unique id to name its output file (to avoid conflicts) and return the
- * name of the file it wrote as its writer result. The WriteOperation will then receive an Iterable
- * of output file names that it can then merge or rename using some bundle naming scheme.
- *
- * <h2>Writer Results:</h2>
- * <p>{@link WriteOperation}s and {@link Writer}s must agree on a writer result type that will be
- * returned by a Writer after it writes a bundle. This type can be a client-defined object or an
- * existing type; {@link WriteOperation#getWriterResultCoder} should return a {@link Coder} for the
- * type.
- *
- * <p>A note about thread safety: Any use of static members or methods in Writer should be thread
- * safe, as different instances of Writer objects may be created in different threads on the same
- * worker.
- *
- * @param <T> the type that will be written to the Sink.
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public abstract class Sink<T> implements Serializable {
- /**
- * Ensures that the sink is valid and can be written to before the write operation begins. One
- * should use {@link com.google.common.base.Preconditions} to implement this method.
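- *
- * <p>A minimal sketch, assuming a hypothetical {@code outputPath} field on the sink:
- * <pre>{@code
- * public void validate(PipelineOptions options) {
- *   Preconditions.checkArgument(
- *       outputPath != null && !outputPath.isEmpty(), "outputPath must be set");
- * }
- * }</pre>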
- */
- public abstract void validate(PipelineOptions options);
-
- /**
- * Returns an instance of a {@link WriteOperation} that can write to this Sink.
- */
- public abstract WriteOperation<T, ?> createWriteOperation(PipelineOptions options);
-
- /**
- * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
- *
- * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a
- * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write
- * a bundle to the sink.
- *
- * <p>Since operations in Dataflow may be run multiple times for redundancy or fault-tolerance,
- * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>.
- *
- * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the
- * call to {@code initialize} and deserialized before calls to
- * {@code createWriter} and {@code finalize}. However, it is not
- * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the
- * state of the {@code WriteOperation}.
- *
- * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
- *
- * @param <T> The type of objects to write
- * @param <WriteT> The result of a per-bundle write
- */
- public abstract static class WriteOperation<T, WriteT> implements Serializable {
- /**
- * Performs initialization before writing to the sink. Called before writing begins.
- */
- public abstract void initialize(PipelineOptions options) throws Exception;
-
- /**
- * Given an Iterable of results from bundle writes, performs finalization after writing and
- * closes the sink. Called after all bundle writes are complete.
- *
- * <p>The results that are passed to finalize are those returned by bundles that completed
- * successfully. Although bundles may have been run multiple times (for fault-tolerance), only
- * one writer result will be passed to finalize for each bundle. An implementation of finalize
- * should perform clean up of any failed and successfully retried bundles. Note that these
- * failed bundles will not have their writer result passed to finalize, so finalize should be
- * capable of locating any temporary/partial output written by failed bundles.
- *
- * <p>A best practice is to make finalize atomic. If this is impossible given the semantics
- * of the sink, finalize should be idempotent, as it may be called multiple times in the case of
- * failure/retry or for redundancy.
- *
- * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if
- * finalize is called multiple times.
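- *
- * <p>A sketch of an idempotent finalization, assuming java.nio.file imports and a
- * hypothetical {@code finalDirectory} field:
- * <pre>{@code
- * public void finalize(Iterable<String> writerResults, PipelineOptions options)
- *     throws Exception {
- *   for (String tempFile : writerResults) {
- *     Path source = Paths.get(tempFile);
- *     Path target = Paths.get(finalDirectory, source.getFileName().toString());
- *     // REPLACE_EXISTING makes a retried finalize harmless.
- *     Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
- *   }
- * }
- * }</pre>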
- *
- * @param writerResults an Iterable of results from successful bundle writes.
- */
- public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options)
- throws Exception;
-
- /**
- * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink.
- *
- * <p>The bundle id that the writer will use to uniquely identify its output will be passed to
- * {@link Writer#open}.
- *
- * <p>Must not mutate the state of the WriteOperation.
- */
- public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception;
-
- /**
- * Returns the Sink that this write operation writes to.
- */
- public abstract Sink<T> getSink();
-
- /**
- * Returns a coder for the writer result type.
- */
- public Coder<WriteT> getWriterResultCoder() {
- return null;
- }
- }
-
- /**
- * A Writer writes a bundle of elements from a PCollection to a sink. {@link Writer#open} is
- * called before writing begins and {@link Writer#close} is called after all elements in the
- * bundle have been written. {@link Writer#write} writes an element to the sink.
- *
- * <p>Note that any access to static members or methods of a Writer must be thread-safe, as
- * multiple instances of a Writer may be instantiated in different threads on the same worker.
- *
- * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
- *
- * @param <T> The type of object to write
- * @param <WriteT> The writer results type (e.g., the bundle's output filename, as String)
- */
- public abstract static class Writer<T, WriteT> {
- /**
- * Performs bundle initialization. For example, creates a temporary file for writing or
- * initializes any state that will be used across calls to {@link Writer#write}.
- *
- * <p>The unique id that is given to open should be used to ensure that the writer's output does
- * not interfere with the output of other Writers, as a bundle may be executed many times for
- * fault tolerance. See {@link Sink} for more information about bundle ids.
- */
- public abstract void open(String uId) throws Exception;
-
- /**
- * Called for each value in the bundle.
- */
- public abstract void write(T value) throws Exception;
-
- /**
- * Finishes writing the bundle. Closes any resources used for writing the bundle.
- *
- * <p>Returns a writer result that will be used in the {@link Sink.WriteOperation}'s
- * finalization. The result should contain some way to identify the output of this bundle (using
- * the bundle id). {@link WriteOperation#finalize} will use the writer result to identify
- * successful writes. See {@link Sink} for more information about bundle ids.
- *
- * @return the writer result
- */
- public abstract WriteT close() throws Exception;
-
- /**
- * Returns the write operation this writer belongs to.
- */
- public abstract WriteOperation<T, WriteT> getWriteOperation();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java
deleted file mode 100644
index 86a8e64..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.NoSuchElementException;
-
-/**
- * Base class for defining input formats and creating a {@code Source} for reading the input.
- *
- * <p>This class is not intended to be subclassed directly. Instead, to define
- * a bounded source (a source which produces a finite amount of input), subclass
- * {@link BoundedSource}; to define an unbounded source, subclass {@link UnboundedSource}.
- *
- * <p>A {@code Source} passed to a {@code Read} transform must be
- * {@code Serializable}. This allows the {@code Source} instance
- * created in this "main program" to be sent (in serialized form) to
- * remote worker machines and reconstituted for each batch of elements
- * of the input {@code PCollection} being processed or for each source splitting
- * operation. A {@code Source} can have instance variable state, and
- * non-transient instance variable state will be serialized in the main program
- * and then deserialized on remote worker machines.
- *
- * <p>{@code Source} classes MUST be effectively immutable. The only acceptable use of
- * mutable fields is to cache the results of expensive operations, and such fields MUST be
- * marked {@code transient}.
- *
- * <p>{@code Source} objects should override {@link Object#toString}, as it will be
- * used in important error and debugging messages.
- *
- * @param <T> Type of elements read by the source.
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public abstract class Source<T> implements Serializable {
- /**
- * Checks that this source is valid, before it can be used in a pipeline.
- *
- * <p>It is recommended to use {@link com.google.common.base.Preconditions} for implementing
- * this method.
- */
- public abstract void validate();
-
- /**
- * Returns the default {@code Coder} to use for the data read from this source.
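- *
- * <p>For example, a source that produces UTF-8 strings might implement this as:
- * <pre>{@code
- * public Coder<String> getDefaultOutputCoder() {
- *   return StringUtf8Coder.of();
- * }
- * }</pre>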
- */
- public abstract Coder<T> getDefaultOutputCoder();
-
- /**
- * The interface that readers of custom input sources must implement.
- *
- * <p>This interface is deliberately distinct from {@link java.util.Iterator} because
- * the current model tends to be easier to program and more efficient in practice
- * for iterating over sources such as files, databases etc. (rather than pure collections).
- *
- * <p>Reading data from the {@link Reader} must obey the following access pattern:
- * <ul>
- * <li> One call to {@link #start}
- * <ul><li>If {@link #start} returned true, any number of calls to {@code getCurrent}*
- * methods</ul>
- * <li> Repeatedly, a call to {@link #advance}. This may be called regardless
- * of what the previous {@link #start}/{@link #advance} returned.
- * <ul><li>If {@link #advance} returned true, any number of calls to {@code getCurrent}*
- * methods</ul>
- * </ul>
- *
- * <p>For example, if the reader is reading a fixed set of data:
- * <pre>
- * try {
- * for (boolean available = reader.start(); available; available = reader.advance()) {
- * T item = reader.getCurrent();
- * Instant timestamp = reader.getCurrentTimestamp();
- * ...
- * }
- * } finally {
- * reader.close();
- * }
- * </pre>
- *
- * <p>If the set of data being read is continually growing:
- * <pre>
- * try {
- * boolean available = reader.start();
- * while (true) {
- * if (available) {
- * T item = reader.getCurrent();
- * Instant timestamp = reader.getCurrentTimestamp();
- * ...
- * resetExponentialBackoff();
- * } else {
- * exponentialBackoff();
- * }
- * available = reader.advance();
- * }
- * } finally {
- * reader.close();
- * }
- * </pre>
- *
- * <p>Note: this interface is a work-in-progress and may change.
- *
- * <p>{@code Reader} functions other than {@link #getCurrentSource} need not be thread-safe;
- * they will only be accessed by a single thread at a time. However, {@link #getCurrentSource}
- * must be thread-safe, and other functions should assume that its returned value can change
- * asynchronously.
- */
- public abstract static class Reader<T> implements AutoCloseable {
- /**
- * Initializes the reader and advances the reader to the first record.
- *
- * <p>This method should be called exactly once. The invocation should occur prior to calling
- * {@link #advance} or {@link #getCurrent}. This method may perform expensive operations that
- * are needed to initialize the reader.
- *
- * @return {@code true} if a record was read, {@code false} if there is no more input available.
- */
- public abstract boolean start() throws IOException;
-
- /**
- * Advances the reader to the next valid record.
- *
- * <p>It is an error to call this without having called {@link #start} first.
- *
- * @return {@code true} if a record was read, {@code false} if there is no more input available.
- */
- public abstract boolean advance() throws IOException;
-
- /**
- * Returns the value of the data item that was read by the last {@link #start} or
- * {@link #advance} call. The returned value must be effectively immutable and remain valid
- * indefinitely.
- *
- * <p>Multiple calls to this method without an intervening call to {@link #advance} should
- * return the same result.
- *
- * @throws java.util.NoSuchElementException if {@link #start} was never called, or if
- * the last {@link #start} or {@link #advance} returned {@code false}.
- */
- public abstract T getCurrent() throws NoSuchElementException;
-
- /**
- * Returns the timestamp associated with the current data item.
- *
- * <p>If the source does not support timestamps, this should return
- * {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
- *
- * <p>Multiple calls to this method without an intervening call to {@link #advance} should
- * return the same result.
- *
- * @throws NoSuchElementException if the reader is at the beginning of the input and
- * {@link #start} or {@link #advance} wasn't called, or if the last {@link #start} or
- * {@link #advance} returned {@code false}.
- */
- public abstract Instant getCurrentTimestamp() throws NoSuchElementException;
-
- /**
- * Closes the reader. The reader cannot be used after this method is called.
- */
- @Override
- public abstract void close() throws IOException;
-
- /**
- * Returns a {@code Source} describing the same input that this {@code Reader} currently reads
- * (including items already read).
- *
- * <p>Usually, an implementation will simply return the immutable {@link Source} object from
- * which the current {@link Reader} was constructed, or delegate to the base class.
- * However, when using or implementing this method on a {@link BoundedSource.BoundedReader},
- * special considerations apply, see documentation for
- * {@link BoundedSource.BoundedReader#getCurrentSource}.
- */
- public abstract Source<T> getCurrentSource();
- }
-}