You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:57:03 UTC

[32/50] [abbrv] beam git commit: [BEAM-1722] Move PubsubIO into the google-cloud-platform module

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
deleted file mode 100644
index 4a6ddac..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auth.Credentials;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
-import com.google.pubsub.v1.AcknowledgeRequest;
-import com.google.pubsub.v1.DeleteSubscriptionRequest;
-import com.google.pubsub.v1.DeleteTopicRequest;
-import com.google.pubsub.v1.GetSubscriptionRequest;
-import com.google.pubsub.v1.ListSubscriptionsRequest;
-import com.google.pubsub.v1.ListSubscriptionsResponse;
-import com.google.pubsub.v1.ListTopicsRequest;
-import com.google.pubsub.v1.ListTopicsResponse;
-import com.google.pubsub.v1.ModifyAckDeadlineRequest;
-import com.google.pubsub.v1.PublishRequest;
-import com.google.pubsub.v1.PublishResponse;
-import com.google.pubsub.v1.PublisherGrpc;
-import com.google.pubsub.v1.PublisherGrpc.PublisherBlockingStub;
-import com.google.pubsub.v1.PubsubMessage;
-import com.google.pubsub.v1.PullRequest;
-import com.google.pubsub.v1.PullResponse;
-import com.google.pubsub.v1.ReceivedMessage;
-import com.google.pubsub.v1.SubscriberGrpc;
-import com.google.pubsub.v1.SubscriberGrpc.SubscriberBlockingStub;
-import com.google.pubsub.v1.Subscription;
-import com.google.pubsub.v1.Topic;
-import io.grpc.Channel;
-import io.grpc.ClientInterceptors;
-import io.grpc.ManagedChannel;
-import io.grpc.auth.ClientAuthInterceptor;
-import io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.NegotiationType;
-import io.grpc.netty.NettyChannelBuilder;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PubsubOptions;
-
-/**
- * A helper class for talking to Pubsub via grpc.
- *
- * <p>CAUTION: Currently uses the application default credentials and does not respect any
- * credentials-related arguments in {@link GcpOptions}.
- */
-public class PubsubGrpcClient extends PubsubClient {
-  private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
-  private static final int PUBSUB_PORT = 443;
-  private static final int LIST_BATCH_SIZE = 1000;
-
-  private static final int DEFAULT_TIMEOUT_S = 15;
-
-  private static class PubsubGrpcClientFactory implements PubsubClientFactory {
-    @Override
-    public PubsubClient newClient(
-        @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
-        throws IOException {
-      ManagedChannel channel = NettyChannelBuilder
-          .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
-          .negotiationType(NegotiationType.TLS)
-          .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
-          .build();
-
-      return new PubsubGrpcClient(timestampLabel,
-                                  idLabel,
-                                  DEFAULT_TIMEOUT_S,
-                                  channel,
-                                  options.getGcpCredential());
-    }
-
-    @Override
-    public String getKind() {
-      return "Grpc";
-    }
-  }
-
-  /**
-   * Factory for creating Pubsub clients using gRCP transport.
-   */
-  public static final PubsubClientFactory FACTORY = new PubsubGrpcClientFactory();
-
-  /**
-   * Timeout for grpc calls (in s).
-   */
-  private final int timeoutSec;
-
-  /**
-   * Underlying netty channel, or {@literal null} if closed.
-   */
-  @Nullable
-  private ManagedChannel publisherChannel;
-
-  /**
-   * Credentials determined from options and environment.
-   */
-  private final Credentials credentials;
-
-  /**
-   * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
-   * instead.
-   */
-  @Nullable
-  private final String timestampLabel;
-
-  /**
-   * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
-   */
-  @Nullable
-  private final String idLabel;
-
-
-  /**
-   * Cached stubs, or null if not cached.
-   */
-  @Nullable
-  private PublisherGrpc.PublisherBlockingStub cachedPublisherStub;
-  private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub;
-
-  @VisibleForTesting
-  PubsubGrpcClient(
-      @Nullable String timestampLabel,
-      @Nullable String idLabel,
-      int timeoutSec,
-      ManagedChannel publisherChannel,
-      Credentials credentials) {
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
-    this.timeoutSec = timeoutSec;
-    this.publisherChannel = publisherChannel;
-    this.credentials = credentials;
-  }
-
-  /**
-   * Gracefully close the underlying netty channel.
-   */
-  @Override
-  public void close() {
-    if (publisherChannel == null) {
-      // Already closed.
-      return;
-    }
-    // Can gc the underlying stubs.
-    cachedPublisherStub = null;
-    cachedSubscriberStub = null;
-    // Mark the client as having been closed before going further
-    // in case we have an exception from the channel.
-    ManagedChannel publisherChannel = this.publisherChannel;
-    this.publisherChannel = null;
-    // Gracefully shutdown the channel.
-    publisherChannel.shutdown();
-    try {
-      publisherChannel.awaitTermination(timeoutSec, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      // Ignore.
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /**
-   * Return channel with interceptor for returning credentials.
-   */
-  private Channel newChannel() throws IOException {
-    checkState(publisherChannel != null, "PubsubGrpcClient has been closed");
-    ClientAuthInterceptor interceptor =
-        new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor());
-    return ClientInterceptors.intercept(publisherChannel, interceptor);
-  }
-
-  /**
-   * Return a stub for making a publish request with a timeout.
-   */
-  private PublisherBlockingStub publisherStub() throws IOException {
-    if (cachedPublisherStub == null) {
-      cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel());
-    }
-    return cachedPublisherStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS);
-  }
-
-  /**
-   * Return a stub for making a subscribe request with a timeout.
-   */
-  private SubscriberBlockingStub subscriberStub() throws IOException {
-    if (cachedSubscriberStub == null) {
-      cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel());
-    }
-    return cachedSubscriberStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS);
-  }
-
-  @Override
-  public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
-      throws IOException {
-    PublishRequest.Builder request = PublishRequest.newBuilder()
-                                                   .setTopic(topic.getPath());
-    for (OutgoingMessage outgoingMessage : outgoingMessages) {
-      PubsubMessage.Builder message =
-          PubsubMessage.newBuilder()
-                       .setData(ByteString.copyFrom(outgoingMessage.elementBytes));
-
-      if (outgoingMessage.attributes != null) {
-        message.putAllAttributes(outgoingMessage.attributes);
-      }
-
-      if (timestampLabel != null) {
-        message.getMutableAttributes()
-               .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
-      }
-
-      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
-        message.getMutableAttributes().put(idLabel, outgoingMessage.recordId);
-      }
-
-      request.addMessages(message);
-    }
-
-    PublishResponse response = publisherStub().publish(request.build());
-    return response.getMessageIdsCount();
-  }
-
-  @Override
-  public List<IncomingMessage> pull(
-      long requestTimeMsSinceEpoch,
-      SubscriptionPath subscription,
-      int batchSize,
-      boolean returnImmediately) throws IOException {
-    PullRequest request = PullRequest.newBuilder()
-                                     .setSubscription(subscription.getPath())
-                                     .setReturnImmediately(returnImmediately)
-                                     .setMaxMessages(batchSize)
-                                     .build();
-    PullResponse response = subscriberStub().pull(request);
-    if (response.getReceivedMessagesCount() == 0) {
-      return ImmutableList.of();
-    }
-    List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessagesCount());
-    for (ReceivedMessage message : response.getReceivedMessagesList()) {
-      PubsubMessage pubsubMessage = message.getMessage();
-      @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
-
-      // Payload.
-      byte[] elementBytes = pubsubMessage.getData().toByteArray();
-
-      // Timestamp.
-      String pubsubTimestampString = null;
-      Timestamp timestampProto = pubsubMessage.getPublishTime();
-      if (timestampProto != null) {
-        pubsubTimestampString = String.valueOf(timestampProto.getSeconds()
-                                               + timestampProto.getNanos() / 1000L);
-      }
-      long timestampMsSinceEpoch =
-          extractTimestamp(timestampLabel, pubsubTimestampString, attributes);
-
-      // Ack id.
-      String ackId = message.getAckId();
-      checkState(!Strings.isNullOrEmpty(ackId));
-
-      // Record id, if any.
-      @Nullable String recordId = null;
-      if (idLabel != null && attributes != null) {
-        recordId = attributes.get(idLabel);
-      }
-      if (Strings.isNullOrEmpty(recordId)) {
-        // Fall back to the Pubsub provided message id.
-        recordId = pubsubMessage.getMessageId();
-      }
-
-      incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
-                                               requestTimeMsSinceEpoch, ackId, recordId));
-    }
-    return incomingMessages;
-  }
-
-  @Override
-  public void acknowledge(SubscriptionPath subscription, List<String> ackIds)
-      throws IOException {
-    AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
-                                                   .setSubscription(subscription.getPath())
-                                                   .addAllAckIds(ackIds)
-                                                   .build();
-    subscriberStub().acknowledge(request); // ignore Empty result.
-  }
-
-  @Override
-  public void modifyAckDeadline(
-      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
-      throws IOException {
-    ModifyAckDeadlineRequest request =
-        ModifyAckDeadlineRequest.newBuilder()
-                                .setSubscription(subscription.getPath())
-                                .addAllAckIds(ackIds)
-                                .setAckDeadlineSeconds(deadlineSeconds)
-                                .build();
-    subscriberStub().modifyAckDeadline(request); // ignore Empty result.
-  }
-
-  @Override
-  public void createTopic(TopicPath topic) throws IOException {
-    Topic request = Topic.newBuilder()
-                         .setName(topic.getPath())
-                         .build();
-    publisherStub().createTopic(request); // ignore Topic result.
-  }
-
-  @Override
-  public void deleteTopic(TopicPath topic) throws IOException {
-    DeleteTopicRequest request = DeleteTopicRequest.newBuilder()
-                                                   .setTopic(topic.getPath())
-                                                   .build();
-    publisherStub().deleteTopic(request); // ignore Empty result.
-  }
-
-  @Override
-  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
-    ListTopicsRequest.Builder request =
-        ListTopicsRequest.newBuilder()
-                         .setProject(project.getPath())
-                         .setPageSize(LIST_BATCH_SIZE);
-    ListTopicsResponse response = publisherStub().listTopics(request.build());
-    if (response.getTopicsCount() == 0) {
-      return ImmutableList.of();
-    }
-    List<TopicPath> topics = new ArrayList<>(response.getTopicsCount());
-    while (true) {
-      for (Topic topic : response.getTopicsList()) {
-        topics.add(topicPathFromPath(topic.getName()));
-      }
-      if (response.getNextPageToken().isEmpty()) {
-        break;
-      }
-      request.setPageToken(response.getNextPageToken());
-      response = publisherStub().listTopics(request.build());
-    }
-    return topics;
-  }
-
-  @Override
-  public void createSubscription(
-      TopicPath topic, SubscriptionPath subscription,
-      int ackDeadlineSeconds) throws IOException {
-    Subscription request = Subscription.newBuilder()
-                                       .setTopic(topic.getPath())
-                                       .setName(subscription.getPath())
-                                       .setAckDeadlineSeconds(ackDeadlineSeconds)
-                                       .build();
-    subscriberStub().createSubscription(request); // ignore Subscription result.
-  }
-
-  @Override
-  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
-    DeleteSubscriptionRequest request =
-        DeleteSubscriptionRequest.newBuilder()
-                                 .setSubscription(subscription.getPath())
-                                 .build();
-    subscriberStub().deleteSubscription(request); // ignore Empty result.
-  }
-
-  @Override
-  public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
-      throws IOException {
-    ListSubscriptionsRequest.Builder request =
-        ListSubscriptionsRequest.newBuilder()
-                                .setProject(project.getPath())
-                                .setPageSize(LIST_BATCH_SIZE);
-    ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build());
-    if (response.getSubscriptionsCount() == 0) {
-      return ImmutableList.of();
-    }
-    List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptionsCount());
-    while (true) {
-      for (Subscription subscription : response.getSubscriptionsList()) {
-        if (subscription.getTopic().equals(topic.getPath())) {
-          subscriptions.add(subscriptionPathFromPath(subscription.getName()));
-        }
-      }
-      if (response.getNextPageToken().isEmpty()) {
-        break;
-      }
-      request.setPageToken(response.getNextPageToken());
-      response = subscriberStub().listSubscriptions(request.build());
-    }
-    return subscriptions;
-  }
-
-  @Override
-  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
-    GetSubscriptionRequest request =
-        GetSubscriptionRequest.newBuilder()
-                              .setSubscription(subscription.getPath())
-                              .build();
-    Subscription response = subscriberStub().getSubscription(request);
-    return response.getAckDeadlineSeconds();
-  }
-
-  @Override
-  public boolean isEOF() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
deleted file mode 100644
index ef8abfd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.Pubsub.Builder;
-import com.google.api.services.pubsub.model.AcknowledgeRequest;
-import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
-import com.google.api.services.pubsub.model.ListTopicsResponse;
-import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PublishResponse;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.api.services.pubsub.model.PullRequest;
-import com.google.api.services.pubsub.model.PullResponse;
-import com.google.api.services.pubsub.model.ReceivedMessage;
-import com.google.api.services.pubsub.model.Subscription;
-import com.google.api.services.pubsub.model.Topic;
-import com.google.auth.Credentials;
-import com.google.auth.http.HttpCredentialsAdapter;
-import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
-
-/**
- * A Pubsub client using JSON transport.
- */
-public class PubsubJsonClient extends PubsubClient {
-
-  private static class PubsubJsonClientFactory implements PubsubClientFactory {
-    private static HttpRequestInitializer chainHttpRequestInitializer(
-        Credentials credential, HttpRequestInitializer httpRequestInitializer) {
-      if (credential == null) {
-        return httpRequestInitializer;
-      } else {
-        return new ChainingHttpRequestInitializer(
-            new HttpCredentialsAdapter(credential),
-            httpRequestInitializer);
-      }
-    }
-
-    @Override
-    public PubsubClient newClient(
-        @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
-        throws IOException {
-      Pubsub pubsub = new Builder(
-          Transport.getTransport(),
-          Transport.getJsonFactory(),
-          chainHttpRequestInitializer(
-              options.getGcpCredential(),
-              // Do not log 404. It clutters the output and is possibly even required by the caller.
-              new RetryHttpRequestInitializer(ImmutableList.of(404))))
-          .setRootUrl(options.getPubsubRootUrl())
-          .setApplicationName(options.getAppName())
-          .setGoogleClientRequestInitializer(options.getGoogleApiTrace())
-          .build();
-      return new PubsubJsonClient(timestampLabel, idLabel, pubsub);
-    }
-
-    @Override
-    public String getKind() {
-      return "Json";
-    }
-  }
-
-  /**
-   * Factory for creating Pubsub clients using Json transport.
-   */
-  public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory();
-
-  /**
-   * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
-   * instead.
-   */
-  @Nullable
-  private final String timestampLabel;
-
-  /**
-   * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
-   */
-  @Nullable
-  private final String idLabel;
-
-  /**
-   * Underlying JSON transport.
-   */
-  private Pubsub pubsub;
-
-  @VisibleForTesting
-  PubsubJsonClient(
-      @Nullable String timestampLabel,
-      @Nullable String idLabel,
-      Pubsub pubsub) {
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
-    this.pubsub = pubsub;
-  }
-
-  @Override
-  public void close() {
-    // Nothing to close.
-  }
-
-  @Override
-  public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
-      throws IOException {
-    List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
-    for (OutgoingMessage outgoingMessage : outgoingMessages) {
-      PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
-
-      Map<String, String> attributes = outgoingMessage.attributes;
-      if ((timestampLabel != null || idLabel != null) && attributes == null) {
-        attributes = new TreeMap<>();
-      }
-      if (attributes != null) {
-        pubsubMessage.setAttributes(attributes);
-      }
-
-      if (timestampLabel != null) {
-        attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
-      }
-
-      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
-        attributes.put(idLabel, outgoingMessage.recordId);
-      }
-
-      pubsubMessages.add(pubsubMessage);
-    }
-    PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
-    PublishResponse response = pubsub.projects()
-                                     .topics()
-                                     .publish(topic.getPath(), request)
-                                     .execute();
-    return response.getMessageIds().size();
-  }
-
-  @Override
-  public List<IncomingMessage> pull(
-      long requestTimeMsSinceEpoch,
-      SubscriptionPath subscription,
-      int batchSize,
-      boolean returnImmediately) throws IOException {
-    PullRequest request = new PullRequest()
-        .setReturnImmediately(returnImmediately)
-        .setMaxMessages(batchSize);
-    PullResponse response = pubsub.projects()
-                                  .subscriptions()
-                                  .pull(subscription.getPath(), request)
-                                  .execute();
-    if (response.getReceivedMessages() == null || response.getReceivedMessages().size() == 0) {
-      return ImmutableList.of();
-    }
-    List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessages().size());
-    for (ReceivedMessage message : response.getReceivedMessages()) {
-      PubsubMessage pubsubMessage = message.getMessage();
-      @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
-
-      // Payload.
-      byte[] elementBytes = pubsubMessage.decodeData();
-
-      // Timestamp.
-      long timestampMsSinceEpoch =
-          extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes);
-
-      // Ack id.
-      String ackId = message.getAckId();
-      checkState(!Strings.isNullOrEmpty(ackId));
-
-      // Record id, if any.
-      @Nullable String recordId = null;
-      if (idLabel != null && attributes != null) {
-        recordId = attributes.get(idLabel);
-      }
-      if (Strings.isNullOrEmpty(recordId)) {
-        // Fall back to the Pubsub provided message id.
-        recordId = pubsubMessage.getMessageId();
-      }
-
-      incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
-                                               requestTimeMsSinceEpoch, ackId, recordId));
-    }
-
-    return incomingMessages;
-  }
-
-  @Override
-  public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException {
-    AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds);
-    pubsub.projects()
-          .subscriptions()
-          .acknowledge(subscription.getPath(), request)
-          .execute(); // ignore Empty result.
-  }
-
-  @Override
-  public void modifyAckDeadline(
-      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
-      throws IOException {
-    ModifyAckDeadlineRequest request =
-        new ModifyAckDeadlineRequest().setAckIds(ackIds)
-                                      .setAckDeadlineSeconds(deadlineSeconds);
-    pubsub.projects()
-          .subscriptions()
-          .modifyAckDeadline(subscription.getPath(), request)
-          .execute(); // ignore Empty result.
-  }
-
-  @Override
-  public void createTopic(TopicPath topic) throws IOException {
-    pubsub.projects()
-          .topics()
-          .create(topic.getPath(), new Topic())
-          .execute(); // ignore Topic result.
-  }
-
-  @Override
-  public void deleteTopic(TopicPath topic) throws IOException {
-    pubsub.projects()
-          .topics()
-          .delete(topic.getPath())
-          .execute(); // ignore Empty result.
-  }
-
-  @Override
-  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
-    ListTopicsResponse response = pubsub.projects()
-                                        .topics()
-                                        .list(project.getPath())
-                                        .execute();
-    if (response.getTopics() == null || response.getTopics().isEmpty()) {
-      return ImmutableList.of();
-    }
-    List<TopicPath> topics = new ArrayList<>(response.getTopics().size());
-    for (Topic topic : response.getTopics()) {
-      topics.add(topicPathFromPath(topic.getName()));
-    }
-    return topics;
-  }
-
-  @Override
-  public void createSubscription(
-      TopicPath topic, SubscriptionPath subscription,
-      int ackDeadlineSeconds) throws IOException {
-    Subscription request = new Subscription()
-        .setTopic(topic.getPath())
-        .setAckDeadlineSeconds(ackDeadlineSeconds);
-    pubsub.projects()
-          .subscriptions()
-          .create(subscription.getPath(), request)
-          .execute(); // ignore Subscription result.
-  }
-
-  @Override
-  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
-    pubsub.projects()
-          .subscriptions()
-          .delete(subscription.getPath())
-          .execute(); // ignore Empty result.
-  }
-
-  @Override
-  public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
-      throws IOException {
-    ListSubscriptionsResponse response = pubsub.projects()
-                                               .subscriptions()
-                                               .list(project.getPath())
-                                               .execute();
-    if (response.getSubscriptions() == null || response.getSubscriptions().isEmpty()) {
-      return ImmutableList.of();
-    }
-    List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptions().size());
-    for (Subscription subscription : response.getSubscriptions()) {
-      if (subscription.getTopic().equals(topic.getPath())) {
-        subscriptions.add(subscriptionPathFromPath(subscription.getName()));
-      }
-    }
-    return subscriptions;
-  }
-
-  @Override
-  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
-    Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute();
-    return response.getAckDeadlineSeconds();
-  }
-
-  @Override
-  public boolean isEOF() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
deleted file mode 100644
index 61479f9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.client.util.Clock;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
-
-/**
- * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
- * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
- * methods. Relies on statics to mimic the Pubsub service, though we try to hide that.
- */
-public class PubsubTestClient extends PubsubClient implements Serializable {
-  /**
-   * Mimic the state of the simulated Pubsub 'service'.
-   *
-   * <p>Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running
-   * test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created
-   * from the same client factory and run in parallel. Thus we can't enforce aliasing of the
-   * following data structures over all clients and must resort to a static.
-   */
-  private static class State {
-    /**
-     * True if has been primed for a test but not yet validated.
-     */
-    boolean isActive;
-
-    /**
-     * Publish mode only: Only publish calls for this topic are allowed.
-     */
-    @Nullable
-    TopicPath expectedTopic;
-
-    /**
-     * Publish mode only: Messages yet to seen in a {@link #publish} call.
-     */
-    @Nullable
-    Set<OutgoingMessage> remainingExpectedOutgoingMessages;
-
-    /**
-     * Publish mode only: Messages which should throw when first sent to simulate transient publish
-     * failure.
-     */
-    @Nullable
-    Set<OutgoingMessage> remainingFailingOutgoingMessages;
-
-    /**
-     * Pull mode only: Clock from which to get current time.
-     */
-    @Nullable
-    Clock clock;
-
-    /**
-     * Pull mode only: Only pull calls for this subscription are allowed.
-     */
-    @Nullable
-    SubscriptionPath expectedSubscription;
-
-    /**
-     * Pull mode only: Timeout to simulate.
-     */
-    int ackTimeoutSec;
-
-    /**
-     * Pull mode only: Messages waiting to be received by a {@link #pull} call.
-     */
-    @Nullable
-    List<IncomingMessage> remainingPendingIncomingMessages;
-
-    /**
-     * Pull mode only: Messages which have been returned from a {@link #pull} call and
-     * not yet ACKed by an {@link #acknowledge} call.
-     */
-    @Nullable
-    Map<String, IncomingMessage> pendingAckIncomingMessages;
-
-    /**
-     * Pull mode only: When above messages are due to have their ACK deadlines expire.
-     */
-    @Nullable
-    Map<String, Long> ackDeadline;
-  }
-
-  private static final State STATE = new State();
-
-  /** Closing the factory will validate all expected messages were processed. */
-  public interface PubsubTestClientFactory
-          extends PubsubClientFactory, Closeable, Serializable {
-  }
-
-  /**
-   * Return a factory for testing publishers. Only one factory may be in-flight at a time.
-   * The factory must be closed when the test is complete, at which point final validation will
-   * occur.
-   */
-  public static PubsubTestClientFactory createFactoryForPublish(
-      final TopicPath expectedTopic,
-      final Iterable<OutgoingMessage> expectedOutgoingMessages,
-      final Iterable<OutgoingMessage> failingOutgoingMessages) {
-    synchronized (STATE) {
-      checkState(!STATE.isActive, "Test still in flight");
-      STATE.expectedTopic = expectedTopic;
-      STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
-      STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
-      STATE.isActive = true;
-    }
-    return new PubsubTestClientFactory() {
-      @Override
-      public PubsubClient newClient(
-          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
-          throws IOException {
-        return new PubsubTestClient();
-      }
-
-      @Override
-      public String getKind() {
-        return "PublishTest";
-      }
-
-      @Override
-      public void close() {
-        synchronized (STATE) {
-          checkState(STATE.isActive, "No test still in flight");
-          checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(),
-                     "Still waiting for %s messages to be published",
-                     STATE.remainingExpectedOutgoingMessages.size());
-          STATE.isActive = false;
-          STATE.remainingExpectedOutgoingMessages = null;
-        }
-      }
-    };
-  }
-
-  /**
-   * Return a factory for testing subscribers. Only one factory may be in-flight at a time.
-   * The factory must be closed when the test in complete
-   */
-  public static PubsubTestClientFactory createFactoryForPull(
-      final Clock clock,
-      final SubscriptionPath expectedSubscription,
-      final int ackTimeoutSec,
-      final Iterable<IncomingMessage> expectedIncomingMessages) {
-    synchronized (STATE) {
-      checkState(!STATE.isActive, "Test still in flight");
-      STATE.clock = clock;
-      STATE.expectedSubscription = expectedSubscription;
-      STATE.ackTimeoutSec = ackTimeoutSec;
-      STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages);
-      STATE.pendingAckIncomingMessages = new HashMap<>();
-      STATE.ackDeadline = new HashMap<>();
-      STATE.isActive = true;
-    }
-    return new PubsubTestClientFactory() {
-      @Override
-      public PubsubClient newClient(
-          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
-          throws IOException {
-        return new PubsubTestClient();
-      }
-
-      @Override
-      public String getKind() {
-        return "PullTest";
-      }
-
-      @Override
-      public void close() {
-        synchronized (STATE) {
-          checkState(STATE.isActive, "No test still in flight");
-          checkState(STATE.remainingPendingIncomingMessages.isEmpty(),
-                     "Still waiting for %s messages to be pulled",
-                     STATE.remainingPendingIncomingMessages.size());
-          checkState(STATE.pendingAckIncomingMessages.isEmpty(),
-                     "Still waiting for %s messages to be ACKed",
-                     STATE.pendingAckIncomingMessages.size());
-          checkState(STATE.ackDeadline.isEmpty(),
-                     "Still waiting for %s messages to be ACKed",
-                     STATE.ackDeadline.size());
-          STATE.isActive = false;
-          STATE.remainingPendingIncomingMessages = null;
-          STATE.pendingAckIncomingMessages = null;
-          STATE.ackDeadline = null;
-        }
-      }
-    };
-  }
-
-  public static PubsubTestClientFactory createFactoryForCreateSubscription() {
-    return new PubsubTestClientFactory() {
-      int numCalls = 0;
-
-      @Override
-      public void close() throws IOException {
-        checkState(
-            numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls);
-      }
-
-      @Override
-      public PubsubClient newClient(
-          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
-          throws IOException {
-        return new PubsubTestClient() {
-          @Override
-          public void createSubscription(
-              TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds)
-              throws IOException {
-            checkState(numCalls == 0, "Expected at most one subscription to be created");
-            numCalls++;
-          }
-        };
-      }
-
-      @Override
-      public String getKind() {
-        return "CreateSubscriptionTest";
-      }
-    };
-  }
-
-  /**
-   * Return true if in pull mode.
-   */
-  private boolean inPullMode() {
-    checkState(STATE.isActive, "No test is active");
-    return STATE.expectedSubscription != null;
-  }
-
-  /**
-   * Return true if in publish mode.
-   */
-  private boolean inPublishMode() {
-    checkState(STATE.isActive, "No test is active");
-    return STATE.expectedTopic != null;
-  }
-
-  /**
-   * For subscription mode only:
-   * Track progression of time according to the {@link Clock} passed . This will simulate Pubsub
-   * expiring
-   * outstanding ACKs.
-   */
-  public void advance() {
-    synchronized (STATE) {
-      checkState(inPullMode(), "Can only advance in pull mode");
-      // Any messages who's ACKs timed out are available for re-pulling.
-      Iterator<Map.Entry<String, Long>> deadlineItr = STATE.ackDeadline.entrySet().iterator();
-      while (deadlineItr.hasNext()) {
-        Map.Entry<String, Long> entry = deadlineItr.next();
-        if (entry.getValue() <= STATE.clock.currentTimeMillis()) {
-          STATE.remainingPendingIncomingMessages.add(
-              STATE.pendingAckIncomingMessages.remove(entry.getKey()));
-          deadlineItr.remove();
-        }
-      }
-    }
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public int publish(
-      TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
-    synchronized (STATE) {
-      checkState(inPublishMode(), "Can only publish in publish mode");
-      checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic,
-                 STATE.expectedTopic);
-      for (OutgoingMessage outgoingMessage : outgoingMessages) {
-        if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) {
-          throw new RuntimeException("Simulating failure for " + outgoingMessage);
-        }
-        checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage),
-                   "Unexpected outgoing message %s", outgoingMessage);
-      }
-      return outgoingMessages.size();
-    }
-  }
-
-  @Override
-  public List<IncomingMessage> pull(
-      long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize,
-      boolean returnImmediately) throws IOException {
-    synchronized (STATE) {
-      checkState(inPullMode(), "Can only pull in pull mode");
-      long now = STATE.clock.currentTimeMillis();
-      checkState(requestTimeMsSinceEpoch == now,
-                 "Simulated time %s does not match request time %s", now, requestTimeMsSinceEpoch);
-      checkState(subscription.equals(STATE.expectedSubscription),
-                 "Subscription %s does not match expected %s", subscription,
-                 STATE.expectedSubscription);
-      checkState(returnImmediately, "Pull only supported if returning immediately");
-
-      List<IncomingMessage> incomingMessages = new ArrayList<>();
-      Iterator<IncomingMessage> pendItr = STATE.remainingPendingIncomingMessages.iterator();
-      while (pendItr.hasNext()) {
-        IncomingMessage incomingMessage = pendItr.next();
-        pendItr.remove();
-        IncomingMessage incomingMessageWithRequestTime =
-            incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
-        incomingMessages.add(incomingMessageWithRequestTime);
-        STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId,
-                                             incomingMessageWithRequestTime);
-        STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId,
-                              requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000);
-        if (incomingMessages.size() >= batchSize) {
-          break;
-        }
-      }
-      return incomingMessages;
-    }
-  }
-
-  @Override
-  public void acknowledge(
-      SubscriptionPath subscription,
-      List<String> ackIds) throws IOException {
-    synchronized (STATE) {
-      checkState(inPullMode(), "Can only acknowledge in pull mode");
-      checkState(subscription.equals(STATE.expectedSubscription),
-                 "Subscription %s does not match expected %s", subscription,
-                 STATE.expectedSubscription);
-
-      for (String ackId : ackIds) {
-        checkState(STATE.ackDeadline.remove(ackId) != null,
-                   "No message with ACK id %s is waiting for an ACK", ackId);
-        checkState(STATE.pendingAckIncomingMessages.remove(ackId) != null,
-                   "No message with ACK id %s is waiting for an ACK", ackId);
-      }
-    }
-  }
-
-  @Override
-  public void modifyAckDeadline(
-      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
-    synchronized (STATE) {
-      checkState(inPullMode(), "Can only modify ack deadline in pull mode");
-      checkState(subscription.equals(STATE.expectedSubscription),
-                 "Subscription %s does not match expected %s", subscription,
-                 STATE.expectedSubscription);
-
-      for (String ackId : ackIds) {
-        if (deadlineSeconds > 0) {
-          checkState(STATE.ackDeadline.remove(ackId) != null,
-                     "No message with ACK id %s is waiting for an ACK", ackId);
-          checkState(STATE.pendingAckIncomingMessages.containsKey(ackId),
-                     "No message with ACK id %s is waiting for an ACK", ackId);
-          STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + deadlineSeconds * 1000);
-        } else {
-          checkState(STATE.ackDeadline.remove(ackId) != null,
-                     "No message with ACK id %s is waiting for an ACK", ackId);
-          IncomingMessage message = STATE.pendingAckIncomingMessages.remove(ackId);
-          checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId);
-          STATE.remainingPendingIncomingMessages.add(message);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void createTopic(TopicPath topic) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void deleteTopic(TopicPath topic) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void createSubscription(
-      TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public List<SubscriptionPath> listSubscriptions(
-      ProjectPath project, TopicPath topic) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
-    synchronized (STATE) {
-      return STATE.ackTimeoutSec;
-    }
-  }
-
-  @Override
-  public boolean isEOF() {
-    synchronized (STATE) {
-      checkState(inPullMode(), "Can only check EOF in pull mode");
-      return STATE.remainingPendingIncomingMessages.isEmpty();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
index 1edfa1d..80c093b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -107,8 +107,7 @@ public class Transport {
   /**
    * Returns a Pubsub client builder using the specified {@link PubsubOptions}.
    *
-   * @deprecated Use an appropriate
-   * {@link org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory}
+   * @deprecated Use an appropriate org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory
    */
   @Deprecated
   public static Pubsub.Builder

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
deleted file mode 100644
index c996409..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.hamcrest.Matchers.hasItem;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-
-import java.util.Set;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for PubsubIO Read and Write transforms.
- */
-@RunWith(JUnit4.class)
-public class PubsubIOTest {
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testPubsubIOGetName() {
-    assertEquals("PubsubIO.Read",
-        PubsubIO.<String>read().topic("projects/myproject/topics/mytopic").getName());
-    assertEquals("PubsubIO.Write",
-        PubsubIO.<String>write().topic("projects/myproject/topics/mytopic").getName());
-  }
-
-  @Test
-  public void testTopicValidationSuccess() throws Exception {
-    PubsubIO.<String>read().topic("projects/my-project/topics/abc");
-    PubsubIO.<String>read().topic("projects/my-project/topics/ABC");
-    PubsubIO.<String>read().topic("projects/my-project/topics/AbC-DeF");
-    PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234");
-    PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
-    PubsubIO.<String>read().topic(new StringBuilder()
-        .append("projects/my-project/topics/A-really-long-one-")
-        .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
-        .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
-        .append("11111111111111111111111111111111111111111111111111111111111111111111111111")
-        .toString());
-  }
-
-  @Test
-  public void testTopicValidationBadCharacter() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-    PubsubIO.<String>read().topic("projects/my-project/topics/abc-*-abc");
-  }
-
-  @Test
-  public void testTopicValidationTooLong() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-    PubsubIO.<String>read().topic(new StringBuilder().append
-        ("projects/my-project/topics/A-really-long-one-")
-        .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
-        .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
-        .append("1111111111111111111111111111111111111111111111111111111111111111111111111111")
-        .toString());
-  }
-
-  @Test
-  public void testReadTopicDisplayData() {
-    String topic = "projects/project/topics/topic";
-    String subscription = "projects/project/subscriptions/subscription";
-    Duration maxReadTime = Duration.standardMinutes(5);
-    PubsubIO.Read<String> read = PubsubIO.<String>read()
-        .topic(StaticValueProvider.of(topic))
-        .timestampLabel("myTimestamp")
-        .idLabel("myId");
-
-    DisplayData displayData = DisplayData.from(read);
-
-    assertThat(displayData, hasDisplayItem("topic", topic));
-    assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
-    assertThat(displayData, hasDisplayItem("idLabel", "myId"));
-  }
-
-  @Test
-  public void testReadSubscriptionDisplayData() {
-    String topic = "projects/project/topics/topic";
-    String subscription = "projects/project/subscriptions/subscription";
-    Duration maxReadTime = Duration.standardMinutes(5);
-    PubsubIO.Read<String> read = PubsubIO.<String>read()
-        .subscription(StaticValueProvider.of(subscription))
-        .timestampLabel("myTimestamp")
-        .idLabel("myId");
-
-    DisplayData displayData = DisplayData.from(read);
-
-    assertThat(displayData, hasDisplayItem("subscription", subscription));
-    assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
-    assertThat(displayData, hasDisplayItem("idLabel", "myId"));
-  }
-
-  @Test
-  public void testNullTopic() {
-    String subscription = "projects/project/subscriptions/subscription";
-    PubsubIO.Read<String> read = PubsubIO.<String>read()
-        .subscription(StaticValueProvider.of(subscription));
-    assertNull(read.getTopic());
-    assertNotNull(read.getSubscription());
-    assertNotNull(DisplayData.from(read));
-  }
-
-  @Test
-  public void testNullSubscription() {
-    String topic = "projects/project/topics/topic";
-    PubsubIO.Read<String> read = PubsubIO.<String>read()
-        .topic(StaticValueProvider.of(topic));
-    assertNotNull(read.getTopic());
-    assertNull(read.getSubscription());
-    assertNotNull(DisplayData.from(read));
-  }
-
-  @Test
-  @Category({ValidatesRunner.class, UsesUnboundedPCollections.class})
-  public void testPrimitiveReadDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    Set<DisplayData> displayData;
-    PubsubIO.Read<String> read = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
-
-    // Reading from a subscription.
-    read = read.subscription("projects/project/subscriptions/subscription");
-    displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
-    assertThat("PubsubIO.Read should include the subscription in its primitive display data",
-        displayData, hasItem(hasDisplayItem("subscription")));
-
-    // Reading from a topic.
-    read = read.topic("projects/project/topics/topic");
-    displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
-    assertThat("PubsubIO.Read should include the topic in its primitive display data",
-        displayData, hasItem(hasDisplayItem("topic")));
-  }
-
-  @Test
-  public void testWriteDisplayData() {
-    String topic = "projects/project/topics/topic";
-    PubsubIO.Write<?> write = PubsubIO.<String>write()
-        .topic(topic)
-        .timestampLabel("myTimestamp")
-        .idLabel("myId");
-
-    DisplayData displayData = DisplayData.from(write);
-
-    assertThat(displayData, hasDisplayItem("topic", topic));
-    assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
-    assertThat(displayData, hasDisplayItem("idLabel", "myId"));
-  }
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testPrimitiveWriteDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PubsubIO.Write<?> write = PubsubIO.<String>write().topic("projects/project/topics/topic");
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("PubsubIO.Write should include the topic in its primitive display data",
-        displayData, hasItem(hasDisplayItem("topic")));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
deleted file mode 100644
index 7a4be62..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.hash.Hashing;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.PubsubUnboundedSink.RecordIdMethod;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.util.PubsubClient;
-import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.apache.beam.sdk.util.PubsubTestClient;
-import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test PubsubUnboundedSink.
- */
-@RunWith(JUnit4.class)
-public class PubsubUnboundedSinkTest implements Serializable {
-  private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
-  private static final String DATA = "testData";
-  private static final Map<String, String> ATTRIBUTES =
-          ImmutableMap.<String, String>builder().put("a", "b").put("c", "d").build();
-  private static final long TIMESTAMP = 1234L;
-  private static final String TIMESTAMP_LABEL = "timestamp";
-  private static final String ID_LABEL = "id";
-  private static final int NUM_SHARDS = 10;
-
-  private static class Stamp extends DoFn<String, String> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP));
-    }
-  }
-
-  private String getRecordId(String data) {
-    return Hashing.murmur3_128().hashBytes(data.getBytes()).toString();
-  }
-
-  @Rule
-  public transient TestPipeline p = TestPipeline.create();
-
-  @Test
-  public void saneCoder() throws Exception {
-    OutgoingMessage message = new OutgoingMessage(
-            DATA.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(DATA));
-    CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message);
-    CoderProperties.coderSerializable(PubsubUnboundedSink.CODER);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void sendOneMessage() throws IOException {
-    List<OutgoingMessage> outgoing =
-        ImmutableList.of(new OutgoingMessage(
-                DATA.getBytes(),
-                ATTRIBUTES,
-                TIMESTAMP, getRecordId(DATA)));
-    int batchSize = 1;
-    int batchBytes = 1;
-    try (PubsubTestClientFactory factory =
-             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
-                                                      ImmutableList.<OutgoingMessage>of())) {
-      PubsubUnboundedSink<String> sink =
-          new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
-              TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
-              Duration.standardSeconds(2),
-              new SimpleFunction<String, PubsubIO.PubsubMessage>() {
-                @Override
-                public PubsubIO.PubsubMessage apply(String input) {
-                  return new PubsubIO.PubsubMessage(input.getBytes(), ATTRIBUTES);
-                }
-              },
-              RecordIdMethod.DETERMINISTIC);
-      p.apply(Create.of(ImmutableList.of(DATA)))
-       .apply(ParDo.of(new Stamp()))
-       .apply(sink);
-      p.run();
-    }
-    // The PubsubTestClientFactory will assert fail on close if the actual published
-    // message does not match the expected publish message.
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void sendMoreThanOneBatchByNumMessages() throws IOException {
-    List<OutgoingMessage> outgoing = new ArrayList<>();
-    List<String> data = new ArrayList<>();
-    int batchSize = 2;
-    int batchBytes = 1000;
-    for (int i = 0; i < batchSize * 10; i++) {
-      String str = String.valueOf(i);
-      outgoing.add(new OutgoingMessage(
-              str.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(str)));
-      data.add(str);
-    }
-    try (PubsubTestClientFactory factory =
-             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
-                                                      ImmutableList.<OutgoingMessage>of())) {
-      PubsubUnboundedSink<String> sink =
-          new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
-              TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
-              Duration.standardSeconds(2), null, RecordIdMethod.DETERMINISTIC);
-      p.apply(Create.of(data))
-       .apply(ParDo.of(new Stamp()))
-       .apply(sink);
-      p.run();
-    }
-    // The PubsubTestClientFactory will assert fail on close if the actual published
-    // message does not match the expected publish message.
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void sendMoreThanOneBatchByByteSize() throws IOException {
-    List<OutgoingMessage> outgoing = new ArrayList<>();
-    List<String> data = new ArrayList<>();
-    int batchSize = 100;
-    int batchBytes = 10;
-    int n = 0;
-    while (n < batchBytes * 10) {
-      StringBuilder sb = new StringBuilder();
-      for (int i = 0; i < batchBytes; i++) {
-        sb.append(String.valueOf(n));
-      }
-      String str = sb.toString();
-      outgoing.add(new OutgoingMessage(
-              str.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(str)));
-      data.add(str);
-      n += str.length();
-    }
-    try (PubsubTestClientFactory factory =
-             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
-                                                      ImmutableList.<OutgoingMessage>of())) {
-      PubsubUnboundedSink<String> sink =
-          new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC),
-              StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
-              NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
-              null, RecordIdMethod.DETERMINISTIC);
-      p.apply(Create.of(data))
-       .apply(ParDo.of(new Stamp()))
-       .apply(sink);
-      p.run();
-    }
-    // The PubsubTestClientFactory will assert fail on close if the actual published
-    // message does not match the expected publish message.
-  }
-
-  // TODO: We would like to test that failed Pubsub publish calls cause the already assigned
-  // (and random) record ids to be reused. However that can't be done without the test runnner
-  // supporting retrying bundles.
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
deleted file mode 100644
index d9df2ca..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io;
-
-import static junit.framework.TestCase.assertFalse;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import com.google.api.client.util.Clock;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubCheckpoint;
-import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubReader;
-import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.PubsubClient;
-import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
-import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.apache.beam.sdk.util.PubsubTestClient;
-import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
-import org.joda.time.Instant;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test PubsubUnboundedSource.
- */
-@RunWith(JUnit4.class)
-public class PubsubUnboundedSourceTest {
-  private static final SubscriptionPath SUBSCRIPTION =
-      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
-  private static final String DATA = "testData";
-  private static final long TIMESTAMP = 1234L;
-  private static final long REQ_TIME = 6373L;
-  private static final String TIMESTAMP_LABEL = "timestamp";
-  private static final String ID_LABEL = "id";
-  private static final String ACK_ID = "testAckId";
-  private static final String RECORD_ID = "testRecordId";
-  private static final int ACK_TIMEOUT_S = 60;
-
-  private AtomicLong now;
-  private Clock clock;
-  private PubsubTestClientFactory factory;
-  private PubsubSource<String> primSource;
-
-  @Rule
-  public TestPipeline p = TestPipeline.create();
-
-  private void setupOneMessage(Iterable<IncomingMessage> incoming) {
-    now = new AtomicLong(REQ_TIME);
-    clock = new Clock() {
-      @Override
-      public long currentTimeMillis() {
-        return now.get();
-      }
-    };
-    factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming);
-    PubsubUnboundedSource<String> source =
-        new PubsubUnboundedSource<>(
-            clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION),
-            StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, null);
-    primSource = new PubsubSource<>(source);
-  }
-
-  private void setupOneMessage() {
-    setupOneMessage(ImmutableList.of(
-        new IncomingMessage(DATA.getBytes(), null, TIMESTAMP, 0, ACK_ID, RECORD_ID)));
-  }
-
-  @After
-  public void after() throws IOException {
-    factory.close();
-    now = null;
-    clock = null;
-    primSource = null;
-    factory = null;
-  }
-
-  @Test
-  public void checkpointCoderIsSane() throws Exception {
-    setupOneMessage(ImmutableList.<IncomingMessage>of());
-    CoderProperties.coderSerializable(primSource.getCheckpointMarkCoder());
-    // Since we only serialize/deserialize the 'notYetReadIds', and we don't want to make
-    // equals on checkpoints ignore those fields, we'll test serialization and deserialization
-    // of checkpoints in multipleReaders below.
-  }
-
-  @Test
-  public void readOneMessage() throws IOException {
-    setupOneMessage();
-    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
-    // Read one message.
-    assertTrue(reader.start());
-    assertEquals(DATA, reader.getCurrent());
-    assertFalse(reader.advance());
-    // ACK the message.
-    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
-    checkpoint.finalizeCheckpoint();
-    reader.close();
-  }
-
-  @Test
-  public void timeoutAckAndRereadOneMessage() throws IOException {
-    setupOneMessage();
-    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
-    PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
-    assertTrue(reader.start());
-    assertEquals(DATA, reader.getCurrent());
-    // Let the ACK deadline for the above expire.
-    now.addAndGet(65 * 1000);
-    pubsubClient.advance();
-    // We'll now receive the same message again.
-    assertTrue(reader.advance());
-    assertEquals(DATA, reader.getCurrent());
-    assertFalse(reader.advance());
-    // Now ACK the message.
-    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
-    checkpoint.finalizeCheckpoint();
-    reader.close();
-  }
-
-  @Test
-  public void extendAck() throws IOException {
-    setupOneMessage();
-    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
-    PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
-    // Pull the first message but don't take a checkpoint for it.
-    assertTrue(reader.start());
-    assertEquals(DATA, reader.getCurrent());
-    // Extend the ack
-    now.addAndGet(55 * 1000);
-    pubsubClient.advance();
-    assertFalse(reader.advance());
-    // Extend the ack again
-    now.addAndGet(25 * 1000);
-    pubsubClient.advance();
-    assertFalse(reader.advance());
-    // Now ACK the message.
-    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
-    checkpoint.finalizeCheckpoint();
-    reader.close();
-  }
-
-  @Test
-  public void timeoutAckExtensions() throws IOException {
-    setupOneMessage();
-    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
-    PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
-    // Pull the first message but don't take a checkpoint for it.
-    assertTrue(reader.start());
-    assertEquals(DATA, reader.getCurrent());
-    // Extend the ack.
-    now.addAndGet(55 * 1000);
-    pubsubClient.advance();
-    assertFalse(reader.advance());
-    // Let the ack expire.
-    for (int i = 0; i < 3; i++) {
-      now.addAndGet(25 * 1000);
-      pubsubClient.advance();
-      assertFalse(reader.advance());
-    }
-    // Wait for resend.
-    now.addAndGet(25 * 1000);
-    pubsubClient.advance();
-    // Reread the same message.
-    assertTrue(reader.advance());
-    assertEquals(DATA, reader.getCurrent());
-    // Now ACK the message.
-    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
-    checkpoint.finalizeCheckpoint();
-    reader.close();
-  }
-
-  @Test
-  public void multipleReaders() throws IOException {
-    List<IncomingMessage> incoming = new ArrayList<>();
-    for (int i = 0; i < 2; i++) {
-      String data = String.format("data_%d", i);
-      String ackid = String.format("ackid_%d", i);
-      incoming.add(new IncomingMessage(data.getBytes(), null, TIMESTAMP, 0, ackid, RECORD_ID));
-    }
-    setupOneMessage(incoming);
-    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
-    // Consume two messages, only read one.
-    assertTrue(reader.start());
-    assertEquals("data_0", reader.getCurrent());
-
-    // Grab checkpoint.
-    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
-    checkpoint.finalizeCheckpoint();
-    assertEquals(1, checkpoint.notYetReadIds.size());
-    assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));
-
-    // Read second message.
-    assertTrue(reader.advance());
-    assertEquals("data_1", reader.getCurrent());
-
-    // Restore from checkpoint.
-    byte[] checkpointBytes =
-        CoderUtils.encodeToByteArray(primSource.getCheckpointMarkCoder(), checkpoint);
-    checkpoint = CoderUtils.decodeFromByteArray(primSource.getCheckpointMarkCoder(),
-                                                checkpointBytes);
-    assertEquals(1, checkpoint.notYetReadIds.size());
-    assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));
-
-    // Re-read second message.
-    reader = primSource.createReader(p.getOptions(), checkpoint);
-    assertTrue(reader.start());
-    assertEquals("data_1", reader.getCurrent());
-
-    // We are done.
-    assertFalse(reader.advance());
-
-    // ACK final message.
-    checkpoint = reader.getCheckpointMark();
-    checkpoint.finalizeCheckpoint();
-    reader.close();
-  }
-
-  private long messageNumToTimestamp(int messageNum) {
-    return TIMESTAMP + messageNum * 100;
-  }
-
-  @Test
-  public void readManyMessages() throws IOException {
-    Map<String, Integer> dataToMessageNum = new HashMap<>();
-
-    final int m = 97;
-    final int n = 10000;
-    List<IncomingMessage> incoming = new ArrayList<>();
-    for (int i = 0; i < n; i++) {
-      // Make the messages timestamps slightly out of order.
-      int messageNum = ((i / m) * m) + (m - 1) - (i % m);
-      String data = String.format("data_%d", messageNum);
-      dataToMessageNum.put(data, messageNum);
-      String recid = String.format("recordid_%d", messageNum);
-      String ackId = String.format("ackid_%d", messageNum);
-      incoming.add(new IncomingMessage(data.getBytes(), null, messageNumToTimestamp(messageNum), 0,
-                                       ackId, recid));
-    }
-    setupOneMessage(incoming);
-
-    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
-    PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
-
-    for (int i = 0; i < n; i++) {
-      if (i == 0) {
-        assertTrue(reader.start());
-      } else {
-        assertTrue(reader.advance());
-      }
-      // We'll checkpoint and ack within the 2min limit.
-      now.addAndGet(30);
-      pubsubClient.advance();
-      String data = reader.getCurrent();
-      Integer messageNum = dataToMessageNum.remove(data);
-      // No duplicate messages.
-      assertNotNull(messageNum);
-      // Preserve timestamp.
-      assertEquals(new Instant(messageNumToTimestamp(messageNum)), reader.getCurrentTimestamp());
-      // Preserve record id.
-      String recid = String.format("recordid_%d", messageNum);
-      assertArrayEquals(recid.getBytes(), reader.getCurrentRecordId());
-
-      if (i % 1000 == 999) {
-        // Estimated watermark can never get ahead of actual outstanding messages.
-        long watermark = reader.getWatermark().getMillis();
-        long minOutstandingTimestamp = Long.MAX_VALUE;
-        for (Integer outstandingMessageNum : dataToMessageNum.values()) {
-          minOutstandingTimestamp =
-              Math.min(minOutstandingTimestamp, messageNumToTimestamp(outstandingMessageNum));
-        }
-        assertThat(watermark, lessThanOrEqualTo(minOutstandingTimestamp));
-        // Ack messages, but only every other finalization.
-        PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
-        if (i % 2000 == 1999) {
-          checkpoint.finalizeCheckpoint();
-        }
-      }
-    }
-    // We are done.
-    assertFalse(reader.advance());
-    // We saw each message exactly once.
-    assertTrue(dataToMessageNum.isEmpty());
-    reader.close();
-  }
-
-  @Test
-  public void noSubscriptionSplitIntoBundlesGeneratesSubscription() throws Exception {
-    TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
-    factory = PubsubTestClient.createFactoryForCreateSubscription();
-    PubsubUnboundedSource<String> source =
-        new PubsubUnboundedSource<>(
-            factory,
-            StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
-            StaticValueProvider.of(topicPath),
-            null,
-            StringUtf8Coder.of(),
-            null,
-            null,
-            null);
-    assertThat(source.getSubscription(), nullValue());
-
-    assertThat(source.getSubscription(), nullValue());
-
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<PubsubSource<String>> splits =
-        (new PubsubSource<>(source)).generateInitialSplits(3, options);
-    // We have at least one returned split
-    assertThat(splits, hasSize(greaterThan(0)));
-    for (PubsubSource<String> split : splits) {
-      // Each split is equal
-      assertThat(split, equalTo(splits.get(0)));
-    }
-
-    assertThat(splits.get(0).subscriptionPath, not(nullValue()));
-  }
-
-  @Test
-  public void noSubscriptionNoSplitGeneratesSubscription() throws Exception {
-    TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
-    factory = PubsubTestClient.createFactoryForCreateSubscription();
-    PubsubUnboundedSource<String> source =
-        new PubsubUnboundedSource<>(
-            factory,
-            StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
-            StaticValueProvider.of(topicPath),
-            null,
-            StringUtf8Coder.of(),
-            null,
-            null,
-            null);
-    assertThat(source.getSubscription(), nullValue());
-
-    assertThat(source.getSubscription(), nullValue());
-
-    PipelineOptions options = PipelineOptionsFactory.create();
-    PubsubSource<String> actualSource = new PubsubSource<>(source);
-    PubsubReader<String> reader = actualSource.createReader(options, null);
-    SubscriptionPath createdSubscription = reader.subscription;
-    assertThat(createdSubscription, not(nullValue()));
-
-    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
-    assertThat(checkpoint.subscriptionPath, equalTo(createdSubscription.getPath()));
-
-    checkpoint.finalizeCheckpoint();
-    PubsubCheckpoint<String> deserCheckpoint =
-        CoderUtils.clone(actualSource.getCheckpointMarkCoder(), checkpoint);
-    assertThat(checkpoint.subscriptionPath, not(nullValue()));
-    assertThat(checkpoint.subscriptionPath, equalTo(deserCheckpoint.subscriptionPath));
-
-    PubsubReader<String> readerFromOriginal = actualSource.createReader(options, checkpoint);
-    PubsubReader<String> readerFromDeser = actualSource.createReader(options, deserCheckpoint);
-
-    assertThat(readerFromOriginal.subscription, equalTo(createdSubscription));
-    assertThat(readerFromDeser.subscription, equalTo(createdSubscription));
-  }
-
-  /**
-   * Tests that checkpoints finalized after the reader is closed succeed.
-   */
-  @Test
-  public void closeWithActiveCheckpoints() throws Exception {
-    setupOneMessage();
-    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
-    reader.start();
-    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
-    reader.close();
-    checkpoint.finalizeCheckpoint();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
deleted file mode 100644
index 1a99d38..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.Map;
-import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
-import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for helper classes and methods in PubsubClient.
- */
-@RunWith(JUnit4.class)
-public class PubsubClientTest {
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  //
-  // Timestamp handling
-  //
-
-  private long parse(String timestamp) {
-    Map<String, String> map = ImmutableMap.of("myLabel", timestamp);
-    return PubsubClient.extractTimestamp("myLabel", null, map);
-  }
-
-  private void roundTripRfc339(String timestamp) {
-    assertEquals(Instant.parse(timestamp).getMillis(), parse(timestamp));
-  }
-
-  private void truncatedRfc339(String timestamp, String truncatedTimestmap) {
-    assertEquals(Instant.parse(truncatedTimestmap).getMillis(), parse(timestamp));
-  }
-
-  @Test
-  public void noTimestampLabelReturnsPubsubPublish() {
-    final long time = 987654321L;
-    long timestamp = PubsubClient.extractTimestamp(null, String.valueOf(time), null);
-    assertEquals(time, timestamp);
-  }
-
-  @Test
-  public void noTimestampLabelAndInvalidPubsubPublishThrowsError() {
-    thrown.expect(NumberFormatException.class);
-    PubsubClient.extractTimestamp(null, "not-a-date", null);
-  }
-
-  @Test
-  public void timestampLabelWithNullAttributesThrowsError() {
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
-    PubsubClient.extractTimestamp("myLabel", null, null);
-  }
-
-  @Test
-  public void timestampLabelSetWithMissingAttributeThrowsError() {
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
-    Map<String, String> map = ImmutableMap.of("otherLabel", "whatever");
-    PubsubClient.extractTimestamp("myLabel", null, map);
-  }
-
-  @Test
-  public void timestampLabelParsesMillisecondsSinceEpoch() {
-    long time = 1446162101123L;
-    Map<String, String> map = ImmutableMap.of("myLabel", String.valueOf(time));
-    long timestamp = PubsubClient.extractTimestamp("myLabel", null, map);
-    assertEquals(time, timestamp);
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Seconds() {
-    roundTripRfc339("2015-10-29T23:41:41Z");
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Tenths() {
-    roundTripRfc339("2015-10-29T23:41:41.1Z");
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Hundredths() {
-    roundTripRfc339("2015-10-29T23:41:41.12Z");
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Millis() {
-    roundTripRfc339("2015-10-29T23:41:41.123Z");
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339Micros() {
-    // Note: micros part 456/1000 is dropped.
-    truncatedRfc339("2015-10-29T23:41:41.123456Z", "2015-10-29T23:41:41.123Z");
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339MicrosRounding() {
-    // Note: micros part 999/1000 is dropped, not rounded up.
-    truncatedRfc339("2015-10-29T23:41:41.123999Z", "2015-10-29T23:41:41.123Z");
-  }
-
-  @Test
-  public void timestampLabelWithInvalidFormatThrowsError() {
-    thrown.expect(NumberFormatException.class);
-    parse("not-a-timestamp");
-  }
-
-  @Test
-  public void timestampLabelWithInvalidFormat2ThrowsError() {
-    thrown.expect(NumberFormatException.class);
-    parse("null");
-  }
-
-  @Test
-  public void timestampLabelWithInvalidFormat3ThrowsError() {
-    thrown.expect(NumberFormatException.class);
-    parse("2015-10");
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339WithSmallYear() {
-    // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted
-    // This is therefore a "small year" until this difference is reconciled.
-    roundTripRfc339("1582-10-15T01:23:45.123Z");
-  }
-
-  @Test
-  public void timestampLabelParsesRfc3339WithLargeYear() {
-    // Year 9999 in range.
-    roundTripRfc339("9999-10-29T23:41:41.123999Z");
-  }
-
-  @Test
-  public void timestampLabelRfc3339WithTooLargeYearThrowsError() {
-    thrown.expect(NumberFormatException.class);
-    // Year 10000 out of range.
-    parse("10000-10-29T23:41:41.123999Z");
-  }
-
-  //
-  // Paths
-  //
-
-  @Test
-  public void projectPathFromIdWellFormed() {
-    ProjectPath path = PubsubClient.projectPathFromId("test");
-    assertEquals("projects/test", path.getPath());
-  }
-
-  @Test
-  public void subscriptionPathFromNameWellFormed() {
-    SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something");
-    assertEquals("projects/test/subscriptions/something", path.getPath());
-    assertEquals("/subscriptions/test/something", path.getV1Beta1Path());
-  }
-
-  @Test
-  public void topicPathFromNameWellFormed() {
-    TopicPath path = PubsubClient.topicPathFromName("test", "something");
-    assertEquals("projects/test/topics/something", path.getPath());
-    assertEquals("/topics/test/something", path.getV1Beta1Path());
-  }
-}