You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:01 UTC

[14/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
deleted file mode 100644
index deed9ab..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
+++ /dev/null
@@ -1,1048 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.common.base.MoreObjects.firstNonNull;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.client.util.Clock;
-import com.google.api.client.util.DateTime;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.AcknowledgeRequest;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.api.services.pubsub.model.PullRequest;
-import com.google.api.services.pubsub.model.PullResponse;
-import com.google.api.services.pubsub.model.ReceivedMessage;
-import com.google.api.services.pubsub.model.Subscription;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.options.PubsubOptions;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.util.Transport;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-/**
- * Read and Write {@link PTransform}s for Cloud Pub/Sub streams. These transforms create
- * and consume unbounded {@link PCollection PCollections}.
- *
- * <h3>Permissions</h3>
- * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- */
-public class PubsubIO {
-  private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);
-
-  /** The default {@link Coder} used to translate to/from Cloud Pub/Sub messages. */
-  public static final Coder<String> DEFAULT_PUBSUB_CODER = StringUtf8Coder.of();
-
-  /**
-   * Project IDs must contain 6-63 lowercase letters, digits, or dashes.
-   * IDs must start with a letter and may not end with a dash.
-   * This regex isn't exact - it also admits ':' and '.', and so allows patterns that would be
-   * rejected by the service - but it is sufficient for basic validation of the project component
-   * of Pub/Sub subscription and topic paths.
-   */
-  private static final Pattern PROJECT_ID_REGEXP =
-      Pattern.compile("[a-z][-a-z0-9:.]{4,61}[a-z0-9]");
-
-  /**
-   * Matches {@code projects/<project>/subscriptions/<subscription>}; group 1 is the project and
-   * group 2 the subscription name.
-   */
-  private static final Pattern SUBSCRIPTION_REGEXP =
-      Pattern.compile("projects/([^/]+)/subscriptions/(.+)");
-
-  /**
-   * Matches {@code projects/<project>/topics/<topic>}; group 1 is the project and group 2 the
-   * topic name.
-   */
-  private static final Pattern TOPIC_REGEXP = Pattern.compile("projects/([^/]+)/topics/(.+)");
-
-  /** Matches the deprecated v1beta1 form {@code /subscriptions/<project>/<subscription>}. */
-  private static final Pattern V1BETA1_SUBSCRIPTION_REGEXP =
-      Pattern.compile("/subscriptions/([^/]+)/(.+)");
-
-  /** Matches the deprecated v1beta1 form {@code /topics/<project>/<topic>}. */
-  private static final Pattern V1BETA1_TOPIC_REGEXP = Pattern.compile("/topics/([^/]+)/(.+)");
-
-  /** Character-level check applied to subscription and topic names by validatePubsubName. */
-  private static final Pattern PUBSUB_NAME_REGEXP = Pattern.compile("[a-zA-Z][-._~%+a-zA-Z0-9]+");
-
-  /** Inclusive length bounds for subscription and topic names. */
-  private static final int PUBSUB_NAME_MIN_LENGTH = 3;
-  private static final int PUBSUB_NAME_MAX_LENGTH = 255;
-
-  // Paths with these prefixes (subscriptions) or this exact value (topic) bypass parsing and
-  // validation in fromPath and are represented as Type.FAKE objects.
-  private static final String SUBSCRIPTION_RANDOM_TEST_PREFIX = "_random/";
-  private static final String SUBSCRIPTION_STARTING_SIGNAL = "_starting_signal/";
-  private static final String TOPIC_DEV_NULL_TEST_NAME = "/topics/dev/null";
-
-  /**
-   * Checks that {@code project} matches {@code PROJECT_ID_REGEXP} in its entirety.
-   *
-   * @throws IllegalArgumentException if the project name does not match
-   */
-  private static void validateProjectName(String project) {
-    boolean wellFormed = PROJECT_ID_REGEXP.matcher(project).matches();
-    if (wellFormed) {
-      return;
-    }
-    throw new IllegalArgumentException(
-        "Illegal project name specified in Pubsub subscription: " + project);
-  }
-
-  private static void validatePubsubName(String name) {
-    if (name.length() < PUBSUB_NAME_MIN_LENGTH) {
-      throw new IllegalArgumentException(
-          "Pubsub object name is shorter than 3 characters: " + name);
-    }
-    if (name.length() > PUBSUB_NAME_MAX_LENGTH) {
-      throw new IllegalArgumentException(
-          "Pubsub object name is longer than 255 characters: " + name);
-    }
-
-    if (name.startsWith("goog")) {
-      throw new IllegalArgumentException("Pubsub object name cannot start with goog: " + name);
-    }
-
-    Matcher match = PUBSUB_NAME_REGEXP.matcher(name);
-    if (!match.matches()) {
-      throw new IllegalArgumentException("Illegal Pubsub object name specified: " + name
-          + " Please see Javadoc for naming rules.");
-    }
-  }
-
-  /**
-   * Computes the {@link Instant} to associate with the supplied {@link PubsubMessage}.
-   *
-   * <p>When {@code label} is {@code null} the current time of {@code clock} is used. Otherwise
-   * the message attribute named {@code label} is parsed, first as milliseconds since the Unix
-   * epoch and, failing that, as an RFC 3339 string. See
-   * {@link PubsubIO.Read#timestampLabel(String)} for details about the accepted formats.
-   *
-   * <p>The {@link Clock} parameter is used to virtualize time for testing.
-   *
-   * @throws IllegalArgumentException if the timestamp label is provided, but there is no
-   *     corresponding attribute in the message or the value provided is not a valid timestamp
-   *     string.
-   * @see PubsubIO.Read#timestampLabel(String)
-   */
-  @VisibleForTesting
-  protected static Instant assignMessageTimestamp(
-      PubsubMessage message, @Nullable String label, Clock clock) {
-    if (label == null) {
-      // No attribute configured: fall back to arrival time.
-      long now = clock.currentTimeMillis();
-      return new Instant(now);
-    }
-
-    // A message without attributes is treated like one with an empty attribute map.
-    Map<String, String> attributes =
-        firstNonNull(message.getAttributes(), ImmutableMap.<String, String>of());
-    String rawTimestamp = attributes.get(label);
-    boolean hasTimestamp = rawTimestamp != null && !rawTimestamp.isEmpty();
-    checkArgument(hasTimestamp, "PubSub message is missing a timestamp in label: %s", label);
-
-    try {
-      // First attempt: plain milliseconds since epoch. A failure surfaces as an
-      // IllegalArgumentException, which triggers the RFC 3339 fallback below.
-      return new Instant(Long.parseLong(rawTimestamp));
-    } catch (IllegalArgumentException e) {
-      // Second attempt: RFC 3339. DateTime.parseRfc3339 throws IllegalArgumentException itself
-      // when the string is malformed; that is left for the caller to handle.
-      return new Instant(DateTime.parseRfc3339(rawTimestamp).getValue());
-    }
-  }
-
-  /**
-   * Class representing a Cloud Pub/Sub Subscription.
-   */
-  public static class PubsubSubscription implements Serializable {
-    private enum Type { NORMAL, FAKE }
-
-    private final Type type;
-    private final String project;
-    private final String subscription;
-
-    private PubsubSubscription(Type type, String project, String subscription) {
-      this.type = type;
-      this.project = project;
-      this.subscription = subscription;
-    }
-
-    /**
-     * Creates a class representing a Pub/Sub subscription from the specified subscription path.
-     *
-     * <p>Cloud Pub/Sub subscription names should be of the form
-     * {@code projects/<project>/subscriptions/<subscription>}, where {@code <project>} is the name
-     * of the project the subscription belongs to. The {@code <subscription>} component must comply
-     * with the following requirements:
-     *
-     * <ul>
-     * <li>Can only contain lowercase letters, numbers, dashes ('-'), underscores ('_') and periods
-     * ('.').</li>
-     * <li>Must be between 3 and 255 characters.</li>
-     * <li>Must begin with a letter.</li>
-     * <li>Must end with a letter or a number.</li>
-     * <li>Cannot begin with {@code 'goog'} prefix.</li>
-     * </ul>
-     */
-    public static PubsubSubscription fromPath(String path) {
-      if (path.startsWith(SUBSCRIPTION_RANDOM_TEST_PREFIX)
-          || path.startsWith(SUBSCRIPTION_STARTING_SIGNAL)) {
-        return new PubsubSubscription(Type.FAKE, "", path);
-      }
-
-      String projectName, subscriptionName;
-
-      Matcher v1beta1Match = V1BETA1_SUBSCRIPTION_REGEXP.matcher(path);
-      if (v1beta1Match.matches()) {
-        LOG.warn("Saw subscription in v1beta1 format. Subscriptions should be in the format "
-            + "projects/<project_id>/subscriptions/<subscription_name>");
-        projectName = v1beta1Match.group(1);
-        subscriptionName = v1beta1Match.group(2);
-      } else {
-        Matcher match = SUBSCRIPTION_REGEXP.matcher(path);
-        if (!match.matches()) {
-          throw new IllegalArgumentException("Pubsub subscription is not in "
-              + "projects/<project_id>/subscriptions/<subscription_name> format: " + path);
-        }
-        projectName = match.group(1);
-        subscriptionName = match.group(2);
-      }
-
-      validateProjectName(projectName);
-      validatePubsubName(subscriptionName);
-      return new PubsubSubscription(Type.NORMAL, projectName, subscriptionName);
-    }
-
-    /**
-     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
-     * v1beta1 API.
-     *
-     * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated.
-     */
-    @Deprecated
-    public String asV1Beta1Path() {
-      if (type == Type.NORMAL) {
-        return "/subscriptions/" + project + "/" + subscription;
-      } else {
-        return subscription;
-      }
-    }
-
-    /**
-     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
-     * v1beta2 API.
-     *
-     * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated.
-     */
-    @Deprecated
-    public String asV1Beta2Path() {
-      if (type == Type.NORMAL) {
-        return "projects/" + project + "/subscriptions/" + subscription;
-      } else {
-        return subscription;
-      }
-    }
-
-    /**
-     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
-     * API.
-     */
-    public String asPath() {
-      if (type == Type.NORMAL) {
-        return "projects/" + project + "/subscriptions/" + subscription;
-      } else {
-        return subscription;
-      }
-    }
-  }
-
-  /**
-   * Value class identifying a Cloud Pub/Sub topic by project and topic name.
-   */
-  public static class PubsubTopic implements Serializable {
-    /** {@code FAKE} marks the test-only topic path, which is stored verbatim and not validated. */
-    private enum Type { NORMAL, FAKE }
-
-    private final Type type;
-    private final String project;
-    private final String topic;
-
-    private PubsubTopic(Type type, String project, String topic) {
-      this.type = type;
-      this.project = project;
-      this.topic = topic;
-    }
-
-    /**
-     * Parses a topic path into a {@link PubsubTopic}.
-     *
-     * <p>Cloud Pub/Sub topic names should be of the form
-     * {@code projects/<project>/topics/<topic>}, where {@code <project>} is the name of
-     * the publishing project. The deprecated v1beta1 form {@code /topics/<project>/<topic>} is
-     * still accepted, with a logged warning. The {@code <topic>} component must comply with
-     * the following requirements:
-     *
-     * <ul>
-     * <li>Can only contain lowercase letters, numbers, dashes ('-'), underscores ('_') and periods
-     * ('.').</li>
-     * <li>Must be between 3 and 255 characters.</li>
-     * <li>Must begin with a letter.</li>
-     * <li>Must end with a letter or a number.</li>
-     * <li>Cannot begin with 'goog' prefix.</li>
-     * </ul>
-     *
-     * <p>The validation performed here is approximate: it enforces the length bounds, the
-     * {@code goog} restriction and a character pattern that is more permissive than the list
-     * above.
-     *
-     * @throws IllegalArgumentException if the path has neither accepted form, or if the project
-     *     or topic name fails validation
-     */
-    public static PubsubTopic fromPath(String path) {
-      if (path.equals(TOPIC_DEV_NULL_TEST_NAME)) {
-        return new PubsubTopic(Type.FAKE, "", path);
-      }
-
-      // Try the deprecated layout first, then the current one; whichever matcher succeeds
-      // supplies the project (group 1) and topic name (group 2).
-      Matcher parsed = V1BETA1_TOPIC_REGEXP.matcher(path);
-      if (parsed.matches()) {
-        LOG.warn("Saw topic in v1beta1 format.  Topics should be in the format "
-            + "projects/<project_id>/topics/<topic_name>");
-      } else {
-        parsed = TOPIC_REGEXP.matcher(path);
-        if (!parsed.matches()) {
-          throw new IllegalArgumentException(
-              "Pubsub topic is not in projects/<project_id>/topics/<topic_name> format: " + path);
-        }
-      }
-
-      String projectName = parsed.group(1);
-      String topicName = parsed.group(2);
-      validateProjectName(projectName);
-      validatePubsubName(topicName);
-      return new PubsubTopic(Type.NORMAL, projectName, topicName);
-    }
-
-    /**
-     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
-     * v1beta1 API.
-     *
-     * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated.
-     */
-    @Deprecated
-    public String asV1Beta1Path() {
-      if (type != Type.NORMAL) {
-        // The fake topic keeps the path it was created from.
-        return topic;
-      }
-      return "/topics/" + project + "/" + topic;
-    }
-
-    /**
-     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
-     * v1beta2 API.
-     *
-     * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated.
-     */
-    @Deprecated
-    public String asV1Beta2Path() {
-      if (type != Type.NORMAL) {
-        return topic;
-      }
-      return "projects/" + project + "/topics/" + topic;
-    }
-
-    /**
-     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
-     * API.
-     */
-    public String asPath() {
-      if (type != Type.NORMAL) {
-        return topic;
-      }
-      return "projects/" + project + "/topics/" + topic;
-    }
-  }
-
-  /**
-   * A {@link PTransform} that continuously reads from a Cloud Pub/Sub stream and
-   * returns a {@link PCollection} of {@link String Strings} containing the items from
-   * the stream.
-   *
-   * <p>When running with a {@link PipelineRunner} that only supports bounded
-   * {@link PCollection PCollections} (such as {@link DirectPipelineRunner}),
-   * only a bounded portion of the input Pub/Sub stream can be processed. As such, either
-   * {@link Bound#maxNumRecords(int)} or {@link Bound#maxReadTime(Duration)} must be set.
-   */
-  public static class Read {
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub with the specified transform
-     * name.
-     */
-    public static Bound<String> named(String name) {
-      Bound<String> defaults = new Bound<>(DEFAULT_PUBSUB_CODER);
-      return defaults.named(name);
-    }
-
-    /**
-     * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
-     * with {@link #subscription(String)}.
-     *
-     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
-     * of the {@code topic} string.
-     *
-     * <p>Dataflow will start reading data published on this topic from the time the pipeline is
-     * started. Any data published on the topic before the pipeline is started will not be read by
-     * Dataflow.
-     */
-    public static Bound<String> topic(String topic) {
-      Bound<String> defaults = new Bound<>(DEFAULT_PUBSUB_CODER);
-      return defaults.topic(topic);
-    }
-
-    /**
-     * Creates and returns a transform for reading from a specific Cloud Pub/Sub subscription.
-     * Mutually exclusive with {@link #topic(String)}.
-     *
-     * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
-     * of the {@code subscription} string.
-     */
-    public static Bound<String> subscription(String subscription) {
-      Bound<String> defaults = new Bound<>(DEFAULT_PUBSUB_CODER);
-      return defaults.subscription(subscription);
-    }
-
-    /**
-     * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
-     * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
-     * parameter specifies the name of the attribute that contains the timestamp.
-     *
-     * <p>The timestamp value is expected to be represented in the attribute as either:
-     *
-     * <ul>
-     * <li>a numerical value representing the number of milliseconds since the Unix epoch. For
-     * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct
-     * value for this attribute.
-     * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. The
-     * sub-second component of the timestamp is optional, and digits beyond the first three
-     * (i.e., time units smaller than milliseconds) will be ignored.
-     * </ul>
-     *
-     * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
-     * the first time it sees each record. All windowing will be done relative to these timestamps.
-     *
-     * <p>By default, windows are emitted based on an estimate of when this source is likely
-     * done producing data for a given timestamp (referred to as the Watermark; see
-     * {@link AfterWatermark} for more details). Any late data will be handled by the trigger
-     * specified with the windowing strategy &ndash; by default it will be output immediately.
-     *
-     * <p>Note that the system can guarantee that no late data will ever be seen when it assigns
-     * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
-     *
-     * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
-     */
-    public static Bound<String> timestampLabel(String timestampLabel) {
-      Bound<String> defaults = new Bound<>(DEFAULT_PUBSUB_CODER);
-      return defaults.timestampLabel(timestampLabel);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
-     * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
-     * parameter specifies the attribute name. The value of the attribute can be any string
-     * that uniquely identifies this record.
-     *
-     * <p>If {@code idLabel} is not provided, Dataflow cannot guarantee that no duplicate data will
-     * be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be
-     * strictly best effort.
-     */
-    public static Bound<String> idLabel(String idLabel) {
-      Bound<String> defaults = new Bound<>(DEFAULT_PUBSUB_CODER);
-      return defaults.idLabel(idLabel);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub that uses the given
-     * {@link Coder} to decode Pub/Sub messages into a value of type {@code T}.
-     *
-     * <p>By default, uses {@link StringUtf8Coder}, which just
-     * returns the text lines as Java strings.
-     *
-     * @param <T> the type of the decoded elements, and the elements
-     * of the resulting PCollection.
-     */
-    public static <T> Bound<T> withCoder(Coder<T> coder) {
-      Bound<T> reader = new Bound<T>(coder);
-      return reader;
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
-     * records that will be read. The transform produces a <i>bounded</i> {@link PCollection}.
-     *
-     * <p>Either this option or {@link #maxReadTime(Duration)} must be set in order to create a
-     * bounded source.
-     */
-    public static Bound<String> maxNumRecords(int maxNumRecords) {
-      Bound<String> defaults = new Bound<>(DEFAULT_PUBSUB_CODER);
-      return defaults.maxNumRecords(maxNumRecords);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum duration
-     * during which records will be read.  The transform produces a <i>bounded</i>
-     * {@link PCollection}.
-     *
-     * <p>Either this option or {@link #maxNumRecords(int)} must be set in order to create a bounded
-     * source.
-     */
-    public static Bound<String> maxReadTime(Duration maxReadTime) {
-      Bound<String> defaults = new Bound<>(DEFAULT_PUBSUB_CODER);
-      return defaults.maxReadTime(maxReadTime);
-    }
-
-    /**
-     * A {@link PTransform} that reads from a Cloud Pub/Sub source and returns
-     * a {@link PCollection} containing the items from the stream. The result is unbounded
-     * unless a maximum number of records or a maximum read time has been set.
-     */
-    public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
-      /** Topic to read from; {@code null} when none has been configured. */
-      @Nullable private final PubsubTopic topic;
-
-      /** Subscription to read from; {@code null} when none has been configured. */
-      @Nullable private final PubsubSubscription subscription;
-
-      /** Name of the message attribute holding the record timestamp, if any. */
-      @Nullable private final String timestampLabel;
-
-      /** Name of the message attribute holding the unique record ID, if any. */
-      @Nullable private final String idLabel;
-
-      /** Coder used to decode the payload of each record. */
-      @Nullable private final Coder<T> coder;
-
-      /** Record limit; values greater than zero make the read bounded. */
-      private final int maxNumRecords;
-
-      /** Read-time limit; a non-null value makes the read bounded. */
-      @Nullable private final Duration maxReadTime;
-
-      /** Creates an unnamed, unconfigured transform that decodes records with {@code coder}. */
-      private Bound(Coder<T> coder) {
-        this(
-            null /* name */,
-            null /* subscription */,
-            null /* topic */,
-            null /* timestampLabel */,
-            coder,
-            null /* idLabel */,
-            0 /* maxNumRecords */,
-            null /* maxReadTime */);
-      }
-
-      /** Full constructor; every builder method funnels through here. */
-      private Bound(String name, PubsubSubscription subscription, PubsubTopic topic,
-          String timestampLabel, Coder<T> coder, String idLabel, int maxNumRecords,
-          Duration maxReadTime) {
-        super(name);
-        // Where to read from.
-        this.topic = topic;
-        this.subscription = subscription;
-        // How to interpret each message.
-        this.coder = coder;
-        this.timestampLabel = timestampLabel;
-        this.idLabel = idLabel;
-        // Optional limits on the read.
-        this.maxNumRecords = maxNumRecords;
-        this.maxReadTime = maxReadTime;
-      }
-
-      /**
-       * Returns a transform that's like this one but with the given step name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> named(String name) {
-        // Only the name comes from the argument; everything else is copied from this instance.
-        return new Bound<>(name, this.subscription, this.topic, this.timestampLabel, this.coder,
-            this.idLabel, this.maxNumRecords, this.maxReadTime);
-      }
-
-      /**
-       * Returns a transform that's like this one but reading from the
-       * given subscription.
-       *
-       * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
-       * of the {@code subscription} string.
-       *
-       * <p>Multiple readers reading from the same subscription will each receive
-       * some arbitrary portion of the data.  Most likely, separate readers should
-       * use their own subscriptions.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> subscription(String subscription) {
-        return new Bound<>(name, PubsubSubscription.fromPath(subscription), topic, timestampLabel,
-            coder, idLabel, maxNumRecords, maxReadTime);
-      }
-
-      /**
-       * Returns a transform that's like this one but that reads from the specified topic.
-       *
-       * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the
-       * format of the {@code topic} string.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> topic(String topic) {
-        return new Bound<>(name, subscription, PubsubTopic.fromPath(topic), timestampLabel, coder,
-            idLabel, maxNumRecords, maxReadTime);
-      }
-
-      /**
-       * Returns a transform that's like this one but that reads message timestamps
-       * from the given message attribute. See {@link PubsubIO.Read#timestampLabel(String)} for
-       * more details on the format of the timestamp attribute.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> timestampLabel(String timestampLabel) {
-        // Only the timestamp label comes from the argument.
-        return new Bound<>(this.name, this.subscription, this.topic, timestampLabel, this.coder,
-            this.idLabel, this.maxNumRecords, this.maxReadTime);
-      }
-
-      /**
-       * Returns a transform that's like this one but that reads unique message IDs
-       * from the given message attribute. See {@link PubsubIO.Read#idLabel(String)} for more
-       * details on the format of the ID attribute.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> idLabel(String idLabel) {
-        // Only the ID label comes from the argument.
-        return new Bound<>(this.name, this.subscription, this.topic, this.timestampLabel,
-            this.coder, idLabel, this.maxNumRecords, this.maxReadTime);
-      }
-
-      /**
-       * Returns a transform that's like this one but that uses the given
-       * {@link Coder} to decode each record into a value of type {@code X}.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param <X> the type of the decoded elements, and the
-       * elements of the resulting PCollection.
-       */
-      public <X> Bound<X> withCoder(Coder<X> coder) {
-        // The new coder determines the element type of the returned transform.
-        return new Bound<X>(this.name, this.subscription, this.topic, this.timestampLabel, coder,
-            this.idLabel, this.maxNumRecords, this.maxReadTime);
-      }
-
-      /**
-       * Returns a transform that's like this one but will only read up to the specified
-       * maximum number of records from Cloud Pub/Sub. The transform produces a <i>bounded</i>
-       * {@link PCollection}. See {@link PubsubIO.Read#maxNumRecords(int)} for more details.
-       */
-      public Bound<T> maxNumRecords(int maxNumRecords) {
-        // Only the record limit comes from the argument.
-        return new Bound<>(this.name, this.subscription, this.topic, this.timestampLabel,
-            this.coder, this.idLabel, maxNumRecords, this.maxReadTime);
-      }
-
-      /**
-       * Returns a transform that's like this one but will only read during the specified
-       * duration from Cloud Pub/Sub. The transform produces a <i>bounded</i> {@link PCollection}.
-       * See {@link PubsubIO.Read#maxReadTime(Duration)} for more details.
-       */
-      public Bound<T> maxReadTime(Duration maxReadTime) {
-        // Only the read-time limit comes from the argument.
-        return new Bound<>(this.name, this.subscription, this.topic, this.timestampLabel,
-            this.coder, this.idLabel, this.maxNumRecords, maxReadTime);
-      }
-
-      /**
-       * Expands this transform. Exactly one of topic and subscription must be configured. With a
-       * positive record limit or a read-time limit the messages are pulled by a {@code ParDo}
-       * over a single {@code Void} element; otherwise a primitive unbounded output is created.
-       *
-       * @throws IllegalStateException if neither or both of topic and subscription are set
-       */
-      @Override
-      public PCollection<T> apply(PInput input) {
-        final boolean hasTopic = topic != null;
-        final boolean hasSubscription = subscription != null;
-        if (!hasTopic && !hasSubscription) {
-          throw new IllegalStateException("need to set either the topic or the subscription for "
-              + "a PubsubIO.Read transform");
-        }
-        if (hasTopic && hasSubscription) {
-          throw new IllegalStateException("Can't set both the topic and the subscription for a "
-              + "PubsubIO.Read transform");
-        }
-
-        final boolean unlimited = getMaxNumRecords() <= 0 && getMaxReadTime() == null;
-        if (unlimited) {
-          return PCollection.<T>createPrimitiveOutputInternal(
-                  input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-              .setCoder(coder);
-        }
-
-        // Bounded read: a single null element triggers PubsubReader exactly once.
-        return input.getPipeline().begin()
-            .apply(Create.of((Void) null)).setCoder(VoidCoder.of())
-            .apply(ParDo.of(new PubsubReader())).setCoder(coder);
-      }
-
-      /** Uses the configured coder as the default output coder. */
-      @Override
-      protected Coder<T> getDefaultOutputCoder() {
-        return this.coder;
-      }
-
-      /** Returns the topic to read from, or {@code null} if none has been set. */
-      public PubsubTopic getTopic() {
-        return this.topic;
-      }
-
-      /** Returns the subscription to read from, or {@code null} if none has been set. */
-      public PubsubSubscription getSubscription() {
-        return this.subscription;
-      }
-
-      /** Returns the name of the timestamp attribute, or {@code null} if none has been set. */
-      public String getTimestampLabel() {
-        return this.timestampLabel;
-      }
-
-      /** Returns the coder used to decode each record. */
-      public Coder<T> getCoder() {
-        return this.coder;
-      }
-
-      /** Returns the name of the record ID attribute, or {@code null} if none has been set. */
-      public String getIdLabel() {
-        return this.idLabel;
-      }
-
-      /** Returns the record limit; the default of 0 means no limit. */
-      public int getMaxNumRecords() {
-        return this.maxNumRecords;
-      }
-
-      /** Returns the read-time limit, or {@code null} if none has been set. */
-      public Duration getMaxReadTime() {
-        return this.maxReadTime;
-      }
-
-      /**
-       * {@link DoFn} used for bounded reads. For each input element it pulls messages from
-       * Cloud Pub/Sub until the enclosing transform's record limit or read-time limit is reached,
-       * acknowledges them, and then emits the decoded payloads with their timestamps.
-       *
-       * <p>This is an inner (non-static) class: it reads its configuration through the enclosing
-       * {@code Bound}'s getters.
-       */
-      private class PubsubReader extends DoFn<Void, T> {
-        /** Batch size requested per pull when no record limit is configured. */
-        private static final int DEFAULT_PULL_SIZE = 100;
-
-        /**
-         * Pulls, acknowledges and emits messages as described on the class.
-         *
-         * @throws RuntimeException if the temporary subscription cannot be created or deleted, or
-         *     if pulling or acknowledging fails
-         * @throws IOException declared by the signature; I/O failures from the pull loop are
-         *     rethrown wrapped in {@link RuntimeException}
-         */
-        @Override
-        public void processElement(ProcessContext c) throws IOException {
-          Pubsub pubsubClient =
-              Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class))
-                  .build();
-
-          String subscription;
-          if (getSubscription() == null) {
-            // Only a topic was configured: create a temporary subscription on it. The name is
-            // built from the second and fourth '/'-separated segments of the topic path plus a
-            // random suffix.
-            String topic = getTopic().asPath();
-            String[] split = topic.split("/");
-            subscription =
-                "projects/" + split[1] + "/subscriptions/" + split[3] + "_dataflow_"
-                + new Random().nextLong();
-            Subscription subInfo = new Subscription().setAckDeadlineSeconds(60).setTopic(topic);
-            try {
-              pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
-            } catch (Exception e) {
-              throw new RuntimeException("Failed to create subscription: ", e);
-            }
-          } else {
-            subscription = getSubscription().asPath();
-          }
-
-          // Without a read-time limit the deadline is effectively never reached.
-          Instant endTime = (getMaxReadTime() == null)
-              ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime());
-
-          // All pulled messages are buffered here and emitted only after the loop finishes.
-          List<PubsubMessage> messages = new ArrayList<>();
-
-          Throwable finallyBlockException = null;
-          try {
-            // Keep pulling until the record limit (if any) is met or the deadline has passed.
-            while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords())
-                && Instant.now().isBefore(endTime)) {
-              PullRequest pullRequest = new PullRequest().setReturnImmediately(false);
-              if (getMaxNumRecords() > 0) {
-                // Ask only for as many messages as are still missing.
-                pullRequest.setMaxMessages(getMaxNumRecords() - messages.size());
-              } else {
-                pullRequest.setMaxMessages(DEFAULT_PULL_SIZE);
-              }
-
-              PullResponse pullResponse =
-                  pubsubClient.projects().subscriptions().pull(subscription, pullRequest).execute();
-              List<String> ackIds = new ArrayList<>();
-              if (pullResponse.getReceivedMessages() != null) {
-                for (ReceivedMessage received : pullResponse.getReceivedMessages()) {
-                  messages.add(received.getMessage());
-                  ackIds.add(received.getAckId());
-                }
-              }
-
-              // NOTE(review): messages are acknowledged here, before they are decoded and
-              // emitted below; a later failure presumably loses them - confirm this is intended.
-              if (ackIds.size() != 0) {
-                AcknowledgeRequest ackRequest = new AcknowledgeRequest().setAckIds(ackIds);
-                pubsubClient.projects()
-                    .subscriptions()
-                    .acknowledge(subscription, ackRequest)
-                    .execute();
-              }
-            }
-          } catch (IOException e) {
-            throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
-          } finally {
-            // A configured topic means the subscription was created above (apply() rejects
-            // transforms with both topic and subscription set), so clean it up.
-            if (getTopic() != null) {
-              try {
-                pubsubClient.projects().subscriptions().delete(subscription).execute();
-              } catch (IOException e) {
-                // Remember the failure instead of throwing from the finally block, so that an
-                // exception already propagating from the try block is not replaced.
-                finallyBlockException = new RuntimeException("Failed to delete subscription: ", e);
-                LOG.error("Failed to delete subscription: ", e);
-              }
-            }
-          }
-          // Reached only when the pull loop completed normally; surface a failed cleanup now.
-          if (finallyBlockException != null) {
-            Throwables.propagate(finallyBlockException);
-          }
-
-          // Decode and emit every buffered message with its assigned timestamp.
-          for (PubsubMessage message : messages) {
-            c.outputWithTimestamp(
-                CoderUtils.decodeFromByteArray(getCoder(), message.decodeData()),
-                assignMessageTimestamp(message, getTimestampLabel(), Clock.SYSTEM));
-          }
-        }
-      }
-    }
-
-    /** Disallow construction of utility class. */
-    private Read() {}
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /** Disallow construction of utility class. */
-  private PubsubIO() {}
-
-  /**
-   * A {@link PTransform} that continuously writes a
-   * {@link PCollection} of {@link String Strings} to a Cloud Pub/Sub stream.
-   */
-  // TODO: Support non-String encodings.
-  public static class Write {
-    /**
-     * Creates a transform that writes to Pub/Sub with the given step name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).named(name);
-    }
-
-    /**
-     * Creates a transform that publishes to the specified topic.
-     *
-     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
-     * {@code topic} string.
-     */
-    public static Bound<String> topic(String topic) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic);
-    }
-
-    /**
-     * Creates a transform that writes to Pub/Sub, adds each record's timestamp to the published
-     * messages in an attribute with the specified name. The value of the attribute will be a number
-     * representing the number of milliseconds since the Unix epoch. For example, if using the Joda
-     * time classes, {@link Instant#Instant(long)} can be used to parse this value.
-     *
-     * <p>If the output from this sink is being read by another Dataflow source, then
-     * {@link PubsubIO.Read#timestampLabel(String)} can be used to ensure the other source reads
-     * these timestamps from the appropriate attribute.
-     */
-    public static Bound<String> timestampLabel(String timestampLabel) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).timestampLabel(timestampLabel);
-    }
-
-    /**
-     * Creates a transform that writes to Pub/Sub, adding each record's unique identifier to the
-     * published messages in an attribute with the specified name. The value of the attribute is an
-     * opaque string.
-     *
-     * <p>If the output from this sink is being read by another Dataflow source, then
-     * {@link PubsubIO.Read#idLabel(String)} can be used to ensure that the other source reads
-     * these unique identifiers from the appropriate attribute.
-     */
-    public static Bound<String> idLabel(String idLabel) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).idLabel(idLabel);
-    }
-
-    /**
-     * Creates a transform that uses the given {@link Coder} to encode each of the
-     * elements of the input collection into an output message.
-     *
-     * <p>By default, uses {@link StringUtf8Coder}, which writes input Java strings directly as
-     * records.
-     *
-     * @param <T> the type of the elements of the input PCollection
-     */
-    public static <T> Bound<T> withCoder(Coder<T> coder) {
-      return new Bound<>(coder);
-    }
-
-    /**
-     * A {@link PTransform} that writes an unbounded {@link PCollection} of elements, encoded
-     * with the configured {@link Coder}, to a Cloud Pub/Sub stream.
-     */
-    public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
-      /** The Cloud Pub/Sub topic to publish to. */
-      @Nullable private final PubsubTopic topic;
-      /** The name of the message attribute to publish message timestamps in. */
-      @Nullable private final String timestampLabel;
-      /** The name of the message attribute to publish unique message IDs in. */
-      @Nullable private final String idLabel;
-      private final Coder<T> coder;
-
-      private Bound(Coder<T> coder) {
-        this(null, null, null, null, coder);
-      }
-
-      private Bound(
-          String name, PubsubTopic topic, String timestampLabel, String idLabel, Coder<T> coder) {
-        super(name);
-        this.topic = topic;
-        this.timestampLabel = timestampLabel;
-        this.idLabel = idLabel;
-        this.coder = coder;
-      }
-
-      /**
-       * Returns a new transform that's like this one but with the specified step
-       * name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> named(String name) {
-        return new Bound<>(name, topic, timestampLabel, idLabel, coder);
-      }
-
-      /**
-       * Returns a new transform that's like this one but that writes to the specified
-       * topic.
-       *
-       * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
-       * {@code topic} string.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> topic(String topic) {
-        return new Bound<>(name, PubsubTopic.fromPath(topic), timestampLabel, idLabel, coder);
-      }
-
-      /**
-       * Returns a new transform that's like this one but that publishes record timestamps
-       * to a message attribute with the specified name. See
-       * {@link PubsubIO.Write#timestampLabel(String)} for more details.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> timestampLabel(String timestampLabel) {
-        return new Bound<>(name, topic, timestampLabel, idLabel, coder);
-      }
-
-      /**
-       * Returns a new transform that's like this one but that publishes unique record IDs
-       * to a message attribute with the specified name. See {@link PubsubIO.Write#idLabel(String)}
-       * for more details.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> idLabel(String idLabel) {
-        return new Bound<>(name, topic, timestampLabel, idLabel, coder);
-      }
-
-      /**
-       * Returns a new transform that's like this one
-       * but that uses the given {@link Coder} to encode each of
-       * the elements of the input {@link PCollection} into an
-       * output record.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param <X> the type of the elements of the input {@link PCollection}
-       */
-      public <X> Bound<X> withCoder(Coder<X> coder) {
-        return new Bound<>(name, topic, timestampLabel, idLabel, coder);
-      }
-
-      @Override
-      public PDone apply(PCollection<T> input) {
-        if (topic == null) {
-          throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
-        }
-        input.apply(ParDo.of(new PubsubWriter()));
-        return PDone.in(input.getPipeline());
-      }
-
-      @Override
-      protected Coder<Void> getDefaultOutputCoder() {
-        return VoidCoder.of();
-      }
-
-      public PubsubTopic getTopic() {
-        return topic;
-      }
-
-      public String getTimestampLabel() {
-        return timestampLabel;
-      }
-
-      public String getIdLabel() {
-        return idLabel;
-      }
-
-      public Coder<T> getCoder() {
-        return coder;
-      }
-
-      private class PubsubWriter extends DoFn<T, Void> {
-        private static final int MAX_PUBLISH_BATCH_SIZE = 100;
-        private transient List<PubsubMessage> output;
-        private transient Pubsub pubsubClient;
-
-        @Override
-        public void startBundle(Context c) {
-          this.output = new ArrayList<>();
-          this.pubsubClient =
-              Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class))
-                  .build();
-        }
-
-        @Override
-        public void processElement(ProcessContext c) throws IOException {
-          PubsubMessage message =
-              new PubsubMessage().encodeData(CoderUtils.encodeToByteArray(getCoder(), c.element()));
-          if (getTimestampLabel() != null) {
-            Map<String, String> attributes = message.getAttributes();
-            if (attributes == null) {
-              attributes = new HashMap<>();
-              message.setAttributes(attributes);
-            }
-            attributes.put(getTimestampLabel(), String.valueOf(c.timestamp().getMillis()));
-          }
-          output.add(message);
-
-          if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
-            publish();
-          }
-        }
-
-        @Override
-        public void finishBundle(Context c) throws IOException {
-          if (!output.isEmpty()) {
-            publish();
-          }
-        }
-
-        private void publish() throws IOException {
-          PublishRequest publishRequest = new PublishRequest().setMessages(output);
-          pubsubClient.projects().topics()
-              .publish(getTopic().asPath(), publishRequest)
-              .execute();
-          output.clear();
-        }
-      }
-    }
-
-    /** Disallow construction of utility class. */
-    private Write() {}
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java
deleted file mode 100644
index fb103ee..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.cloud.dataflow.sdk.util.StringUtils.approximateSimpleName;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.cloud.dataflow.sdk.values.PInput;
-
-import org.joda.time.Duration;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link PTransform} for reading from a {@link Source}.
- *
- * <p>Usage example:
- * <pre>
- * Pipeline p = Pipeline.create();
- * p.apply(Read.from(new MySource().withFoo("foo").withBar("bar"))
- *             .named("foobar"));
- * </pre>
- */
-public class Read {
-  /**
-   * Returns a new {@code Read} {@code PTransform} builder with the given name.
-   */
-  public static Builder named(String name) {
-    return new Builder(name);
-  }
-
-  /**
-   * Returns a new {@code Read.Bounded} {@code PTransform} reading from the given
-   * {@code BoundedSource}.
-   */
-  public static <T> Bounded<T> from(BoundedSource<T> source) {
-    return new Bounded<>(null, source);
-  }
-
-  /**
-   * Returns a new {@code Read.Unbounded} {@code PTransform} reading from the given
-   * {@code UnboundedSource}.
-   */
-  public static <T> Unbounded<T> from(UnboundedSource<T, ?> source) {
-    return new Unbounded<>(null, source);
-  }
-
-  /**
-   * Helper class for building {@code Read} transforms.
-   */
-  public static class Builder {
-    private final String name;
-
-    private Builder(String name) {
-      this.name = name;
-    }
-
-    /**
-     * Returns a new {@code Read.Bounded} {@code PTransform} reading from the given
-     * {@code BoundedSource}.
-     */
-    public <T> Bounded<T> from(BoundedSource<T> source) {
-      return new Bounded<>(name, source);
-    }
-
-    /**
-     * Returns a new {@code Read.Unbounded} {@code PTransform} reading from the given
-     * {@code UnboundedSource}.
-     */
-    public <T> Unbounded<T> from(UnboundedSource<T, ?> source) {
-      return new Unbounded<>(name, source);
-    }
-  }
-
-  /**
-   * {@link PTransform} that reads from a {@link BoundedSource}.
-   */
-  public static class Bounded<T> extends PTransform<PInput, PCollection<T>> {
-    private final BoundedSource<T> source;
-
-    private Bounded(@Nullable String name, BoundedSource<T> source) {
-      super(name);
-      this.source = SerializableUtils.ensureSerializable(source);
-    }
-
-    /**
-     * Returns a new {@code Bounded} {@code PTransform} that's like this one but
-     * has the given name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Bounded<T> named(String name) {
-      return new Bounded<T>(name, source);
-    }
-
-    @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return source.getDefaultOutputCoder();
-    }
-
-    @Override
-    public final PCollection<T> apply(PInput input) {
-      source.validate();
-
-      return PCollection.<T>createPrimitiveOutputInternal(input.getPipeline(),
-          WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
-          .setCoder(getDefaultOutputCoder());
-    }
-
-    /**
-     * Returns the {@code BoundedSource} used to create this {@code Read} {@code PTransform}.
-     */
-    public BoundedSource<T> getSource() {
-      return source;
-    }
-
-    @Override
-    public String getKindString() {
-      return "Read(" + approximateSimpleName(source.getClass()) + ")";
-    }
-
-    static {
-      registerDefaultTransformEvaluator();
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private static void registerDefaultTransformEvaluator() {
-      DirectPipelineRunner.registerDefaultTransformEvaluator(
-          Bounded.class,
-          new DirectPipelineRunner.TransformEvaluator<Bounded>() {
-            @Override
-            public void evaluate(
-                Bounded transform, DirectPipelineRunner.EvaluationContext context) {
-              evaluateReadHelper(transform, context);
-            }
-
-            private <T> void evaluateReadHelper(
-                Read.Bounded<T> transform, DirectPipelineRunner.EvaluationContext context) {
-              try {
-                List<DirectPipelineRunner.ValueWithMetadata<T>> output = new ArrayList<>();
-                BoundedSource<T> source = transform.getSource();
-                try (BoundedSource.BoundedReader<T> reader =
-                    source.createReader(context.getPipelineOptions())) {
-                  for (boolean available = reader.start();
-                      available;
-                      available = reader.advance()) {
-                    output.add(
-                        DirectPipelineRunner.ValueWithMetadata.of(
-                            WindowedValue.timestampedValueInGlobalWindow(
-                                reader.getCurrent(), reader.getCurrentTimestamp())));
-                  }
-                }
-                context.setPCollectionValuesWithMetadata(context.getOutput(transform), output);
-              } catch (Exception e) {
-                throw new RuntimeException(e);
-              }
-            }
-          });
-    }
-  }
-
-  /**
-   * {@link PTransform} that reads from a {@link UnboundedSource}.
-   */
-  public static class Unbounded<T> extends PTransform<PInput, PCollection<T>> {
-    private final UnboundedSource<T, ?> source;
-
-    private Unbounded(@Nullable String name, UnboundedSource<T, ?> source) {
-      super(name);
-      this.source = SerializableUtils.ensureSerializable(source);
-    }
-
-    /**
-     * Returns a new {@code Unbounded} {@code PTransform} that's like this one but
-     * has the given name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Unbounded<T> named(String name) {
-      return new Unbounded<T>(name, source);
-    }
-
-    /**
-     * Returns a new {@link BoundedReadFromUnboundedSource} that reads a bounded amount
-     * of data from the given {@link UnboundedSource}.  The bound is specified as a number
-     * of records to read.
-     *
-     * <p>This may take a long time to execute if the splits of this source are slow to read
-     * records.
-     */
-    public BoundedReadFromUnboundedSource<T> withMaxNumRecords(long maxNumRecords) {
-      return new BoundedReadFromUnboundedSource<T>(source, maxNumRecords, null);
-    }
-
-    /**
-     * Returns a new {@link BoundedReadFromUnboundedSource} that reads a bounded amount
-     * of data from the given {@link UnboundedSource}.  The bound is specified as an amount
-     * of time to read for.  Each split of the source will read for this much time.
-     */
-    public BoundedReadFromUnboundedSource<T> withMaxReadTime(Duration maxReadTime) {
-      return new BoundedReadFromUnboundedSource<T>(source, Long.MAX_VALUE, maxReadTime);
-    }
-
-    @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return source.getDefaultOutputCoder();
-    }
-
-    @Override
-    public final PCollection<T> apply(PInput input) {
-      source.validate();
-
-      return PCollection.<T>createPrimitiveOutputInternal(
-          input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
-    }
-
-    /**
-     * Returns the {@code UnboundedSource} used to create this {@code Read} {@code PTransform}.
-     */
-    public UnboundedSource<T, ?> getSource() {
-      return source;
-    }
-
-    @Override
-    public String getKindString() {
-      return "Read(" + approximateSimpleName(source.getClass()) + ")";
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/ShardNameTemplate.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/ShardNameTemplate.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/ShardNameTemplate.java
deleted file mode 100644
index cc233f1..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/ShardNameTemplate.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-/**
- * Standard shard naming templates.
- *
- * <p>Shard naming templates are strings that may contain placeholders for
- * the shard number and shard count.  When constructing a filename for a
- * particular shard number, the upper-case letters 'S' and 'N' are replaced
- * with the 0-padded shard number and shard count respectively.
- *
- * <p>Left-padding of the numbers enables lexicographical sorting of the
- * resulting filenames.  If the shard number or count are too large for the
- * space provided in the template, then the result may no longer sort
- * lexicographically.  For example, a shard template of "S-of-N", for 200
- * shards, will result in outputs named "0-of-200", ... "10-of-200",
- * "100-of-200", etc.
- *
- * <p>Shard numbers start with 0, so the last shard number is the shard count
- * minus one.  For example, the template "-SSSSS-of-NNNNN" will be
- * instantiated as "-00000-of-01000" for the first shard (shard 0) of a
- * 1000-way sharded output.
- *
- * <p>A shard name template is typically provided along with a name prefix
- * and suffix, which allows constructing complex paths that have embedded
- * shard information.  For example, outputs in the form
- * "gs://bucket/path-01-of-99.txt" could be constructed by providing the
- * individual components:
- *
- * <pre>{@code
- *   pipeline.apply(
- *       TextIO.Write.to("gs://bucket/path")
- *                   .withShardNameTemplate("-SS-of-NN")
- *                   .withSuffix(".txt"))
- * }</pre>
- *
- * <p>In the example above, you could make parts of the output configurable
- * by users without the user having to specify all components of the output
- * name.
- *
- * <p>If a shard name template does not contain any repeating 'S', then
- * the output shard count must be 1, as otherwise the same filename would be
- * generated for multiple shards.
- */
-public class ShardNameTemplate {
-  /**
-   * Shard name containing the index and max.
-   *
-   * <p>Eg: [prefix]-00000-of-00100[suffix] and
-   * [prefix]-00001-of-00100[suffix]
-   */
-  public static final String INDEX_OF_MAX = "-SSSSS-of-NNNNN";
-
-  /**
-   * Shard is a file within a directory.
-   *
-   * <p>Eg: [prefix]/part-00000[suffix] and [prefix]/part-00001[suffix]
-   */
-  public static final String DIRECTORY_CONTAINER = "/part-SSSSS";
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java
deleted file mode 100644
index 7845f47..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.io.Serializable;
-
-/**
- * A {@code Sink} represents a resource that can be written to using the {@link Write} transform.
- *
- * <p>A parallel write to a {@code Sink} consists of three phases:
- * <ol>
- * <li>A sequential <i>initialization</i> phase (e.g., creating a temporary output directory, etc.)
- * <li>A <i>parallel write</i> phase where workers write bundles of records
- * <li>A sequential <i>finalization</i> phase (e.g., committing the writes, merging output files,
- * etc.)
- * </ol>
- *
- * <p>The {@link Write} transform can be used in a Dataflow pipeline to perform this write.
- * Specifically, a Write transform can be applied to a {@link PCollection} {@code p} by:
- *
- * <p>{@code p.apply(Write.to(new MySink()));}
- *
- * <p>Implementing a {@link Sink} and the corresponding write operations requires extending three
- * abstract classes:
- *
- * <ul>
- * <li>{@link Sink}: an immutable logical description of the location/resource to write to.
- * Depending on the type of sink, it may contain fields such as the path to an output directory
- * on a filesystem, a database table name, etc. Implementors of {@link Sink} must
- * implement two methods: {@link Sink#validate} and {@link Sink#createWriteOperation}.
- * {@link Sink#validate Validate} is called by the Write transform at pipeline creation, and should
- * validate that the Sink can be written to. The createWriteOperation method is also called at
- * pipeline creation, and should return a WriteOperation object that defines how to write to the
- * Sink. Note that implementations of Sink must be serializable and Sinks must be immutable.
- *
- * <li>{@link WriteOperation}: The WriteOperation implements the <i>initialization</i> and
- * <i>finalization</i> phases of a write. Implementors of {@link WriteOperation} must implement
- * corresponding {@link WriteOperation#initialize} and {@link WriteOperation#finalize} methods. A
- * WriteOperation must also implement {@link WriteOperation#createWriter} that creates Writers,
- * {@link WriteOperation#getWriterResultCoder} that returns a {@link Coder} for the result of a
- * parallel write, and a {@link WriteOperation#getSink} that returns the Sink that the write
- * operation corresponds to. See below for more information about these methods and restrictions on
- * their implementation.
- *
- * <li>{@link Writer}: A Writer writes a bundle of records. Writer defines four methods:
- * {@link Writer#open}, which is called once at the start of writing a bundle; {@link Writer#write},
- * which writes a single record from the bundle; {@link Writer#close}, which is called once at the
- * end of writing a bundle; and {@link Writer#getWriteOperation}, which returns the write operation
- * that the writer belongs to.
- * </ul>
- *
- * <h2>WriteOperation</h2>
- * <p>{@link WriteOperation#initialize} and {@link WriteOperation#finalize} are conceptually called
- * once: at the beginning and end of a Write transform. However, implementors must ensure that these
- * methods are idempotent, as they may be called multiple times on different machines in the case of
- * failure/retry or for redundancy.
- *
- * <p>The finalize method of WriteOperation is passed an Iterable of a writer result type. This
- * writer result type should encode the result of a write and, in most cases, some encoding of the
- * unique bundle id.
- *
- * <p>All implementations of {@link WriteOperation} must be serializable.
- *
- * <p>WriteOperation may have mutable state. For instance, {@link WriteOperation#initialize} may
- * mutate the object state. These mutations will be visible in {@link WriteOperation#createWriter}
- * and {@link WriteOperation#finalize} because the object will be serialized after initialize and
- * deserialized before these calls. However, it is not serialized again after createWriter is
- * called, as createWriter will be called within workers to create Writers for the bundles that are
- * distributed to these workers. Therefore, createWriter should not mutate the WriteOperation
- * state (as these mutations will not be visible in finalize).
- *
- * <h2>Bundle Ids:</h2>
- * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
- * event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the WriteOperation's finalize method. Each call to {@link Writer#open} is passed
- * a unique <i>bundle id</i> when it is called by the Write transform, so even redundant or retried
- * bundles will have a unique way of identifying their output.
- *
- * <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
- * guarantee is important; if a bundle is to be output to a file, for example, the name of the file
- * must be unique to avoid conflicts with other Writers. The bundle id should be encoded in the
- * writer result returned by the Writer and subsequently used by the WriteOperation's finalize
- * method to identify the results of successful writes.
- *
- * <p>For example, consider the scenario where a Writer writes files containing serialized records
- * and the WriteOperation's finalization step is to merge or rename these output files. In this
- * case, a Writer may use its unique id to name its output file (to avoid conflicts) and return the
- * name of the file it wrote as its writer result. The WriteOperation will then receive an Iterable
- * of output file names that it can then merge or rename using some bundle naming scheme.
- *
- * <h2>Writer Results:</h2>
- * <p>{@link WriteOperation}s and {@link Writer}s must agree on a writer result type that will be
- * returned by a Writer after it writes a bundle. This type can be a client-defined object or an
- * existing type; {@link WriteOperation#getWriterResultCoder} should return a {@link Coder} for the
- * type.
- *
- * <p>A note about thread safety: Any use of static members or methods in Writer should be thread
- * safe, as different instances of Writer objects may be created in different threads on the same
- * worker.
- *
- * @param <T> the type that will be written to the Sink.
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public abstract class Sink<T> implements Serializable {
-  /**
-   * Ensures that the sink is valid and can be written to before the write operation begins. One
-   * should use {@link com.google.common.base.Preconditions} to implement this method.
-   */
-  public abstract void validate(PipelineOptions options);
-
-  /**
-   * Returns an instance of a {@link WriteOperation} that can write to this Sink.
-   */
-  public abstract WriteOperation<T, ?> createWriteOperation(PipelineOptions options);
-
-  /**
-   * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
-   *
-   * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a
-   * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write
-   * a bundle to the sink.
-   *
-   * <p>Since operations in Dataflow may be run multiple times for redundancy or fault-tolerance,
-   * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>.
-   *
-   * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the
-   * call to the {@code initialize} method and deserialized before calls to
-   * {@code createWriter} and {@code finalize}. However, it is not
-   * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the
-   * state of the {@code WriteOperation}.
-   *
-   * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
-   *
-   * @param <T> The type of objects to write
-   * @param <WriteT> The result of a per-bundle write
-   */
-  public abstract static class WriteOperation<T, WriteT> implements Serializable {
-    /**
-     * Performs initialization before writing to the sink. Called before writing begins.
-     */
-    public abstract void initialize(PipelineOptions options) throws Exception;
-
-    /**
-     * Given an Iterable of results from bundle writes, performs finalization after writing and
-     * closes the sink. Called after all bundle writes are complete.
-     *
-     * <p>The results that are passed to finalize are those returned by bundles that completed
-     * successfully. Although bundles may have been run multiple times (for fault-tolerance), only
-     * one writer result will be passed to finalize for each bundle. An implementation of finalize
-     * should perform clean up of any failed and successfully retried bundles.  Note that these
-     * failed bundles will not have their writer result passed to finalize, so finalize should be
-     * capable of locating any temporary/partial output written by failed bundles.
-     *
-     * <p>A best practice is to make finalize atomic. If this is impossible given the semantics
-     * of the sink, finalize should be idempotent, as it may be called multiple times in the case of
-     * failure/retry or for redundancy.
-     *
-     * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if
-     * finalize is called multiple times.
-     *
-     * @param writerResults an Iterable of results from successful bundle writes.
-     */
-    public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options)
-        throws Exception;
-
-    /**
-     * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink.
-     *
-     * <p>The bundle id that the writer will use to uniquely identify its output will be passed to
-     * {@link Writer#open}.
-     *
-     * <p>Must not mutate the state of the WriteOperation.
-     */
-    public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception;
-
-    /**
-     * Returns the Sink that this write operation writes to.
-     */
-    public abstract Sink<T> getSink();
-
-    /**
-     * Returns a coder for the writer result type. The base implementation returns {@code null}.
-     */
-    public Coder<WriteT> getWriterResultCoder() {
-      return null;
-    }
-  }
-
-  /**
-   * A Writer writes a bundle of elements from a PCollection to a sink. {@link Writer#open} is
-   * called before writing begins and {@link Writer#close} is called after all elements in the
-   * bundle have been written. {@link Writer#write} writes an element to the sink.
-   *
-   * <p>Note that any access to static members or methods of a Writer must be thread-safe, as
-   * multiple instances of a Writer may be instantiated in different threads on the same worker.
-   *
-   * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
-   *
-   * @param <T> The type of object to write
-   * @param <WriteT> The writer results type (e.g., the bundle's output filename, as String)
-   */
-  public abstract static class Writer<T, WriteT> {
-    /**
-     * Performs bundle initialization. For example, creates a temporary file for writing or
-     * initializes any state that will be used across calls to {@link Writer#write}.
-     *
-     * <p>The unique id that is given to open should be used to ensure that the writer's output does
-     * not interfere with the output of other Writers, as a bundle may be executed many times for
-     * fault tolerance. See {@link Sink} for more information about bundle ids.
-     */
-    public abstract void open(String uId) throws Exception;
-
-    /**
-     * Called for each value in the bundle.
-     */
-    public abstract void write(T value) throws Exception;
-
-    /**
-     * Finishes writing the bundle. Closes any resources used for writing the bundle.
-     *
-     * <p>Returns a writer result that will be used in the {@link Sink.WriteOperation}'s
-     * finalization. The result should contain some way to identify the output of this bundle (using
-     * the bundle id). {@link WriteOperation#finalize} will use the writer result to identify
-     * successful writes. See {@link Sink} for more information about bundle ids.
-     *
-     * @return the writer result
-     */
-    public abstract WriteT close() throws Exception;
-
-    /**
-     * Returns the write operation this writer belongs to.
-     */
-    public abstract WriteOperation<T, WriteT> getWriteOperation();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java
deleted file mode 100644
index 86a8e64..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.NoSuchElementException;
-
-/**
- * Base class for defining input formats and creating a {@code Source} for reading the input.
- *
- * <p>This class is not intended to be subclassed directly. Instead, to define
- * a bounded source (a source which produces a finite amount of input), subclass
- * {@link BoundedSource}; to define an unbounded source, subclass {@link UnboundedSource}.
- *
- * <p>A {@code Source} passed to a {@code Read} transform must be
- * {@code Serializable}.  This allows the {@code Source} instance
- * created in this "main program" to be sent (in serialized form) to
- * remote worker machines and reconstituted for each batch of elements
- * of the input {@code PCollection} being processed or for each source splitting
- * operation. A {@code Source} can have instance variable state, and
- * non-transient instance variable state will be serialized in the main program
- * and then deserialized on remote worker machines.
- *
- * <p>{@code Source} classes MUST be effectively immutable. The only acceptable use of
- * mutable fields is to cache the results of expensive operations, and such fields MUST be
- * marked {@code transient}.
- *
- * <p>{@code Source} objects should override {@link Object#toString}, as it will be
- * used in important error and debugging messages.
- *
- * @param <T> Type of elements read by the source.
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public abstract class Source<T> implements Serializable {
-  /**
-   * Checks that this source is valid, before it can be used in a pipeline.
-   *
-   * <p>It is recommended to use {@link com.google.common.base.Preconditions} for implementing
-   * this method.
-   */
-  public abstract void validate();
-
-  /**
-   * Returns the default {@code Coder} to use for the data read from this source.
-   */
-  public abstract Coder<T> getDefaultOutputCoder();
-
-  /**
-   * The interface that readers of custom input sources must implement.
-   *
-   * <p>This interface is deliberately distinct from {@link java.util.Iterator} because
-   * the current model tends to be easier to program and more efficient in practice
-   * for iterating over sources such as files, databases etc. (rather than pure collections).
-   *
-   * <p>Reading data from the {@link Reader} must obey the following access pattern:
-   * <ul>
-   * <li> One call to {@link #start}
-   * <ul><li>If {@link #start} returned true, any number of calls to {@code getCurrent}*
-   *   methods</ul>
-   * <li> Repeatedly, a call to {@link #advance}. This may be called regardless
-   *   of what the previous {@link #start}/{@link #advance} returned.
-   * <ul><li>If {@link #advance} returned true, any number of calls to {@code getCurrent}*
-   *   methods</ul>
-   * </ul>
-   *
-   * <p>For example, if the reader is reading a fixed set of data:
-   * <pre>
-   *   try {
-   *     for (boolean available = reader.start(); available; available = reader.advance()) {
-   *       T item = reader.getCurrent();
-   *       Instant timestamp = reader.getCurrentTimestamp();
-   *       ...
-   *     }
-   *   } finally {
-   *     reader.close();
-   *   }
-   * </pre>
-   *
-   * <p>If the set of data being read is continually growing:
-   * <pre>
-   *   try {
-   *     boolean available = reader.start();
-   *     while (true) {
-   *       if (available) {
-   *         T item = reader.getCurrent();
-   *         Instant timestamp = reader.getCurrentTimestamp();
-   *         ...
-   *         resetExponentialBackoff();
-   *       } else {
-   *         exponentialBackoff();
-   *       }
-   *       available = reader.advance();
-   *     }
-   *   } finally {
-   *     reader.close();
-   *   }
-   * </pre>
-   *
-   * <p>Note: this interface is a work-in-progress and may change.
-   *
-   * <p>{@code Reader} methods other than {@link #getCurrentSource} do not need to be thread-safe;
-   * they may only be accessed by a single thread at a time. However, {@link #getCurrentSource}
-   * needs to be thread-safe, and other methods should assume that its returned value can change
-   * asynchronously.
-   */
-  public abstract static class Reader<T> implements AutoCloseable {
-    /**
-     * Initializes the reader and advances the reader to the first record.
-     *
-     * <p>This method should be called exactly once. The invocation should occur prior to calling
-     * {@link #advance} or {@link #getCurrent}. This method may perform expensive operations that
-     * are needed to initialize the reader.
-     *
-     * @return {@code true} if a record was read, {@code false} if there is no more input available.
-     */
-    public abstract boolean start() throws IOException;
-
-    /**
-     * Advances the reader to the next valid record.
-     *
-     * <p>It is an error to call this without having called {@link #start} first.
-     *
-     * @return {@code true} if a record was read, {@code false} if there is no more input available.
-     */
-    public abstract boolean advance() throws IOException;
-
-    /**
-     * Returns the value of the data item that was read by the last {@link #start} or
-     * {@link #advance} call. The returned value must be effectively immutable and remain valid
-     * indefinitely.
-     *
-     * <p>Multiple calls to this method without an intervening call to {@link #advance} should
-     * return the same result.
-     *
-     * @throws java.util.NoSuchElementException if {@link #start} was never called, or if
-     *         the last {@link #start} or {@link #advance} returned {@code false}.
-     */
-    public abstract T getCurrent() throws NoSuchElementException;
-
-    /**
-     * Returns the timestamp associated with the current data item.
-     *
-     * <p>If the source does not support timestamps, this should return
-     * {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
-     *
-     * <p>Multiple calls to this method without an intervening call to {@link #advance} should
-     * return the same result.
-     *
-     * @throws NoSuchElementException if the reader is at the beginning of the input and
-     *         {@link #start} or {@link #advance} wasn't called, or if the last {@link #start} or
-     *         {@link #advance} returned {@code false}.
-     */
-    public abstract Instant getCurrentTimestamp() throws NoSuchElementException;
-
-    /**
-     * Closes the reader. The reader cannot be used after this method is called.
-     */
-    @Override
-    public abstract void close() throws IOException;
-
-    /**
-     * Returns a {@code Source} describing the same input that this {@code Reader} currently reads
-     * (including items already read).
-     *
-     * <p>Usually, an implementation will simply return the immutable {@link Source} object from
-     * which the current {@link Reader} was constructed, or delegate to the base class.
-     * However, when using or implementing this method on a {@link BoundedSource.BoundedReader},
-     * special considerations apply, see documentation for
-     * {@link BoundedSource.BoundedReader#getCurrentSource}.
-     */
-    public abstract Source<T> getCurrentSource();
-  }
-}