You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:35 UTC

[27/55] [abbrv] beam git commit: Remove NexmarkDrivers and make execution runner-agnostic

Remove NexmarkDrivers and make execution runner-agnostic

This configuration should be external to the benchmark to avoid
unexpected dependencies and to have a more Beam-like
(runner-independent) approach.

Add maven profiles to execute NexMark in the different runners

Fix compile after extra PubSubIO refactor and remove PubsubClient.
PubsubClient was used to create and reuse topics; this logic should not
be part of Nexmark because it adds extra complexity/dependencies. A
simple script should do this, or the user should provide the
corresponding topics.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a6dbdfa5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a6dbdfa5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a6dbdfa5

Branch: refs/heads/master
Commit: a6dbdfa5457344191ebba383174063270239d9fa
Parents: 8b96949
Author: Ismaël Mejía <ie...@apache.org>
Authored: Sat Apr 29 16:50:22 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200

----------------------------------------------------------------------
 integration/java/nexmark/pom.xml                | 294 +++++-----
 .../beam/integration/nexmark/NexmarkDriver.java |   9 +
 .../beam/integration/nexmark/NexmarkRunner.java | 106 ++--
 .../nexmark/drivers/NexmarkApexDriver.java      |  50 --
 .../nexmark/drivers/NexmarkApexRunner.java      |  65 ---
 .../nexmark/drivers/NexmarkDirectDriver.java    |  49 --
 .../nexmark/drivers/NexmarkDirectRunner.java    |  60 --
 .../nexmark/drivers/NexmarkFlinkDriver.java     |  50 --
 .../nexmark/drivers/NexmarkFlinkRunner.java     |  55 --
 .../nexmark/drivers/NexmarkGoogleDriver.java    |  67 ---
 .../nexmark/drivers/NexmarkGoogleRunner.java    | 165 ------
 .../nexmark/drivers/NexmarkSparkDriver.java     |  48 --
 .../nexmark/drivers/NexmarkSparkRunner.java     |  56 --
 .../nexmark/drivers/package-info.java           |  22 -
 .../integration/nexmark/io/PubsubClient.java    | 543 -------------------
 .../integration/nexmark/io/PubsubHelper.java    | 215 --------
 .../nexmark/io/PubsubJsonClient.java            | 318 -----------
 .../nexmark/io/PubsubTestClient.java            | 436 ---------------
 .../integration/nexmark/io/package-info.java    |  22 -
 .../nexmark/sources/BoundedEventSourceTest.java |   8 +-
 20 files changed, 172 insertions(+), 2466 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 103c18f..fb213e9 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -31,44 +31,114 @@
   <name>Apache Beam :: Integration Tests :: Java :: Nexmark</name>
   <packaging>jar</packaging>
 
-  <properties>
-    <flink.version>1.2.0</flink.version>
-    <spark.version>1.6.3</spark.version>
-    <apex.codehaus.jackson.version>1.9.3</apex.codehaus.jackson.version>
-    <skipITs>true</skipITs>
-  </properties>
+  <profiles>
+
+    <!--
+      The direct runner is available by default.
+      You can also include it on the classpath explicitly with -P direct-runner
+    -->
+    <profile>
+      <id>direct-runner</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-direct-java</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <!-- Include the Apache Apex runner with -P apex-runner -->
+    <profile>
+      <id>apex-runner</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-apex</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+        <!--
+          Apex depends on httpclient version 4.3.5, project has a transitive dependency to httpclient 4.0.1 from
+          google-http-client. Apex dependency version being specified explicitly so that it gets picked up. This
+          can be removed when the project no longer has a dependency on a different httpclient version.
+        -->
+        <dependency>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpclient</artifactId>
+          <version>4.3.5</version>
+          <scope>runtime</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>commons-codec</groupId>
+              <artifactId>commons-codec</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <!-- Include the Apache Flink runner with -P flink-runner -->
+    <profile>
+      <id>flink-runner</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-flink_2.10</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <!-- Include the Apache Spark runner -P spark-runner -->
+    <profile>
+      <id>spark-runner</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-spark</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-streaming_2.10</artifactId>
+          <version>${spark.version}</version>
+          <scope>runtime</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-core_2.10</artifactId>
+          <version>${spark.version}</version>
+          <scope>runtime</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.slf4j</groupId>
+              <artifactId>jul-to-slf4j</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <!-- Include the Google Cloud Dataflow runner -P dataflow-runner -->
+    <profile>
+      <id>dataflow-runner</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
 
   <build>
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <systemPropertyVariables>
-            <beamTestPipelineOptions>
-            </beamTestPipelineOptions>
-          </systemPropertyVariables>
-        </configuration>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-      </plugin>
-
-      <!-- Source plugin for generating source and test-source JARs. -->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-source-plugin</artifactId>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
         <executions>
           <execution>
@@ -98,11 +168,6 @@
         </executions>
       </plugin>
 
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-      </plugin>
-
       <!-- Avro plugin for automatic code generation -->
       <plugin>
         <groupId>org.apache.avro</groupId>
@@ -127,22 +192,6 @@
         <groupId>org.jacoco</groupId>
         <artifactId>jacoco-maven-plugin</artifactId>
       </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <executions>
-          <execution>
-            <goals><goal>analyze-only</goal></goals>
-            <configuration>
-              <!-- Ignore runtime-only dependencies in analysis -->
-              <ignoreNonCompile>true</ignoreNonCompile>
-              <failOnWarning>false</failOnWarning>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-
     </plugins>
   </build>
 
@@ -153,73 +202,6 @@
       <artifactId>beam-sdks-java-core</artifactId>
     </dependency>
 
-    <!-- Java runner for Google Cloud Dataflow -->
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
-    </dependency>
-
-    <!-- Direct runner -->
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-direct-java</artifactId>
-    </dependency>
-
-    <!-- Flink runner -->
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-flink_2.10</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-shaded-hadoop2</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <!-- Spark runner -->
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-spark</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_2.10</artifactId>
-      <version>${spark.version}</version>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_2.10</artifactId>
-      <version>${spark.version}</version>
-      <scope>runtime</scope>
-    </dependency>
-
-    <!-- Apex runner -->
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-apex</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.esotericsoftware.kryo</groupId>
-      <artifactId>kryo</artifactId>
-      <version>${apex.kryo.version}</version>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.codehaus.jackson</groupId>
-      <artifactId>jackson-mapper-asl</artifactId>
-      <version>${apex.codehaus.jackson.version}</version>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.codehaus.jackson</groupId>
-      <artifactId>jackson-core-asl</artifactId>
-      <version>${apex.codehaus.jackson.version}</version>
-      <scope>runtime</scope>
-    </dependency>
-
     <!-- IOs -->
     <dependency>
       <groupId>org.apache.beam</groupId>
@@ -231,57 +213,20 @@
       <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
     </dependency>
 
-    <!-- Extra libraries -->
-    <dependency>
-      <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-dataflow</artifactId>
-      <version>${dataflow.version}</version>
-      <scope>runtime</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>joda-time</groupId>
-      <artifactId>joda-time</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-bigquery</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-pubsub</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.auth</groupId>
-      <artifactId>google-auth-library-credentials</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.auth</groupId>
-      <artifactId>google-auth-library-oauth2-http</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.cloud.bigdataoss</groupId>
       <artifactId>gcsio</artifactId>
     </dependency>
 
+    <!-- Extra libraries -->
     <dependency>
-      <groupId>com.google.cloud.bigdataoss</groupId>
-      <artifactId>util</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.http-client</groupId>
-      <artifactId>google-http-client</artifactId>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
     </dependency>
 
     <dependency>
@@ -300,19 +245,18 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-all</artifactId>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
     </dependency>
 
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
-      <scope>compile</scope>
     </dependency>
 
     <dependency>
@@ -325,5 +269,23 @@
       <artifactId>junit</artifactId>
       <scope>compile</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+    </dependency>
+
+    <!-- Test -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
index 4714124..7d532cc 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
@@ -31,6 +31,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.integration.nexmark.model.Auction;
 import org.apache.beam.integration.nexmark.model.Bid;
 import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -294,4 +295,12 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> {
     }
     NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
   }
+
+  public static void main(String[] args) {
+    NexmarkOptions options = PipelineOptionsFactory.fromArgs(args)
+      .withValidation()
+      .as(NexmarkOptions.class);
+    NexmarkRunner runner = new NexmarkRunner(options);
+    new NexmarkDriver().runAll(options, runner);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index 87314ce..ebfd196 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -33,7 +33,6 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
-import org.apache.beam.integration.nexmark.io.PubsubHelper;
 import org.apache.beam.integration.nexmark.model.Auction;
 import org.apache.beam.integration.nexmark.model.Bid;
 import org.apache.beam.integration.nexmark.model.Event;
@@ -86,7 +85,7 @@ import org.joda.time.Duration;
 /**
  * Run a single Nexmark query using a given configuration.
  */
-public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
+public class NexmarkRunner<OptionT extends NexmarkOptions> {
   /**
    * Minimum number of samples needed for 'stead-state' rate calculation.
    */
@@ -125,12 +124,6 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
   protected NexmarkConfiguration configuration;
 
   /**
-   * Accumulate the pub/sub subscriptions etc which should be cleaned up on end of run.
-   */
-  @Nullable
-  protected PubsubHelper pubsub;
-
-  /**
    * If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null.
    */
   @Nullable
@@ -158,16 +151,6 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
     this.options = options;
   }
 
-  /**
-   * Return a Pubsub helper.
-   */
-  private PubsubHelper getPubsub() {
-    if (pubsub == null) {
-      pubsub = PubsubHelper.create(options);
-    }
-    return pubsub;
-  }
-
   // ================================================================================
   // Overridden by each runner.
   // ================================================================================
@@ -175,17 +158,23 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
   /**
    * Is this query running in streaming mode?
    */
-  protected abstract boolean isStreaming();
+  protected boolean isStreaming() {
+    return options.isStreaming();
+  }
 
   /**
    * Return number of cores per worker.
    */
-  protected abstract int coresPerWorker();
+  protected int coresPerWorker() {
+    return 4;
+  }
 
   /**
    * Return maximum number of workers.
    */
-  protected abstract int maxNumWorkers();
+  protected int maxNumWorkers() {
+    return 5;
+  }
 
   /**
    * Return the current value for a long counter, or a default value if can't be retrieved.
@@ -544,13 +533,20 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
   /**
    * Invoke the builder with options suitable for running a publish-only child pipeline.
    */
-  protected abstract void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder);
+  protected void invokeBuilderForPublishOnlyPipeline(
+      PipelineBuilder builder) {
+    builder.build(options);
+//    throw new UnsupportedOperationException(
+//        "Cannot use --pubSubMode=COMBINED with DirectRunner");
+  }
 
   /**
    * If monitoring, wait until the publisher pipeline has run long enough to establish
    * a backlog on the Pubsub topic. Otherwise, return immediately.
    */
-  protected abstract void waitForPublisherPreload();
+  protected void waitForPublisherPreload() {
+    throw new UnsupportedOperationException();
+  }
 
   /**
    * Monitor the performance and progress of a running job. Return final performance if
@@ -841,24 +837,14 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
    * Return source of events from Pubsub.
    */
   private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) {
-    String shortTopic = shortTopic(now);
     String shortSubscription = shortSubscription(now);
-
-    // Create/confirm the subscription.
-    String subscription = null;
-    if (!options.getManageResources()) {
-      // The subscription should already have been created by the user.
-      subscription = getPubsub().reuseSubscription(shortTopic, shortSubscription).getPath();
-    } else {
-      subscription = getPubsub().createSubscription(shortTopic, shortSubscription).getPath();
-    }
-    NexmarkUtils.console("Reading events from Pubsub %s", subscription);
+    NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription);
     PubsubIO.Read<Event> io =
-        PubsubIO.<Event>read().subscription(subscription)
-            .idLabel(NexmarkUtils.PUBSUB_ID)
+        PubsubIO.<Event>read().fromSubscription(shortSubscription)
+            .withIdAttribute(NexmarkUtils.PUBSUB_ID)
             .withCoder(Event.CODER);
     if (!configuration.usePubsubPublishTime) {
-      io = io.timestampLabel(NexmarkUtils.PUBSUB_TIMESTAMP);
+      io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
     }
     return p.apply(queryName + ".ReadPubsubEvents", io);
   }
@@ -884,26 +870,13 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
    */
   private void sinkEventsToPubsub(PCollection<Event> events, long now) {
     String shortTopic = shortTopic(now);
-
-    // Create/confirm the topic.
-    String topic;
-    if (!options.getManageResources()
-        || configuration.pubSubMode == NexmarkUtils.PubSubMode.SUBSCRIBE_ONLY) {
-      // The topic should already have been created by the user or
-      // a companion 'PUBLISH_ONLY' process.
-      topic = getPubsub().reuseTopic(shortTopic).getPath();
-    } else {
-      // Create a fresh topic to loopback via. It will be destroyed when the
-      // (necessarily blocking) job is done.
-      topic = getPubsub().createTopic(shortTopic).getPath();
-    }
-    NexmarkUtils.console("Writing events to Pubsub %s", topic);
+    NexmarkUtils.console("Writing events to Pubsub %s", shortTopic);
     PubsubIO.Write<Event> io =
-        PubsubIO.<Event>write().topic(topic)
-                      .idLabel(NexmarkUtils.PUBSUB_ID)
+        PubsubIO.<Event>write().to(shortTopic)
+                      .withIdAttribute(NexmarkUtils.PUBSUB_ID)
                       .withCoder(Event.CODER);
     if (!configuration.usePubsubPublishTime) {
-      io = io.timestampLabel(NexmarkUtils.PUBSUB_TIMESTAMP);
+      io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
     }
     events.apply(queryName + ".WritePubsubEvents", io);
   }
@@ -913,18 +886,12 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
    */
   private void sinkResultsToPubsub(PCollection<String> formattedResults, long now) {
     String shortTopic = shortTopic(now);
-    String topic;
-    if (!options.getManageResources()) {
-      topic = getPubsub().reuseTopic(shortTopic).getPath();
-    } else {
-      topic = getPubsub().createTopic(shortTopic).getPath();
-    }
-    NexmarkUtils.console("Writing results to Pubsub %s", topic);
+    NexmarkUtils.console("Writing results to Pubsub %s", shortTopic);
     PubsubIO.Write<String> io =
-        PubsubIO.<String>write().topic(topic)
-            .idLabel(NexmarkUtils.PUBSUB_ID);
+        PubsubIO.<String>write().to(shortTopic)
+            .withIdAttribute(NexmarkUtils.PUBSUB_ID);
     if (!configuration.usePubsubPublishTime) {
-      io = io.timestampLabel(NexmarkUtils.PUBSUB_TIMESTAMP);
+      io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
     }
     formattedResults.apply(queryName + ".WritePubsubResults", io);
   }
@@ -1168,7 +1135,6 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
     // Setup per-run state.
     //
     checkState(configuration == null);
-    checkState(pubsub == null);
     checkState(queryName == null);
     configuration = runConfiguration;
 
@@ -1282,19 +1248,9 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
       mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout));
       return monitor(query);
     } finally {
-      //
-      // Cleanup per-run state.
-      //
-      if (pubsub != null) {
-        // Delete any subscriptions and topics we created.
-        pubsub.close();
-        pubsub = null;
-      }
       configuration = null;
       queryName = null;
       // TODO: Cleanup pathsToDelete
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java
deleted file mode 100644
index 265ccf7..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.drivers;
-
-import org.apache.beam.integration.nexmark.NexmarkDriver;
-import org.apache.beam.integration.nexmark.NexmarkOptions;
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunner;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * Run NexMark queries using the Apex runner.
- */
-public class NexmarkApexDriver extends NexmarkDriver<NexmarkApexDriver.NexmarkApexOptions> {
-  /**
-   * Command line flags.
-   */
-  public interface NexmarkApexOptions extends NexmarkOptions, ApexPipelineOptions {
-  }
-
-  /**
-   * Entry point.
-   */
-  public static void main(String[] args) {
-    // Gather command line args, baseline, configurations, etc.
-    NexmarkApexOptions options = PipelineOptionsFactory.fromArgs(args)
-                                                        .withValidation()
-                                                        .as(NexmarkApexOptions.class);
-    options.setRunner(ApexRunner.class);
-    NexmarkApexRunner runner = new NexmarkApexRunner(options);
-    new NexmarkApexDriver().runAll(options, runner);
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java
deleted file mode 100644
index 2bcf82d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.drivers;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.integration.nexmark.NexmarkPerf;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
-import org.apache.beam.integration.nexmark.NexmarkRunner;
-
-/**
- * Run a query using the Apex runner.
- */
-public class NexmarkApexRunner extends NexmarkRunner<NexmarkApexDriver.NexmarkApexOptions> {
-  @Override
-  protected boolean isStreaming() {
-    return options.isStreaming();
-  }
-
-  @Override
-  protected int coresPerWorker() {
-    return 4;
-  }
-
-  @Override
-  protected int maxNumWorkers() {
-    return 5;
-  }
-
-  @Override
-  protected void invokeBuilderForPublishOnlyPipeline(
-      PipelineBuilder builder) {
-    builder.build(options);
-  }
-
-  @Override
-  protected void waitForPublisherPreload() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  @Nullable
-  protected NexmarkPerf monitor(NexmarkQuery query) {
-    return null;
-  }
-
-  public NexmarkApexRunner(NexmarkApexDriver.NexmarkApexOptions options) {
-    super(options);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java
deleted file mode 100644
index 2b825f3..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.drivers;
-
-import org.apache.beam.integration.nexmark.NexmarkDriver;
-import org.apache.beam.integration.nexmark.NexmarkOptions;
-import org.apache.beam.runners.direct.DirectOptions;
-import org.apache.beam.runners.direct.DirectRunner;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * An implementation of the 'NEXMark queries' using the Direct Runner.
- */
-class NexmarkDirectDriver extends NexmarkDriver<NexmarkDirectDriver.NexmarkDirectOptions> {
-  /**
-   * Command line flags.
-   */
-  public interface NexmarkDirectOptions extends NexmarkOptions, DirectOptions {
-  }
-
-  /**
-   * Entry point.
-   */
-  public static void main(String[] args) {
-    NexmarkDirectOptions options =
-        PipelineOptionsFactory.fromArgs(args)
-                              .withValidation()
-                              .as(NexmarkDirectOptions.class);
-    options.setRunner(DirectRunner.class);
-    NexmarkDirectRunner runner = new NexmarkDirectRunner(options);
-    new NexmarkDirectDriver().runAll(options, runner);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java
deleted file mode 100644
index 1391040..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.drivers;
-
-import org.apache.beam.integration.nexmark.NexmarkRunner;
-
-/**
- * Run a single query using the Direct Runner.
- */
-class NexmarkDirectRunner extends NexmarkRunner<NexmarkDirectDriver.NexmarkDirectOptions> {
-  public NexmarkDirectRunner(NexmarkDirectDriver.NexmarkDirectOptions options) {
-    super(options);
-  }
-
-  @Override
-  protected boolean isStreaming() {
-    return options.isStreaming();
-  }
-
-  @Override
-  protected int coresPerWorker() {
-    return 4;
-  }
-
-  @Override
-  protected int maxNumWorkers() {
-    return 1;
-  }
-
-  @Override
-  protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
-    throw new UnsupportedOperationException(
-        "Cannot use --pubSubMode=COMBINED with DirectRunner");
-  }
-
-  /**
-   * Monitor the progress of the publisher job. Return when it has been generating events for
-   * at least {@code configuration.preloadSeconds}.
-   */
-  @Override
-  protected void waitForPublisherPreload() {
-    throw new UnsupportedOperationException(
-        "Cannot use --pubSubMode=COMBINED with DirectRunner");
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java
deleted file mode 100644
index bf0b115..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.drivers;
-
-import org.apache.beam.integration.nexmark.NexmarkDriver;
-import org.apache.beam.integration.nexmark.NexmarkOptions;
-import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * Run NexMark queries using the Flink runner.
- */
-public class NexmarkFlinkDriver extends NexmarkDriver<NexmarkFlinkDriver.NexmarkFlinkOptions> {
-  /**
-   * Command line flags.
-   */
-  public interface NexmarkFlinkOptions extends NexmarkOptions, FlinkPipelineOptions {
-  }
-
-  /**
-   * Entry point.
-   */
-  public static void main(String[] args) {
-    // Gather command line args, baseline, configurations, etc.
-    NexmarkFlinkOptions options = PipelineOptionsFactory.fromArgs(args)
-                                                        .withValidation()
-                                                        .as(NexmarkFlinkOptions.class);
-    options.setRunner(FlinkRunner.class);
-    NexmarkFlinkRunner runner = new NexmarkFlinkRunner(options);
-    new NexmarkFlinkDriver().runAll(options, runner);
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java
deleted file mode 100644
index 9d547ef..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.drivers;
-
-import org.apache.beam.integration.nexmark.NexmarkRunner;
-
-/**
- * Run a query using the Flink runner.
- */
-public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.NexmarkFlinkOptions> {
-  @Override
-  protected boolean isStreaming() {
-    return options.isStreaming();
-  }
-
-  @Override
-  protected int coresPerWorker() {
-    return 4;
-  }
-
-  @Override
-  protected int maxNumWorkers() {
-    return 5;
-  }
-
-  @Override
-  protected void invokeBuilderForPublishOnlyPipeline(
-      PipelineBuilder builder) {
-    builder.build(options);
-  }
-
-  @Override
-  protected void waitForPublisherPreload() {
-    throw new UnsupportedOperationException();
-  }
-
-  public NexmarkFlinkRunner(NexmarkFlinkDriver.NexmarkFlinkOptions options) {
-    super(options);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java
deleted file mode 100644
index f5a9751..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.drivers;
-
-import org.apache.beam.integration.nexmark.NexmarkDriver;
-import org.apache.beam.integration.nexmark.NexmarkOptions;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * An implementation of the 'NEXMark queries' for Google Dataflow.
- * These are multiple queries over a three table schema representing an online auction system:
- * <ul>
- * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
- * on an auction.
- * <li>{@link Auction} represents an item under auction.
- * <li>{@link Bid} represents a bid for an item under auction.
- * </ul>
- * The queries exercise many aspects of streaming dataflow.
- *
- * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
- * particularly sensible.
- *
- * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
- * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
- */
-class NexmarkGoogleDriver extends NexmarkDriver<NexmarkGoogleDriver.NexmarkGoogleOptions> {
-  /**
-   * Command line flags.
-   */
-  public interface NexmarkGoogleOptions extends NexmarkOptions, DataflowPipelineOptions {
-
-  }
-
-  /**
-   * Entry point.
-   */
-  public static void main(String[] args) {
-    // Gather command line args, baseline, configurations, etc.
-    NexmarkGoogleOptions options = PipelineOptionsFactory.fromArgs(args)
-                                                         .withValidation()
-                                                         .as(NexmarkGoogleOptions.class);
-    options.setRunner(DataflowRunner.class);
-    NexmarkGoogleRunner runner = new NexmarkGoogleRunner(options);
-    new NexmarkGoogleDriver().runAll(options, runner);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
deleted file mode 100644
index 935bf0d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.drivers;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.integration.nexmark.Monitor;
-import org.apache.beam.integration.nexmark.NexmarkRunner;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.sdk.PipelineResult;
-import org.joda.time.Duration;
-
-/**
- * Run a singe Nexmark query using a given configuration on Google Dataflow.
- */
-class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogleOptions> {
-
-  public NexmarkGoogleRunner(NexmarkGoogleDriver.NexmarkGoogleOptions options) {
-    super(options);
-  }
-
-  @Override
-  protected boolean isStreaming() {
-    return options.isStreaming();
-  }
-
-  @Override
-  protected int coresPerWorker() {
-    String machineType = options.getWorkerMachineType();
-    if (machineType == null || machineType.isEmpty()) {
-      return 1;
-    }
-    String[] split = machineType.split("-");
-    if (split.length != 3) {
-      return 1;
-    }
-    try {
-      return Integer.parseInt(split[2]);
-    } catch (NumberFormatException ex) {
-      return 1;
-    }
-  }
-
-  @Override
-  protected int maxNumWorkers() {
-    return Math.max(options.getNumWorkers(), options.getMaxNumWorkers());
-  }
-
-  @Override
-  protected String getJobId(PipelineResult job) {
-    return ((DataflowPipelineJob) job).getJobId();
-  }
-
-  @Override
-  protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
-    String jobName = options.getJobName();
-    String appName = options.getAppName();
-    options.setJobName("p-" + jobName);
-    options.setAppName("p-" + appName);
-    int coresPerWorker = coresPerWorker();
-    int eventGeneratorWorkers = (configuration.numEventGenerators + coresPerWorker - 1)
-                                / coresPerWorker;
-    options.setMaxNumWorkers(Math.min(options.getMaxNumWorkers(), eventGeneratorWorkers));
-    options.setNumWorkers(Math.min(options.getNumWorkers(), eventGeneratorWorkers));
-    publisherMonitor = new Monitor<Event>(queryName, "publisher");
-    try {
-      builder.build(options);
-    } finally {
-      options.setJobName(jobName);
-      options.setAppName(appName);
-      options.setMaxNumWorkers(options.getMaxNumWorkers());
-      options.setNumWorkers(options.getNumWorkers());
-    }
-  }
-
-  /**
-   * Monitor the progress of the publisher job. Return when it has been generating events for
-   * at least {@code configuration.preloadSeconds}.
-   */
-  @Override
-  protected void waitForPublisherPreload() {
-    checkNotNull(publisherMonitor);
-    checkNotNull(publisherResult);
-    if (!options.getMonitorJobs()) {
-      return;
-    }
-    if (!(publisherResult instanceof DataflowPipelineJob)) {
-      return;
-    }
-    if (configuration.preloadSeconds <= 0) {
-      return;
-    }
-
-    NexmarkUtils.console("waiting for publisher to pre-load");
-
-    DataflowPipelineJob job = (DataflowPipelineJob) publisherResult;
-
-    long numEvents = 0;
-    long startMsSinceEpoch = -1;
-    long endMsSinceEpoch = -1;
-    while (true) {
-      PipelineResult.State state = job.getState();
-      switch (state) {
-        case UNKNOWN:
-          // Keep waiting.
-          NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
-          break;
-        case STOPPED:
-        case DONE:
-        case CANCELLED:
-        case FAILED:
-        case UPDATED:
-          NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
-          return;
-        case RUNNING:
-          //TODO Ismael Validate that this counter is ok
-          numEvents =
-            getCounterMetric(job, publisherMonitor.name, publisherMonitor.prefix + ".elements", -1);
-          if (startMsSinceEpoch < 0 && numEvents > 0) {
-            startMsSinceEpoch = System.currentTimeMillis();
-            endMsSinceEpoch = startMsSinceEpoch
-                              + Duration.standardSeconds(configuration.preloadSeconds).getMillis();
-          }
-          if (endMsSinceEpoch < 0) {
-            NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
-          } else {
-            long remainMs = endMsSinceEpoch - System.currentTimeMillis();
-            if (remainMs > 0) {
-              NexmarkUtils.console("%s publisher (%d events, waiting for %ds)", state, numEvents,
-                  remainMs / 1000);
-            } else {
-              NexmarkUtils.console("publisher preloaded %d events", numEvents);
-              return;
-            }
-          }
-          break;
-      }
-
-      try {
-        Thread.sleep(PERF_DELAY.getMillis());
-      } catch (InterruptedException e) {
-        Thread.interrupted();
-        throw new RuntimeException("Interrupted: publisher still running.");
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java
deleted file mode 100644
index c7c32c2..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.drivers;
-
-import org.apache.beam.integration.nexmark.NexmarkDriver;
-import org.apache.beam.integration.nexmark.NexmarkOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * Run NexMark queries using the Spark runner.
- */
-class NexmarkSparkDriver extends NexmarkDriver<NexmarkSparkDriver.NexmarkSparkOptions> {
-    /**
-     * Command line flags.
-     */
-    public interface NexmarkSparkOptions extends NexmarkOptions, SparkPipelineOptions {
-    }
-
-    /**
-     * Entry point.
-     */
-    public static void main(String[] args) {
-        NexmarkSparkOptions options =
-                PipelineOptionsFactory.fromArgs(args)
-                        .withValidation()
-                        .as(NexmarkSparkOptions.class);
-        options.setRunner(SparkRunner.class);
-        NexmarkSparkRunner runner = new NexmarkSparkRunner(options);
-        new NexmarkSparkDriver().runAll(options, runner);
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java
deleted file mode 100644
index 1d49a3a..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.drivers;
-
-import org.apache.beam.integration.nexmark.NexmarkRunner;
-
-/**
- * Run a query using the Spark runner.
- */
-public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.NexmarkSparkOptions> {
-    @Override
-    protected boolean isStreaming() {
-        return options.isStreaming();
-    }
-
-    @Override
-    protected int coresPerWorker() {
-        return 4;
-    }
-
-    @Override
-    protected int maxNumWorkers() {
-        return 5;
-    }
-
-    @Override
-    protected void invokeBuilderForPublishOnlyPipeline(
-            PipelineBuilder builder) {
-        builder.build(options);
-    }
-
-    @Override
-    protected void waitForPublisherPreload() {
-        throw new UnsupportedOperationException();
-    }
-
-
-    public NexmarkSparkRunner(NexmarkSparkDriver.NexmarkSparkOptions options) {
-        super(options);
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java
deleted file mode 100644
index c8aa144..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Nexmark Benchmark Execution Drivers.
- */
-package org.apache.beam.integration.nexmark.drivers;

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
deleted file mode 100644
index 931fe6e..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
+++ /dev/null
@@ -1,543 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.integration.nexmark.io;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.client.util.DateTime;
-import com.google.common.base.Objects;
-import com.google.common.base.Strings;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
-
-/**
- * An (abstract) helper class for talking to Pubsub via an underlying transport.
- */
-abstract class PubsubClient implements Closeable {
-  /**
-   * Factory for creating clients.
-   */
-  public interface PubsubClientFactory extends Serializable {
-    /**
-     * Construct a new Pubsub client. It should be closed via {@link #close} in order
-     * to ensure tidy cleanup of underlying netty resources (or use the try-with-resources
-     * construct). Uses {@code options} to derive pubsub endpoints and application credentials.
-     * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel} to store custom
-     * timestamps/ids within message metadata.
-     */
-    PubsubClient newClient(@Nullable String timestampLabel,
-        @Nullable String idLabel, PubsubOptions options) throws IOException;
-
-    /**
-     * Return the display name for this factory. Eg "Json", "gRPC".
-     */
-    String getKind();
-  }
-
-  /**
-   * Return timestamp as ms-since-unix-epoch corresponding to {@code timestamp}.
-   * Return {@literal null} if no timestamp could be found. Throw {@link IllegalArgumentException}
-   * if timestamp cannot be recognized.
-   */
-  @Nullable
-  private static Long asMsSinceEpoch(@Nullable String timestamp) {
-    if (Strings.isNullOrEmpty(timestamp)) {
-      return null;
-    }
-    try {
-      // Try parsing as milliseconds since epoch. Note there is no way to parse a
-      // string in RFC 3339 format here.
-      // Expected IllegalArgumentException if parsing fails; we use that to fall back
-      // to RFC 3339.
-      return Long.parseLong(timestamp);
-    } catch (IllegalArgumentException e1) {
-      // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an
-      // IllegalArgumentException if parsing fails, and the caller should handle.
-      return DateTime.parseRfc3339(timestamp).getValue();
-    }
-  }
-
-  /**
-   * Return the timestamp (in ms since unix epoch) to use for a Pubsub message with {@code
-   * attributes} and {@code pubsubTimestamp}.
-   *
-   * <p>If {@code timestampLabel} is non-{@literal null} then the message attributes must contain
-   * that label, and the value of that label will be taken as the timestamp.
-   * Otherwise the timestamp will be taken from the Pubsub publish timestamp {@code
-   * pubsubTimestamp}.
-   *
-   * @throws IllegalArgumentException if the timestamp cannot be recognized as a ms-since-unix-epoch
-   * or RFC3339 time.
-   */
-  protected static long extractTimestamp(
-      @Nullable String timestampLabel,
-      @Nullable String pubsubTimestamp,
-      @Nullable Map<String, String> attributes) {
-    Long timestampMsSinceEpoch;
-    if (Strings.isNullOrEmpty(timestampLabel)) {
-      timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp);
-      checkArgument(timestampMsSinceEpoch != null,
-                    "Cannot interpret PubSub publish timestamp: %s",
-                    pubsubTimestamp);
-    } else {
-      String value = attributes == null ? null : attributes.get(timestampLabel);
-      checkArgument(value != null,
-                    "PubSub message is missing a value for timestamp label %s",
-                    timestampLabel);
-      timestampMsSinceEpoch = asMsSinceEpoch(value);
-      checkArgument(timestampMsSinceEpoch != null,
-                    "Cannot interpret value of label %s as timestamp: %s",
-                    timestampLabel, value);
-    }
-    return timestampMsSinceEpoch;
-  }
-
-  /**
-   * Path representing a cloud project id.
-   */
-  static class ProjectPath implements Serializable {
-    private final String projectId;
-
-    /**
-     * Creates a {@link ProjectPath} from a {@link String} representation, which
-     * must be of the form {@code "projects/" + projectId}.
-     */
-    ProjectPath(String path) {
-      String[] splits = path.split("/");
-      checkArgument(
-          splits.length == 2 && splits[0].equals("projects"),
-          "Malformed project path \"%s\": must be of the form \"projects/\" + <project id>",
-          path);
-      this.projectId = splits[1];
-    }
-
-    public String getPath() {
-      return String.format("projects/%s", projectId);
-    }
-
-    public String getId() {
-      return projectId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      ProjectPath that = (ProjectPath) o;
-
-      return projectId.equals(that.projectId);
-    }
-
-    @Override
-    public int hashCode() {
-      return projectId.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return getPath();
-    }
-  }
-
-  public static ProjectPath projectPathFromPath(String path) {
-    return new ProjectPath(path);
-  }
-
-  public static ProjectPath projectPathFromId(String projectId) {
-    return new ProjectPath(String.format("projects/%s", projectId));
-  }
-
-  /**
-   * Path representing a Pubsub subscription, of the form
-   * {@code projects/<project id>/subscriptions/<subscription name>}.
-   */
-  public static class SubscriptionPath implements Serializable {
-    private final String projectId;
-    private final String subscriptionName;
-
-    /**
-     * Split {@code path} on {@code /} and keep the project id (second segment) and the
-     * subscription name (fourth segment). The {@code checkState} precondition rejects any path
-     * whose split does not yield four segments with {@code projects} first and
-     * {@code subscriptions} third.
-     */
-    SubscriptionPath(String path) {
-      String[] splits = path.split("/");
-      checkState(
-          splits.length == 4 && splits[0].equals("projects") && splits[2].equals("subscriptions"),
-          // Spell out the complete expected shape, including the trailing subscription name.
-          "Malformed subscription path %s: must be of the form "
-          + "\"projects/\" + <project id> + \"/subscriptions/\" + <subscription name>", path);
-      this.projectId = splits[1];
-      this.subscriptionName = splits[3];
-    }
-
-    /**
-     * Return the full path, {@code projects/<project id>/subscriptions/<subscription name>}.
-     */
-    public String getPath() {
-      return String.format("projects/%s/subscriptions/%s", projectId, subscriptionName);
-    }
-
-    /**
-     * Return only the subscription name (the last path segment).
-     */
-    public String getName() {
-      return subscriptionName;
-    }
-
-    /**
-     * Return the path in the form {@code /subscriptions/<project id>/<subscription name>}.
-     */
-    public String getV1Beta1Path() {
-      return String.format("/subscriptions/%s/%s", projectId, subscriptionName);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      SubscriptionPath that = (SubscriptionPath) o;
-      // Two paths are equal when both the subscription name and the project id match.
-      return this.subscriptionName.equals(that.subscriptionName)
-          && this.projectId.equals(that.projectId);
-    }
-
-    @Override
-    public int hashCode() {
-      // Uses the same two fields as equals().
-      return Objects.hashCode(projectId, subscriptionName);
-    }
-
-    @Override
-    public String toString() {
-      return getPath();
-    }
-  }
-
-  /**
-   * Parse a full subscription {@code path} into a {@link SubscriptionPath}. The path is
-   * validated by the {@link SubscriptionPath} constructor.
-   */
-  public static SubscriptionPath subscriptionPathFromPath(String path) {
-    return new SubscriptionPath(path);
-  }
-
-  /**
-   * Build the {@link SubscriptionPath} for
-   * {@code projects/<projectId>/subscriptions/<subscriptionName>}.
-   */
-  public static SubscriptionPath subscriptionPathFromName(
-      String projectId, String subscriptionName) {
-    String fullPath = "projects/" + projectId + "/subscriptions/" + subscriptionName;
-    return new SubscriptionPath(fullPath);
-  }
-
-  /**
-   * Path representing a Pubsub topic. The raw path string is stored as given; it is only
-   * split and checked when {@link #getName} or {@link #getV1Beta1Path} is called.
-   */
-  public static class TopicPath implements Serializable {
-    private final String path;
-
-    TopicPath(String path) {
-      this.path = path;
-    }
-
-    /**
-     * Return the path exactly as it was supplied to the constructor.
-     */
-    public String getPath() {
-      return path;
-    }
-
-    /**
-     * Return the fourth {@code /}-separated segment of the path.
-     */
-    public String getName() {
-      return segments()[3];
-    }
-
-    /**
-     * Return {@code /topics/<second segment>/<fourth segment>}.
-     */
-    public String getV1Beta1Path() {
-      String[] parts = segments();
-      return String.format("/topics/%s/%s", parts[1], parts[3]);
-    }
-
-    /**
-     * Split the path on {@code /}, requiring (via {@code checkState}) exactly four segments.
-     */
-    private String[] segments() {
-      String[] parts = path.split("/");
-      checkState(parts.length == 4, "Malformed topic path %s", path);
-      return parts;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == this) {
-        return true;
-      }
-      if (o == null) {
-        return false;
-      }
-      if (o.getClass() != getClass()) {
-        return false;
-      }
-      TopicPath other = (TopicPath) o;
-      return path.equals(other.path);
-    }
-
-    @Override
-    public int hashCode() {
-      return path.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return path;
-    }
-  }
-
-  /**
-   * Wrap an already-formatted topic {@code path} in a {@link TopicPath}. The
-   * {@link TopicPath} constructor stores the string without checking it.
-   */
-  public static TopicPath topicPathFromPath(String path) {
-    return new TopicPath(path);
-  }
-
-  /**
-   * Build the {@link TopicPath} for {@code projects/<projectId>/topics/<topicName>}.
-   */
-  public static TopicPath topicPathFromName(String projectId, String topicName) {
-    String fullPath = "projects/" + projectId + "/topics/" + topicName;
-    return new TopicPath(fullPath);
-  }
-
-  /**
-   * A message that is about to be published to Pubsub.
-   *
-   * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}.
-   * Java serialization is never used for non-test clients.
-   */
-  static class OutgoingMessage implements Serializable {
-    /**
-     * The element in its encoded form.
-     */
-    public final byte[] elementBytes;
-
-    public final Map<String, String> attributes;
-
-    /**
-     * Element timestamp, in ms since epoch.
-     */
-    public final long timestampMsSinceEpoch;
-
-    /**
-     * When an id label is in use, the record id to attach to this record's metadata so that the
-     * receiver can reject duplicates. Otherwise {@literal null}.
-     */
-    @Nullable
-    public final String recordId;
-
-    public OutgoingMessage(byte[] elementBytes, Map<String, String> attributes,
-                           long timestampMsSinceEpoch, @Nullable String recordId) {
-      this.elementBytes = elementBytes;
-      this.attributes = attributes;
-      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
-      this.recordId = recordId;
-    }
-
-    @Override
-    public String toString() {
-      int sizeInBytes = elementBytes.length;
-      return String.format("OutgoingMessage(%db, %dms)", sizeInBytes, timestampMsSinceEpoch);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == this) {
-        return true;
-      }
-      if (o == null || o.getClass() != getClass()) {
-        return false;
-      }
-
-      OutgoingMessage other = (OutgoingMessage) o;
-
-      // Compare field by field, in order: timestamp, payload, attributes, record id.
-      if (timestampMsSinceEpoch != other.timestampMsSinceEpoch) {
-        return false;
-      }
-      if (!Arrays.equals(elementBytes, other.elementBytes)) {
-        return false;
-      }
-      if (!Objects.equal(attributes, other.attributes)) {
-        return false;
-      }
-      return Objects.equal(recordId, other.recordId);
-    }
-
-    @Override
-    public int hashCode() {
-      // The payload is hashed by content, mirroring the Arrays.equals call in equals().
-      int payloadHash = Arrays.hashCode(elementBytes);
-      return Objects.hashCode(payloadHash, attributes, timestampMsSinceEpoch, recordId);
-    }
-  }
-
-  /**
-   * A message received from Pubsub.
-   *
-   * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}.
-   * Java serialization is never used for non-test clients.
-   */
-  static class IncomingMessage implements Serializable {
-    /**
-     * Underlying (encoded) element.
-     */
-    public final byte[] elementBytes;
-
-    // NOTE(review): unlike every other field here (and unlike OutgoingMessage.attributes) this
-    // one is not final, although it takes part in equals()/hashCode(). Left as is so that any
-    // code assigning to it keeps compiling -- confirm whether the mutability is intentional.
-    public Map<String, String> attributes;
-
-    /**
-     * Timestamp for element (ms since epoch). Either Pubsub's processing time,
-     * or the custom timestamp associated with the message.
-     */
-    public final long timestampMsSinceEpoch;
-
-    /**
-     * Timestamp (in system time) at which we requested the message (ms since epoch).
-     */
-    public final long requestTimeMsSinceEpoch;
-
-    /**
-     * Id to pass back to Pubsub to acknowledge receipt of this message.
-     */
-    public final String ackId;
-
-    /**
-     * Id to pass to the runner to distinguish this message from all others.
-     */
-    public final String recordId;
-
-    public IncomingMessage(
-        byte[] elementBytes,
-        Map<String, String> attributes,
-        long timestampMsSinceEpoch,
-        long requestTimeMsSinceEpoch,
-        String ackId,
-        String recordId) {
-      this.elementBytes = elementBytes;
-      this.attributes = attributes;
-      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
-      this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
-      this.ackId = ackId;
-      this.recordId = recordId;
-    }
-
-    /**
-     * Return a copy of this message that differs only in its request time.
-     */
-    public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) {
-      return new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
-              requestTimeMsSinceEpoch, ackId, recordId);
-    }
-
-    @Override
-    public String toString() {
-      return String.format("IncomingMessage(%db, %dms)",
-                           elementBytes.length, timestampMsSinceEpoch);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      IncomingMessage that = (IncomingMessage) o;
-
-      return timestampMsSinceEpoch == that.timestampMsSinceEpoch
-          && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch
-          // The constructor does not reject null ids, so compare them null-safely, the same
-          // way OutgoingMessage.equals treats recordId, instead of risking an NPE.
-          && Objects.equal(ackId, that.ackId)
-          && Objects.equal(recordId, that.recordId)
-          && Arrays.equals(elementBytes, that.elementBytes)
-          && Objects.equal(attributes, that.attributes);
-    }
-
-    @Override
-    public int hashCode() {
-      // The payload is hashed by content, mirroring the Arrays.equals call in equals().
-      return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch,
-                              requestTimeMsSinceEpoch,
-                              ackId, recordId);
-    }
-  }
-
-  /**
-   * Publish {@code outgoingMessages} to Pubsub {@code topic}.
-   *
-   * @param topic topic to publish to
-   * @param outgoingMessages messages to publish
-   * @return number of messages published
-   */
-  public abstract int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
-      throws IOException;
-
-  /**
-   * Request the next batch of up to {@code batchSize} messages from {@code subscription}.
-   * Return the received messages, or empty collection if none were available. Does not
-   * wait for messages to arrive if {@code returnImmediately} is {@literal true}.
-   * Returned messages will record their request time as {@code requestTimeMsSinceEpoch}.
-   *
-   * @param requestTimeMsSinceEpoch request time to record on each returned message
-   * @param subscription subscription to pull from
-   * @param batchSize upper bound on the number of messages returned
-   * @param returnImmediately if {@literal true}, do not wait for messages to arrive
-   * @return the received messages, possibly empty
-   */
-  public abstract List<IncomingMessage> pull(
-      long requestTimeMsSinceEpoch,
-      SubscriptionPath subscription,
-      int batchSize,
-      boolean returnImmediately)
-      throws IOException;
-
-  /**
-   * Acknowledge messages from {@code subscription} with {@code ackIds}.
-   */
-  public abstract void acknowledge(SubscriptionPath subscription, List<String> ackIds)
-      throws IOException;
-
-  /**
-   * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to
-   * be {@code deadlineSeconds} from now.
-   *
-   * @param subscription subscription the messages were received from
-   * @param ackIds ack ids of the messages whose deadline should change
-   * @param deadlineSeconds new deadline, in seconds from now
-   */
-  public abstract void modifyAckDeadline(
-      SubscriptionPath subscription, List<String> ackIds,
-      int deadlineSeconds) throws IOException;
-
-  /**
-   * Create {@code topic}.
-   *
-   * @param topic path of the topic to create
-   */
-  public abstract void createTopic(TopicPath topic) throws IOException;
-
-  /**
-   * Delete {@code topic}.
-   */
-  public abstract void deleteTopic(TopicPath topic) throws IOException;
-
-  /**
-   * Return a list of topics for {@code project}.
-   *
-   * @param project project whose topics should be listed
-   * @return the paths of the topics in {@code project}
-   */
-  public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException;
-
-  /**
-   * Create {@code subscription} to {@code topic}.
-   *
-   * @param topic topic the new subscription is attached to
-   * @param subscription path of the subscription to create
-   * @param ackDeadlineSeconds presumably the ack deadline, in seconds, for the new
-   *     subscription (compare {@link #ackDeadlineSeconds}) -- confirm against implementations
-   */
-  public abstract void createSubscription(
-      TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
-
-  /**
-   * Create a subscription to {@code topic} under a randomized name and return its
-   * {@link SubscriptionPath}. Deleting the subscription afterwards is the caller's
-   * responsibility.
-   */
-  public SubscriptionPath createRandomSubscription(
-      ProjectPath project, TopicPath topic, int ackDeadlineSeconds) throws IOException {
-    // The name is the topic's own name, a fixed marker, and a random long.
-    String topicName = topic.getName();
-    long randomSuffix = ThreadLocalRandom.current().nextLong();
-    String randomizedName = topicName + "_beam_" + randomSuffix;
-    SubscriptionPath created = subscriptionPathFromName(project.getId(), randomizedName);
-    createSubscription(topic, created, ackDeadlineSeconds);
-    return created;
-  }
-
-  /**
-   * Delete {@code subscription}.
-   *
-   * @param subscription path of the subscription to delete
-   */
-  public abstract void deleteSubscription(SubscriptionPath subscription) throws IOException;
-
-  /**
-   * Return a list of subscriptions for {@code topic} in {@code project}.
-   *
-   * @param project project to look in
-   * @param topic topic whose subscriptions should be listed
-   * @return the paths of the matching subscriptions
-   */
-  public abstract List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
-      throws IOException;
-
-  /**
-   * Return the ack deadline, in seconds, for {@code subscription}.
-   *
-   * @param subscription subscription to query
-   * @return the ack deadline in seconds
-   */
-  public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException;
-
-  /**
-   * Return {@literal true} if {@link #pull} will always return empty list. Actual clients
-   * will return {@literal false}. Test clients may return {@literal true} to signal that all
-   * expected messages have been pulled and the test may complete.
-   *
-   * @return {@literal true} once no further messages can ever be pulled
-   */
-  public abstract boolean isEOF();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
deleted file mode 100644
index bcc5b1c..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.io;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
-
-/**
- * Helper for working with pubsub.
- */
-public class PubsubHelper implements AutoCloseable {
-  /**
-   * Underlying pub/sub client.
-   */
-  private final PubsubClient pubsubClient;
-
-  /**
-   * Project id.
-   */
-  private final String projectId;
-
-  /**
-   * Topics we should delete on close.
-   */
-  private final List<PubsubClient.TopicPath> createdTopics;
-
-  /**
-   * Subscriptions we should delete on close.
-   */
-  private final List<PubsubClient.SubscriptionPath> createdSubscriptions;
-
-  private PubsubHelper(PubsubClient pubsubClient, String projectId) {
-    this.pubsubClient = pubsubClient;
-    this.projectId = projectId;
-    createdTopics = new ArrayList<>();
-    createdSubscriptions = new ArrayList<>();
-  }
-
-  /**
-   * Create a helper for the project named in {@code options}, backed by a new JSON client.
-   */
-  public static PubsubHelper create(PubsubOptions options) {
-    try {
-      return new PubsubHelper(
-          PubsubJsonClient.FACTORY.newClient(null, null, options),
-          options.getProject());
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to create Pubsub client: ", e);
-    }
-  }
-
-  /**
-   * Create a topic from short name. Delete it if it already exists. Ensure the topic will be
-   * deleted on cleanup. Return full topic name.
-   */
-  public PubsubClient.TopicPath createTopic(String shortTopic) {
-    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
-    try {
-      if (topicExists(shortTopic)) {
-        NexmarkUtils.console("attempting to cleanup topic %s", topic);
-        pubsubClient.deleteTopic(topic);
-      }
-      NexmarkUtils.console("create topic %s", topic);
-      pubsubClient.createTopic(topic);
-      createdTopics.add(topic);
-      return topic;
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to create Pubsub topic " + topic + ": ", e);
-    }
-  }
-
-  /**
-   * Create a topic from short name if it does not already exist. The topic will not be
-   * deleted on cleanup. Return full topic name.
-   */
-  public PubsubClient.TopicPath createOrReuseTopic(String shortTopic) {
-    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
-    try {
-      if (topicExists(shortTopic)) {
-        NexmarkUtils.console("topic %s already exists", topic);
-        return topic;
-      }
-      NexmarkUtils.console("create topic %s", topic);
-      pubsubClient.createTopic(topic);
-      return topic;
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to create or reuse Pubsub topic " + topic + ": ", e);
-    }
-  }
-
-  /**
-   * Check a topic corresponding to short name exists, and throw exception if not. The
-   * topic will not be deleted on cleanup. Return full topic name.
-   */
-  public PubsubClient.TopicPath reuseTopic(String shortTopic) {
-    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
-    if (topicExists(shortTopic)) {
-      NexmarkUtils.console("reusing existing topic %s", topic);
-      return topic;
-    }
-    throw new RuntimeException("topic '" + topic + "' does not already exist");
-  }
-
-  /**
-   * Does topic corresponding to short name exist? Checked by listing the project's topics.
-   */
-  public boolean topicExists(String shortTopic) {
-    PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId);
-    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
-    try {
-      Collection<PubsubClient.TopicPath> existingTopics = pubsubClient.listTopics(project);
-      return existingTopics.contains(topic);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to check Pubsub topic " + topic + ": ", e);
-    }
-  }
-
-  /**
-   * Create subscription from short name. Delete subscription if it already exists. Ensure the
-   * subscription will be deleted on cleanup. Return full subscription name.
-   */
-  public PubsubClient.SubscriptionPath createSubscription(
-      String shortTopic, String shortSubscription) {
-    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
-    PubsubClient.SubscriptionPath subscription =
-        PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
-    try {
-      if (subscriptionExists(shortTopic, shortSubscription)) {
-        NexmarkUtils.console("attempting to cleanup subscription %s", subscription);
-        pubsubClient.deleteSubscription(subscription);
-      }
-      NexmarkUtils.console("create subscription %s", subscription);
-      pubsubClient.createSubscription(topic, subscription, 60);
-      createdSubscriptions.add(subscription);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to create Pubsub subscription " + subscription + ": ", e);
-    }
-    return subscription;
-  }
-
-  /**
-   * Check a subscription corresponding to short name exists, and throw exception if not. The
-   * subscription will not be deleted on cleanup. Return full subscription name.
-   */
-  public PubsubClient.SubscriptionPath reuseSubscription(
-      String shortTopic, String shortSubscription) {
-    PubsubClient.SubscriptionPath subscription =
-        PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
-    if (subscriptionExists(shortTopic, shortSubscription)) {
-      NexmarkUtils.console("reusing existing subscription %s", subscription);
-      return subscription;
-    }
-    throw new RuntimeException("subscription '" + subscription + "' does not already exist");
-  }
-
-  /**
-   * Does subscription corresponding to short name exist? Checked by listing the topic's own.
-   */
-  public boolean subscriptionExists(String shortTopic, String shortSubscription) {
-    PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId);
-    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
-    PubsubClient.SubscriptionPath subscription =
-        PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
-    try {
-      Collection<PubsubClient.SubscriptionPath> existingSubscriptions =
-          pubsubClient.listSubscriptions(project, topic);
-      return existingSubscriptions.contains(subscription);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to check Pubsub subscription " + subscription + ": ", e);
-    }
-  }
-
-  /**
-   * Delete all the subscriptions and topics we created; failures are logged, not thrown.
-   */
-  @Override
-  public void close() {
-    for (PubsubClient.SubscriptionPath subscription : createdSubscriptions) {
-      try {
-        NexmarkUtils.console("delete subscription %s", subscription);
-        pubsubClient.deleteSubscription(subscription);
-      } catch (IOException ex) {
-        NexmarkUtils.console("could not delete subscription %s", subscription + ": " + ex);
-      }
-    }
-    for (PubsubClient.TopicPath topic : createdTopics) {
-      try {
-        NexmarkUtils.console("delete topic %s", topic);
-        pubsubClient.deleteTopic(topic);
-      } catch (IOException ex) {
-        NexmarkUtils.console("could not delete topic %s", topic + ": " + ex);
-      }
-    }
-  }
-}