You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:33 UTC

[25/55] [abbrv] beam git commit: Change Nexmark pom structure to mirror other modules on Beam

Change Nexmark pom structure to mirror other modules on Beam

Fix compile after PubsubIO refactor


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8098bb1d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8098bb1d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8098bb1d

Branch: refs/heads/master
Commit: 8098bb1dbcc22153960d9b4483327e2977641148
Parents: 7ef49dc
Author: Ismaël Mejía <ie...@apache.org>
Authored: Thu Apr 13 10:47:54 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200

----------------------------------------------------------------------
 integration/java/nexmark/pom.xml                |  48 +-
 .../beam/integration/nexmark/NexmarkRunner.java |   2 +-
 .../integration/nexmark/io/PubsubClient.java    | 543 +++++++++++++++++++
 .../integration/nexmark/io/PubsubHelper.java    |   2 -
 .../nexmark/io/PubsubJsonClient.java            | 318 +++++++++++
 .../nexmark/io/PubsubTestClient.java            | 436 +++++++++++++++
 integration/java/pom.xml                        |  37 ++
 integration/pom.xml                             |  37 ++
 pom.xml                                         |   2 +-
 9 files changed, 1401 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 7cd7d39..67d6117 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -22,19 +22,17 @@
 
   <parent>
     <groupId>org.apache.beam</groupId>
-    <artifactId>beam-parent</artifactId>
+    <artifactId>beam-integration-java-parent</artifactId>
     <version>0.7.0-SNAPSHOT</version>
-    <relativePath>../../../pom.xml</relativePath>
+    <relativePath>../pom.xml</relativePath>
   </parent>
 
-  <artifactId>beam-integration-java</artifactId>
+  <artifactId>beam-integration-java-nexmark</artifactId>
   <name>Apache Beam :: Integration Tests :: Java :: Nexmark</name>
 
   <packaging>jar</packaging>
 
   <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <flink.version>1.2.0</flink.version>
     <spark.version>1.6.3</spark.version>
     <apex.codehaus.jackson.version>1.9.3</apex.codehaus.jackson.version>
@@ -253,11 +251,36 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-pubsub</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auth</groupId>
+      <artifactId>google-auth-library-credentials</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auth</groupId>
+      <artifactId>google-auth-library-oauth2-http</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.cloud.bigdataoss</groupId>
       <artifactId>gcsio</artifactId>
     </dependency>
 
     <dependency>
+      <groupId>com.google.cloud.bigdataoss</groupId>
+      <artifactId>util</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
     </dependency>
@@ -288,13 +311,6 @@
       <scope>compile</scope>
     </dependency>
 
-    <!--<dependency>-->
-      <!--<groupId>org.slf4j</groupId>-->
-      <!--<artifactId>slf4j-jdk14</artifactId>-->
-      <!--&lt;!&ndash; When loaded at runtime this will wire up slf4j to the JUL backend &ndash;&gt;-->
-      <!--<scope>runtime</scope>-->
-    <!--</dependency>-->
-
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
@@ -305,13 +321,5 @@
       <artifactId>junit</artifactId>
       <scope>compile</scope>
     </dependency>
-
-    <!--<dependency>-->
-      <!--<groupId>io.netty</groupId>-->
-      <!--<artifactId>netty-tcnative-boringssl-static</artifactId>-->
-      <!--<version>1.1.33.Fork13</version>-->
-      <!--<classifier>${os.detected.classifier}</classifier>-->
-      <!--<scope>runtime</scope>-->
-    <!--</dependency>-->
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index df1000a..3a0452f 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -67,9 +67,9 @@ import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
new file mode 100644
index 0000000..687aa35
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.client.util.DateTime;
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * An (abstract) helper class for talking to Pubsub via an underlying transport.
+ */
+abstract class PubsubClient implements Closeable {
+  /**
+   * Factory for creating clients.
+   */
+  public interface PubsubClientFactory extends Serializable {
+    /**
+     * Construct a new Pubsub client. It should be closed via {@link #close} in order
+     * to ensure tidy cleanup of underlying netty resources (or use the try-with-resources
+     * construct). Uses {@code options} to derive pubsub endpoints and application credentials.
+     * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel} to store custom
+     * timestamps/ids within message metadata.
+     */
+    PubsubClient newClient(@Nullable String timestampLabel,
+        @Nullable String idLabel, PubsubOptions options) throws IOException;
+
+    /**
+     * Return the display name for this factory. E.g. "Json", "gRPC".
+     */
+    String getKind();
+  }
+
+  /**
+   * Return timestamp as ms-since-unix-epoch corresponding to {@code timestamp}.
+   * Return {@literal null} if no timestamp could be found. Throw {@link IllegalArgumentException}
+   * if timestamp cannot be recognized.
+   */
+  @Nullable
+  private static Long asMsSinceEpoch(@Nullable String timestamp) {
+    if (Strings.isNullOrEmpty(timestamp)) {
+      return null;
+    }
+    try {
+      // Try parsing as milliseconds since epoch. An RFC 3339 string can never
+      // parse as a long, so Long.parseLong throws NumberFormatException (a
+      // subclass of IllegalArgumentException) for it; we use that to fall back
+      // to RFC 3339.
+      return Long.parseLong(timestamp);
+    } catch (IllegalArgumentException e1) {
+      // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an
+      // IllegalArgumentException if parsing fails, and the caller should handle.
+      return DateTime.parseRfc3339(timestamp).getValue();
+    }
+  }
+
+  /**
+   * Return the timestamp (in ms since unix epoch) to use for a Pubsub message with {@code
+   * attributes} and {@code pubsubTimestamp}.
+   *
+   * <p>If {@code timestampLabel} is non-{@literal null} then the message attributes must contain
+   * that label, and the value of that label will be taken as the timestamp.
+   * Otherwise the timestamp will be taken from the Pubsub publish timestamp {@code
+   * pubsubTimestamp}.
+   *
+   * @throws IllegalArgumentException if the timestamp cannot be recognized as a ms-since-unix-epoch
+   * or RFC3339 time.
+   */
+  protected static long extractTimestamp(
+      @Nullable String timestampLabel,
+      @Nullable String pubsubTimestamp,
+      @Nullable Map<String, String> attributes) {
+    Long timestampMsSinceEpoch;
+    if (Strings.isNullOrEmpty(timestampLabel)) {
+      timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp);
+      checkArgument(timestampMsSinceEpoch != null,
+                    "Cannot interpret PubSub publish timestamp: %s",
+                    pubsubTimestamp);
+    } else {
+      String value = attributes == null ? null : attributes.get(timestampLabel);
+      checkArgument(value != null,
+                    "PubSub message is missing a value for timestamp label %s",
+                    timestampLabel);
+      timestampMsSinceEpoch = asMsSinceEpoch(value);
+      checkArgument(timestampMsSinceEpoch != null,
+                    "Cannot interpret value of label %s as timestamp: %s",
+                    timestampLabel, value);
+    }
+    return timestampMsSinceEpoch;
+  }
+
+  /**
+   * Path representing a cloud project id.
+   */
+  static class ProjectPath implements Serializable {
+    private final String projectId;
+
+    /**
+     * Creates a {@link ProjectPath} from a {@link String} representation, which
+     * must be of the form {@code "projects/" + projectId}.
+     */
+    ProjectPath(String path) {
+      String[] splits = path.split("/");
+      checkArgument(
+          splits.length == 2 && splits[0].equals("projects"),
+          "Malformed project path \"%s\": must be of the form \"projects/\" + <project id>",
+          path);
+      this.projectId = splits[1];
+    }
+
+    public String getPath() {
+      return String.format("projects/%s", projectId);
+    }
+
+    public String getId() {
+      return projectId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      ProjectPath that = (ProjectPath) o;
+
+      return projectId.equals(that.projectId);
+    }
+
+    @Override
+    public int hashCode() {
+      return projectId.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return getPath();
+    }
+  }
+
+  public static ProjectPath projectPathFromPath(String path) {
+    return new ProjectPath(path);
+  }
+
+  public static ProjectPath projectPathFromId(String projectId) {
+    return new ProjectPath(String.format("projects/%s", projectId));
+  }
+
+  /**
+   * Path representing a Pubsub subscription.
+   */
+  public static class SubscriptionPath implements Serializable {
+    private final String projectId;
+    private final String subscriptionName;
+
+    SubscriptionPath(String path) {
+      String[] splits = path.split("/");
+      checkState(
+          splits.length == 4 && splits[0].equals("projects") && splits[2].equals("subscriptions"),
+          "Malformed subscription path %s: "
+          + "must be of the form \"projects/\" + <project id> + \"subscriptions\"", path);
+      this.projectId = splits[1];
+      this.subscriptionName = splits[3];
+    }
+
+    public String getPath() {
+      return String.format("projects/%s/subscriptions/%s", projectId, subscriptionName);
+    }
+
+    public String getName() {
+      return subscriptionName;
+    }
+
+    public String getV1Beta1Path() {
+      return String.format("/subscriptions/%s/%s", projectId, subscriptionName);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SubscriptionPath that = (SubscriptionPath) o;
+      return this.subscriptionName.equals(that.subscriptionName)
+          && this.projectId.equals(that.projectId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(projectId, subscriptionName);
+    }
+
+    @Override
+    public String toString() {
+      return getPath();
+    }
+  }
+
+  public static SubscriptionPath subscriptionPathFromPath(String path) {
+    return new SubscriptionPath(path);
+  }
+
+  public static SubscriptionPath subscriptionPathFromName(
+      String projectId, String subscriptionName) {
+    return new SubscriptionPath(String.format("projects/%s/subscriptions/%s",
+                                              projectId, subscriptionName));
+  }
+
+  /**
+   * Path representing a Pubsub topic.
+   */
+  public static class TopicPath implements Serializable {
+    private final String path;
+
+    TopicPath(String path) {
+      this.path = path;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public String getName() {
+      String[] splits = path.split("/");
+      checkState(splits.length == 4, "Malformed topic path %s", path);
+      return splits[3];
+    }
+
+    public String getV1Beta1Path() {
+      String[] splits = path.split("/");
+      checkState(splits.length == 4, "Malformed topic path %s", path);
+      return String.format("/topics/%s/%s", splits[1], splits[3]);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TopicPath topicPath = (TopicPath) o;
+      return path.equals(topicPath.path);
+    }
+
+    @Override
+    public int hashCode() {
+      return path.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return path;
+    }
+  }
+
+  public static TopicPath topicPathFromPath(String path) {
+    return new TopicPath(path);
+  }
+
+  public static TopicPath topicPathFromName(String projectId, String topicName) {
+    return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName));
+  }
+
+  /**
+   * A message to be sent to Pubsub.
+   *
+   * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}.
+   * Java serialization is never used for non-test clients.
+   */
+  static class OutgoingMessage implements Serializable {
+    /**
+     * Underlying (encoded) element.
+     */
+    public final byte[] elementBytes;
+
+    public final Map<String, String> attributes;
+
+    /**
+     * Timestamp for element (ms since epoch).
+     */
+    public final long timestampMsSinceEpoch;
+
+    /**
+     * If using an id label, the record id to associate with this record's metadata so the receiver
+     * can reject duplicates. Otherwise {@literal null}.
+     */
+    @Nullable
+    public final String recordId;
+
+    public OutgoingMessage(byte[] elementBytes, Map<String, String> attributes,
+                           long timestampMsSinceEpoch, @Nullable String recordId) {
+      this.elementBytes = elementBytes;
+      this.attributes = attributes;
+      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+      this.recordId = recordId;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("OutgoingMessage(%db, %dms)",
+                           elementBytes.length, timestampMsSinceEpoch);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      OutgoingMessage that = (OutgoingMessage) o;
+
+      return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+              && Arrays.equals(elementBytes, that.elementBytes)
+              && Objects.equal(attributes, that.attributes)
+              && Objects.equal(recordId, that.recordId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch,
+              recordId);
+    }
+  }
+
+  /**
+   * A message received from Pubsub.
+   *
+   * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}.
+   * Java serialization is never used for non-test clients.
+   */
+  static class IncomingMessage implements Serializable {
+    /**
+     * Underlying (encoded) element.
+     */
+    public final byte[] elementBytes;
+
+    public Map<String, String> attributes;
+
+    /**
+     * Timestamp for element (ms since epoch). Either Pubsub's processing time,
+     * or the custom timestamp associated with the message.
+     */
+    public final long timestampMsSinceEpoch;
+
+    /**
+     * Timestamp (in system time) at which we requested the message (ms since epoch).
+     */
+    public final long requestTimeMsSinceEpoch;
+
+    /**
+     * Id to pass back to Pubsub to acknowledge receipt of this message.
+     */
+    public final String ackId;
+
+    /**
+     * Id to pass to the runner to distinguish this message from all others.
+     */
+    public final String recordId;
+
+    public IncomingMessage(
+        byte[] elementBytes,
+        Map<String, String> attributes,
+        long timestampMsSinceEpoch,
+        long requestTimeMsSinceEpoch,
+        String ackId,
+        String recordId) {
+      this.elementBytes = elementBytes;
+      this.attributes = attributes;
+      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+      this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
+      this.ackId = ackId;
+      this.recordId = recordId;
+    }
+
+    public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) {
+      return new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+              requestTimeMsSinceEpoch, ackId, recordId);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("IncomingMessage(%db, %dms)",
+                           elementBytes.length, timestampMsSinceEpoch);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      IncomingMessage that = (IncomingMessage) o;
+
+      return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+             && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch
+             && ackId.equals(that.ackId)
+             && recordId.equals(that.recordId)
+             && Arrays.equals(elementBytes, that.elementBytes)
+              && Objects.equal(attributes, that.attributes);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch,
+                              requestTimeMsSinceEpoch,
+                              ackId, recordId);
+    }
+  }
+
+  /**
+   * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages
+   * published.
+   */
+  public abstract int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+      throws IOException;
+
+  /**
+   * Request the next batch of up to {@code batchSize} messages from {@code subscription}.
+   * Return the received messages, or empty collection if none were available. Does not
+   * wait for messages to arrive if {@code returnImmediately} is {@literal true}.
+   * Returned messages will record their request time as {@code requestTimeMsSinceEpoch}.
+   */
+  public abstract List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch,
+      SubscriptionPath subscription,
+      int batchSize,
+      boolean returnImmediately)
+      throws IOException;
+
+  /**
+   * Acknowledge messages from {@code subscription} with {@code ackIds}.
+   */
+  public abstract void acknowledge(SubscriptionPath subscription, List<String> ackIds)
+      throws IOException;
+
+  /**
+   * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to
+   * be {@code deadlineSeconds} from now.
+   */
+  public abstract void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds,
+      int deadlineSeconds) throws IOException;
+
+  /**
+   * Create {@code topic}.
+   */
+  public abstract void createTopic(TopicPath topic) throws IOException;
+
+  /**
+   * Delete {@code topic}.
+   */
+  public abstract void deleteTopic(TopicPath topic) throws IOException;
+
+  /**
+   * Return a list of topics for {@code project}.
+   */
+  public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException;
+
+  /**
+   * Create {@code subscription} to {@code topic}.
+   */
+  public abstract void createSubscription(
+      TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
+
+  /**
+   * Create a random subscription for {@code topic}. Return the {@link SubscriptionPath}. It
+   * is the responsibility of the caller to later delete the subscription.
+   */
+  public SubscriptionPath createRandomSubscription(
+      ProjectPath project, TopicPath topic, int ackDeadlineSeconds) throws IOException {
+    // Create a randomized subscription derived from the topic name.
+    String subscriptionName = topic.getName() + "_beam_" + ThreadLocalRandom.current().nextLong();
+    SubscriptionPath subscription =
+        PubsubClient
+            .subscriptionPathFromName(project.getId(), subscriptionName);
+    createSubscription(topic, subscription, ackDeadlineSeconds);
+    return subscription;
+  }
+
+  /**
+   * Delete {@code subscription}.
+   */
+  public abstract void deleteSubscription(SubscriptionPath subscription) throws IOException;
+
+  /**
+   * Return a list of subscriptions for {@code topic} in {@code project}.
+   */
+  public abstract List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+      throws IOException;
+
+  /**
+   * Return the ack deadline, in seconds, for {@code subscription}.
+   */
+  public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException;
+
+  /**
+   * Return {@literal true} if {@link #pull} will always return empty list. Actual clients
+   * will return {@literal false}. Test clients may return {@literal true} to signal that all
+   * expected messages have been pulled and the test may complete.
+   */
+  public abstract boolean isEOF();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
index f5cfc2b..15401b7 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
@@ -24,8 +24,6 @@ import java.util.List;
 
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.util.PubsubClient;
-import org.apache.beam.sdk.util.PubsubJsonClient;
 
 /**
  * Helper for working with pubsub.

http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
new file mode 100644
index 0000000..b778a09
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark.io;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.Pubsub.Builder;
+import com.google.api.services.pubsub.model.AcknowledgeRequest;
+import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
+import com.google.api.services.pubsub.model.ListTopicsResponse;
+import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PublishResponse;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.api.services.pubsub.model.PullRequest;
+import com.google.api.services.pubsub.model.PullResponse;
+import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.api.services.pubsub.model.Subscription;
+import com.google.api.services.pubsub.model.Topic;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.util.Transport;
+
+/**
+ * A Pubsub client using JSON transport.
+ */
+class PubsubJsonClient extends PubsubClient {
+
+  private static class PubsubJsonClientFactory implements PubsubClientFactory {
+    private static HttpRequestInitializer chainHttpRequestInitializer(
+        Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+      if (credential == null) {
+        return httpRequestInitializer;
+      } else {
+        return new ChainingHttpRequestInitializer(
+            new HttpCredentialsAdapter(credential),
+            httpRequestInitializer);
+      }
+    }
+
+    @Override
+    public PubsubClient newClient(
+        @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+        throws IOException {
+      Pubsub pubsub = new Builder(
+          Transport.getTransport(),
+          Transport.getJsonFactory(),
+          chainHttpRequestInitializer(
+              options.getGcpCredential(),
+              // Do not log 404. It clutters the output and is possibly even required by the caller.
+              new RetryHttpRequestInitializer(ImmutableList.of(404))))
+          .setRootUrl(options.getPubsubRootUrl())
+          .setApplicationName(options.getAppName())
+          .setGoogleClientRequestInitializer(options.getGoogleApiTrace())
+          .build();
+      return new PubsubJsonClient(timestampLabel, idLabel, pubsub);
+    }
+
+    @Override
+    public String getKind() {
+      return "Json";
+    }
+  }
+
+  /**
+   * Factory for creating Pubsub clients using Json transport.
+   */
+  public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory();
+
+  /**
+   * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
+   * instead.
+   */
+  @Nullable
+  private final String timestampLabel;
+
+  /**
+   * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
+   */
+  @Nullable
+  private final String idLabel;
+
+  /**
+   * Underlying JSON transport.
+   */
+  private Pubsub pubsub;
+
+  @VisibleForTesting PubsubJsonClient(
+      @Nullable String timestampLabel,
+      @Nullable String idLabel,
+      Pubsub pubsub) {
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+    this.pubsub = pubsub;
+  }
+
+  @Override
+  public void close() {
+    // Nothing to close.
+  }
+
+  /**
+   * Publish {@code outgoingMessages} to {@code topic} in a single request.
+   *
+   * <p>When a timestamp and/or id label is configured, the corresponding values are added to the
+   * published attributes. They are written to a private copy of each message's attributes, so
+   * the map held by the {@link OutgoingMessage} is never modified and may be immutable.
+   *
+   * @return the number of message ids reported in the publish response
+   */
+  @Override
+  public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+      throws IOException {
+    List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
+    for (OutgoingMessage outgoingMessage : outgoingMessages) {
+      PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
+
+      Map<String, String> attributes = outgoingMessage.attributes;
+      if (timestampLabel != null || idLabel != null) {
+        // Copy before adding labels: writing into the caller's map would leak the label
+        // attributes back into the outgoing message and fails outright on immutable maps.
+        attributes = (attributes == null)
+            ? new TreeMap<String, String>()
+            : new TreeMap<String, String>(attributes);
+      }
+
+      if (timestampLabel != null) {
+        attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+      }
+
+      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+        attributes.put(idLabel, outgoingMessage.recordId);
+      }
+
+      if (attributes != null) {
+        pubsubMessage.setAttributes(attributes);
+      }
+
+      pubsubMessages.add(pubsubMessage);
+    }
+    PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
+    PublishResponse response = pubsub.projects()
+                                     .topics()
+                                     .publish(topic.getPath(), request)
+                                     .execute();
+    return response.getMessageIds().size();
+  }
+
+  @Override
+  public List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch,
+      SubscriptionPath subscription,
+      int batchSize,
+      boolean returnImmediately) throws IOException {
+    PullRequest request = new PullRequest()
+        .setReturnImmediately(returnImmediately)
+        .setMaxMessages(batchSize);
+    PullResponse response = pubsub.projects()
+                                  .subscriptions()
+                                  .pull(subscription.getPath(), request)
+                                  .execute();
+    if (response.getReceivedMessages() == null || response.getReceivedMessages().size() == 0) {
+      return ImmutableList.of();
+    }
+    List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessages().size());
+    for (ReceivedMessage message : response.getReceivedMessages()) {
+      PubsubMessage pubsubMessage = message.getMessage();
+      @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
+
+      // Payload.
+      byte[] elementBytes = pubsubMessage.decodeData();
+
+      // Timestamp.
+      long timestampMsSinceEpoch =
+          extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes);
+
+      // Ack id.
+      String ackId = message.getAckId();
+      checkState(!Strings.isNullOrEmpty(ackId));
+
+      // Record id, if any.
+      @Nullable String recordId = null;
+      if (idLabel != null && attributes != null) {
+        recordId = attributes.get(idLabel);
+      }
+      if (Strings.isNullOrEmpty(recordId)) {
+        // Fall back to the Pubsub provided message id.
+        recordId = pubsubMessage.getMessageId();
+      }
+
+      incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+                                               requestTimeMsSinceEpoch, ackId, recordId));
+    }
+
+    return incomingMessages;
+  }
+
+  @Override
+  public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException {
+    AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds);
+    pubsub.projects()
+          .subscriptions()
+          .acknowledge(subscription.getPath(), request)
+          .execute(); // ignore Empty result.
+  }
+
+  @Override
+  public void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
+      throws IOException {
+    ModifyAckDeadlineRequest request =
+        new ModifyAckDeadlineRequest().setAckIds(ackIds)
+                                      .setAckDeadlineSeconds(deadlineSeconds);
+    pubsub.projects()
+          .subscriptions()
+          .modifyAckDeadline(subscription.getPath(), request)
+          .execute(); // ignore Empty result.
+  }
+
+  @Override
+  public void createTopic(TopicPath topic) throws IOException {
+    pubsub.projects()
+          .topics()
+          .create(topic.getPath(), new Topic())
+          .execute(); // ignore Topic result.
+  }
+
+  @Override
+  public void deleteTopic(TopicPath topic) throws IOException {
+    pubsub.projects()
+          .topics()
+          .delete(topic.getPath())
+          .execute(); // ignore Empty result.
+  }
+
+  /**
+   * Return the paths of all topics in {@code project}.
+   *
+   * <p>Every result page is fetched by following the response's next-page token, so topics
+   * beyond the first page are included.
+   */
+  @Override
+  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+    Pubsub.Projects.Topics.List request = pubsub.projects().topics().list(project.getPath());
+    List<TopicPath> topics = new ArrayList<>();
+    String pageToken;
+    do {
+      ListTopicsResponse response = request.execute();
+      // A page may carry no topics at all.
+      if (response.getTopics() != null) {
+        for (Topic topic : response.getTopics()) {
+          topics.add(topicPathFromPath(topic.getName()));
+        }
+      }
+      pageToken = response.getNextPageToken();
+      request.setPageToken(pageToken);
+    } while (!Strings.isNullOrEmpty(pageToken));
+    return topics;
+  }
+
+  @Override
+  public void createSubscription(
+      TopicPath topic, SubscriptionPath subscription,
+      int ackDeadlineSeconds) throws IOException {
+    Subscription request = new Subscription()
+        .setTopic(topic.getPath())
+        .setAckDeadlineSeconds(ackDeadlineSeconds);
+    pubsub.projects()
+          .subscriptions()
+          .create(subscription.getPath(), request)
+          .execute(); // ignore Subscription result.
+  }
+
+  @Override
+  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+    pubsub.projects()
+          .subscriptions()
+          .delete(subscription.getPath())
+          .execute(); // ignore Empty result.
+  }
+
+  /**
+   * Return the paths of the subscriptions in {@code project} which are attached to
+   * {@code topic}.
+   *
+   * <p>Every result page is fetched by following the response's next-page token, so
+   * subscriptions beyond the first page are included.
+   */
+  @Override
+  public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+      throws IOException {
+    Pubsub.Projects.Subscriptions.List request =
+        pubsub.projects().subscriptions().list(project.getPath());
+    List<SubscriptionPath> subscriptions = new ArrayList<>();
+    String pageToken;
+    do {
+      ListSubscriptionsResponse response = request.execute();
+      // A page may carry no subscriptions at all.
+      if (response.getSubscriptions() != null) {
+        for (Subscription subscription : response.getSubscriptions()) {
+          // Compare from the requested topic's side so that a subscription reporting no topic
+          // is skipped instead of throwing.
+          if (topic.getPath().equals(subscription.getTopic())) {
+            subscriptions.add(subscriptionPathFromPath(subscription.getName()));
+          }
+        }
+      }
+      pageToken = response.getNextPageToken();
+      request.setPageToken(pageToken);
+    } while (!Strings.isNullOrEmpty(pageToken));
+    return subscriptions;
+  }
+
+  @Override
+  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+    Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute();
+    return response.getAckDeadlineSeconds();
+  }
+
+  @Override
+  public boolean isEOF() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
new file mode 100644
index 0000000..125a8d6
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark.io;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.client.util.Clock;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
+ * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
+ * methods. Relies on statics to mimic the Pubsub service, though we try to hide that.
+ */
+class PubsubTestClient extends PubsubClient implements Serializable {
+  /**
+   * Mimic the state of the simulated Pubsub 'service'.
+   *
+   * <p>Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running
+   * test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created
+   * from the same client factory and run in parallel. Thus we can't enforce aliasing of the
+   * following data structures over all clients and must resort to a static.
+   */
+  private static class State {
+    /**
+     * True if has been primed for a test but not yet validated.
+     */
+    boolean isActive;
+
+    /**
+     * Publish mode only: Only publish calls for this topic are allowed.
+     */
+    @Nullable
+    TopicPath expectedTopic;
+
+    /**
+     * Publish mode only: Messages yet to be seen in a {@link #publish} call.
+     */
+    @Nullable
+    Set<OutgoingMessage> remainingExpectedOutgoingMessages;
+
+    /**
+     * Publish mode only: Messages which should throw when first sent to simulate transient publish
+     * failure.
+     */
+    @Nullable
+    Set<OutgoingMessage> remainingFailingOutgoingMessages;
+
+    /**
+     * Pull mode only: Clock from which to get current time.
+     */
+    @Nullable
+    Clock clock;
+
+    /**
+     * Pull mode only: Only pull calls for this subscription are allowed.
+     */
+    @Nullable
+    SubscriptionPath expectedSubscription;
+
+    /**
+     * Pull mode only: Timeout to simulate.
+     */
+    int ackTimeoutSec;
+
+    /**
+     * Pull mode only: Messages waiting to be received by a {@link #pull} call.
+     */
+    @Nullable
+    List<IncomingMessage> remainingPendingIncomingMessages;
+
+    /**
+     * Pull mode only: Messages which have been returned from a {@link #pull} call and
+     * not yet ACKed by an {@link #acknowledge} call.
+     */
+    @Nullable
+    Map<String, IncomingMessage> pendingAckIncomingMessages;
+
+    /**
+     * Pull mode only: When above messages are due to have their ACK deadlines expire.
+     */
+    @Nullable
+    Map<String, Long> ackDeadline;
+  }
+
+  private static final State STATE = new State();
+
+  /** Closing the factory will validate all expected messages were processed. */
+  public interface PubsubTestClientFactory
+          extends PubsubClientFactory, Closeable, Serializable {
+  }
+
+  /**
+   * Return a factory for testing publishers. Only one factory may be in-flight at a time.
+   * The factory must be closed when the test is complete, at which point final validation will
+   * occur.
+   *
+   * <p>Closing always releases the shared state, even when validation fails, so that one failing
+   * test cannot leave later tests rejected with "Test still in flight".
+   *
+   * @param expectedTopic the only topic for which publish calls are allowed
+   * @param expectedOutgoingMessages messages which must all be published before closing
+   * @param failingOutgoingMessages messages whose first publish attempt throws
+   */
+  static PubsubTestClientFactory createFactoryForPublish(
+      final TopicPath expectedTopic,
+      final Iterable<OutgoingMessage> expectedOutgoingMessages,
+      final Iterable<OutgoingMessage> failingOutgoingMessages) {
+    synchronized (STATE) {
+      checkState(!STATE.isActive, "Test still in flight");
+      // Clear any pull-mode marker left behind by an earlier test so that inPullMode()
+      // reports false while this publish test is running.
+      STATE.expectedSubscription = null;
+      STATE.expectedTopic = expectedTopic;
+      STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
+      STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
+      STATE.isActive = true;
+    }
+    return new PubsubTestClientFactory() {
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient();
+      }
+
+      @Override
+      public String getKind() {
+        return "PublishTest";
+      }
+
+      @Override
+      public void close() {
+        synchronized (STATE) {
+          try {
+            checkState(STATE.isActive, "No test still in flight");
+            checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(),
+                       "Still waiting for %s messages to be published",
+                       STATE.remainingExpectedOutgoingMessages.size());
+          } finally {
+            // Reset unconditionally: the validation failure still propagates, but the static
+            // state is left clean for the next test.
+            STATE.isActive = false;
+            STATE.expectedTopic = null;
+            STATE.remainingExpectedOutgoingMessages = null;
+            STATE.remainingFailingOutgoingMessages = null;
+          }
+        }
+      }
+    };
+  }
+
+  /**
+   * Return a factory for testing subscribers. Only one factory may be in-flight at a time.
+   * The factory must be closed when the test is complete, at which point it is validated that
+   * every message was pulled and ACKed.
+   *
+   * <p>Closing always releases the shared state, even when validation fails, so that one failing
+   * test cannot leave later tests rejected with "Test still in flight".
+   *
+   * @param clock source of the simulated current time
+   * @param expectedSubscription the only subscription for which pull calls are allowed
+   * @param ackTimeoutSec ACK timeout to simulate, in seconds
+   * @param expectedIncomingMessages messages to hand out from pull calls
+   */
+  public static PubsubTestClientFactory createFactoryForPull(
+      final Clock clock,
+      final SubscriptionPath expectedSubscription,
+      final int ackTimeoutSec,
+      final Iterable<IncomingMessage> expectedIncomingMessages) {
+    synchronized (STATE) {
+      checkState(!STATE.isActive, "Test still in flight");
+      // Clear any publish-mode marker left behind by an earlier test so that inPublishMode()
+      // reports false while this pull test is running.
+      STATE.expectedTopic = null;
+      STATE.clock = clock;
+      STATE.expectedSubscription = expectedSubscription;
+      STATE.ackTimeoutSec = ackTimeoutSec;
+      STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages);
+      STATE.pendingAckIncomingMessages = new HashMap<>();
+      STATE.ackDeadline = new HashMap<>();
+      STATE.isActive = true;
+    }
+    return new PubsubTestClientFactory() {
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient();
+      }
+
+      @Override
+      public String getKind() {
+        return "PullTest";
+      }
+
+      @Override
+      public void close() {
+        synchronized (STATE) {
+          try {
+            checkState(STATE.isActive, "No test still in flight");
+            checkState(STATE.remainingPendingIncomingMessages.isEmpty(),
+                       "Still waiting for %s messages to be pulled",
+                       STATE.remainingPendingIncomingMessages.size());
+            checkState(STATE.pendingAckIncomingMessages.isEmpty(),
+                       "Still waiting for %s messages to be ACKed",
+                       STATE.pendingAckIncomingMessages.size());
+            checkState(STATE.ackDeadline.isEmpty(),
+                       "Still waiting for %s messages to be ACKed",
+                       STATE.ackDeadline.size());
+          } finally {
+            // Reset unconditionally: the validation failure still propagates, but the static
+            // state is left clean for the next test.
+            STATE.isActive = false;
+            STATE.clock = null;
+            STATE.expectedSubscription = null;
+            STATE.remainingPendingIncomingMessages = null;
+            STATE.pendingAckIncomingMessages = null;
+            STATE.ackDeadline = null;
+          }
+        }
+      }
+    };
+  }
+
+  public static PubsubTestClientFactory createFactoryForCreateSubscription() {
+    return new PubsubTestClientFactory() {
+      int numCalls = 0;
+
+      @Override
+      public void close() throws IOException {
+        checkState(
+            numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls);
+      }
+
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient() {
+          @Override
+          public void createSubscription(
+              TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds)
+              throws IOException {
+            checkState(numCalls == 0, "Expected at most one subscription to be created");
+            numCalls++;
+          }
+        };
+      }
+
+      @Override
+      public String getKind() {
+        return "CreateSubscriptionTest";
+      }
+    };
+  }
+
+  /**
+   * Return true if in pull mode.
+   */
+  private boolean inPullMode() {
+    checkState(STATE.isActive, "No test is active");
+    return STATE.expectedSubscription != null;
+  }
+
+  /**
+   * Return true if in publish mode.
+   */
+  private boolean inPublishMode() {
+    checkState(STATE.isActive, "No test is active");
+    return STATE.expectedTopic != null;
+  }
+
+  /**
+   * For subscription mode only:
+   * Track progression of time according to the {@link Clock} passed to
+   * {@link #createFactoryForPull}. This simulates Pubsub expiring outstanding ACKs: every message
+   * whose ACK deadline is at or before the clock's current time is moved back to the list of
+   * messages waiting to be pulled.
+   */
+  public void advance() {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only advance in pull mode");
+      // Any messages whose ACKs timed out are available for re-pulling.
+      Iterator<Map.Entry<String, Long>> deadlineItr = STATE.ackDeadline.entrySet().iterator();
+      while (deadlineItr.hasNext()) {
+        Map.Entry<String, Long> entry = deadlineItr.next();
+        if (entry.getValue() <= STATE.clock.currentTimeMillis()) {
+          STATE.remainingPendingIncomingMessages.add(
+              STATE.pendingAckIncomingMessages.remove(entry.getKey()));
+          deadlineItr.remove();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public int publish(
+      TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
+    synchronized (STATE) {
+      checkState(inPublishMode(), "Can only publish in publish mode");
+      checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic,
+                 STATE.expectedTopic);
+      for (OutgoingMessage outgoingMessage : outgoingMessages) {
+        if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) {
+          throw new RuntimeException("Simulating failure for " + outgoingMessage);
+        }
+        checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage),
+                   "Unexpected outgoing message %s", outgoingMessage);
+      }
+      return outgoingMessages.size();
+    }
+  }
+
+  /**
+   * Hand out up to {@code batchSize} of the messages still waiting to be pulled.
+   *
+   * <p>Each returned message is stamped with {@code requestTimeMsSinceEpoch}, recorded as
+   * awaiting an ACK, and given an ACK deadline of the request time plus the simulated ACK
+   * timeout. The request time must equal the simulated clock's current time, and only
+   * {@code returnImmediately} pulls are supported.
+   */
+  @Override
+  public List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize,
+      boolean returnImmediately) throws IOException {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only pull in pull mode");
+      long now = STATE.clock.currentTimeMillis();
+      checkState(requestTimeMsSinceEpoch == now,
+                 "Simulated time %s does not match request time %s", now, requestTimeMsSinceEpoch);
+      checkState(subscription.equals(STATE.expectedSubscription),
+                 "Subscription %s does not match expected %s", subscription,
+                 STATE.expectedSubscription);
+      checkState(returnImmediately, "Pull only supported if returning immediately");
+
+      List<IncomingMessage> incomingMessages = new ArrayList<>();
+      Iterator<IncomingMessage> pendItr = STATE.remainingPendingIncomingMessages.iterator();
+      while (pendItr.hasNext()) {
+        IncomingMessage incomingMessage = pendItr.next();
+        pendItr.remove();
+        IncomingMessage incomingMessageWithRequestTime =
+            incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
+        incomingMessages.add(incomingMessageWithRequestTime);
+        STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId,
+                                             incomingMessageWithRequestTime);
+        // Multiply as long: an int product would overflow for large timeouts.
+        STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId,
+                              requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000L);
+        if (incomingMessages.size() >= batchSize) {
+          break;
+        }
+      }
+      return incomingMessages;
+    }
+  }
+
+  @Override
+  public void acknowledge(
+      SubscriptionPath subscription,
+      List<String> ackIds) throws IOException {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only acknowledge in pull mode");
+      checkState(subscription.equals(STATE.expectedSubscription),
+                 "Subscription %s does not match expected %s", subscription,
+                 STATE.expectedSubscription);
+
+      for (String ackId : ackIds) {
+        checkState(STATE.ackDeadline.remove(ackId) != null,
+                   "No message with ACK id %s is waiting for an ACK", ackId);
+        checkState(STATE.pendingAckIncomingMessages.remove(ackId) != null,
+                   "No message with ACK id %s is waiting for an ACK", ackId);
+      }
+    }
+  }
+
+  /**
+   * Change the simulated ACK deadline of the messages identified by {@code ackIds}.
+   *
+   * <p>A positive {@code deadlineSeconds} moves each deadline to the simulated clock's current
+   * time plus that many seconds. Otherwise the message stops awaiting an ACK and is put back on
+   * the list of messages waiting to be pulled. Every ACK id must belong to a message which is
+   * currently awaiting an ACK.
+   */
+  @Override
+  public void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only modify ack deadline in pull mode");
+      checkState(subscription.equals(STATE.expectedSubscription),
+                 "Subscription %s does not match expected %s", subscription,
+                 STATE.expectedSubscription);
+
+      for (String ackId : ackIds) {
+        if (deadlineSeconds > 0) {
+          checkState(STATE.ackDeadline.remove(ackId) != null,
+                     "No message with ACK id %s is waiting for an ACK", ackId);
+          checkState(STATE.pendingAckIncomingMessages.containsKey(ackId),
+                     "No message with ACK id %s is waiting for an ACK", ackId);
+          // Multiply as long: an int product would overflow for large deadlines.
+          STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + deadlineSeconds * 1000L);
+        } else {
+          checkState(STATE.ackDeadline.remove(ackId) != null,
+                     "No message with ACK id %s is waiting for an ACK", ackId);
+          IncomingMessage message = STATE.pendingAckIncomingMessages.remove(ackId);
+          checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId);
+          STATE.remainingPendingIncomingMessages.add(message);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void createTopic(TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteTopic(TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void createSubscription(
+      TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<SubscriptionPath> listSubscriptions(
+      ProjectPath project, TopicPath topic) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+    synchronized (STATE) {
+      return STATE.ackTimeoutSec;
+    }
+  }
+
+  @Override
+  public boolean isEOF() {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only check EOF in pull mode");
+      return STATE.remainingPendingIncomingMessages.isEmpty();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/pom.xml b/integration/java/pom.xml
new file mode 100644
index 0000000..dcad4c3
--- /dev/null
+++ b/integration/java/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-integration-parent</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-integration-java-parent</artifactId>
+  <packaging>pom</packaging>
+  <name>Apache Beam :: Integration Tests :: Java</name>
+
+  <modules>
+    <module>nexmark</module>
+  </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/pom.xml
----------------------------------------------------------------------
diff --git a/integration/pom.xml b/integration/pom.xml
new file mode 100644
index 0000000..4839da5
--- /dev/null
+++ b/integration/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-parent</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-integration-parent</artifactId>
+  <packaging>pom</packaging>
+  <name>Apache Beam :: Integration Tests</name>
+
+  <modules>
+    <module>java</module>
+  </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c92d391..bddbf1f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -187,7 +187,7 @@
     <module>sdks</module>
     <module>runners</module>
     <module>examples</module>
-    <module>integration/java/nexmark</module>
+    <module>integration</module>
     <!-- sdks/java/javadoc builds project-wide Javadoc. It has to run last. -->
     <module>sdks/java/javadoc</module>
   </modules>