You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/05/09 23:33:14 UTC

[beam] branch master updated: Nexmark pubsub benchmarks. (#5247)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c4d32c0  Nexmark pubsub benchmarks. (#5247)
c4d32c0 is described below

commit c4d32c0cbecbf517c3bdfe527bf44e3354560f25
Author: Raghu Angadi <ra...@apache.org>
AuthorDate: Wed May 9 16:33:10 2018 -0700

    Nexmark pubsub benchmarks. (#5247)
    
    * Nexmark pubsub benchmarks.
---
 .../beam/sdk/io/gcp/pubsub/PubsubClient.java       |   4 +-
 .../beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java   |   2 +-
 .../beam/sdk/io/gcp/pubsub/PubsubJsonClient.java   |   2 +-
 .../java/org/apache/beam/sdk/nexmark/Main.java     |   4 +-
 .../apache/beam/sdk/nexmark/NexmarkLauncher.java   | 149 +++++++++-----
 .../apache/beam/sdk/nexmark/NexmarkOptions.java    |  11 +-
 .../org/apache/beam/sdk/nexmark/NexmarkSuite.java  |  31 ++-
 .../org/apache/beam/sdk/nexmark/PubsubHelper.java  | 229 +++++++++++++++++++++
 8 files changed, 375 insertions(+), 57 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index f5a3ccb..82aae48 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -36,7 +36,7 @@ import javax.annotation.Nullable;
 /**
  * An (abstract) helper class for talking to Pubsub via an underlying transport.
  */
-abstract class PubsubClient implements Closeable {
+public abstract class PubsubClient implements Closeable {
   /**
    * Factory for creating clients.
    */
@@ -119,7 +119,7 @@ abstract class PubsubClient implements Closeable {
   /**
    * Path representing a cloud project id.
    */
-  static class ProjectPath implements Serializable {
+  public static class ProjectPath implements Serializable {
     private final String projectId;
 
     /**
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
index 9778edf..2eb4a81 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
@@ -69,7 +69,7 @@ import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
  * <p>CAUTION: Currently uses the application default credentials and does not respect any
  * credentials-related arguments in {@link GcpOptions}.
  */
-class PubsubGrpcClient extends PubsubClient {
+public class PubsubGrpcClient extends PubsubClient {
   private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
   private static final int PUBSUB_PORT = 443;
   private static final int LIST_BATCH_SIZE = 1000;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index 1c786d9..1981787 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -53,7 +53,7 @@ import org.apache.beam.sdk.util.Transport;
 /**
  * A Pubsub client using JSON transport.
  */
-class PubsubJsonClient extends PubsubClient {
+public class PubsubJsonClient extends PubsubClient {
 
   private static class PubsubJsonClientFactory implements PubsubClientFactory {
     private static HttpRequestInitializer chainHttpRequestInitializer(
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
index ab2284c..1cada92 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
@@ -56,7 +56,7 @@ public class Main<OptionT extends NexmarkOptions> {
   /**
    * Entry point.
    */
-  void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) {
+  void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) throws IOException {
     Instant start = Instant.now();
     Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
     Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
@@ -293,7 +293,7 @@ public class Main<OptionT extends NexmarkOptions> {
     NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
   }
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws IOException {
     NexmarkOptions options = PipelineOptionsFactory.fromArgs(args)
       .withValidation()
       .as(NexmarkOptions.class);
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index 8f02136..d277389 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -30,14 +30,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
@@ -49,6 +47,8 @@ import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.nexmark.NexmarkUtils.PubSubMode;
+import org.apache.beam.sdk.nexmark.NexmarkUtils.SourceType;
 import org.apache.beam.sdk.nexmark.model.Auction;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Event;
@@ -176,6 +176,21 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
   @Nullable
   private String queryName;
 
+  /**
+   * Full path of the PubSub topic (when PubSub is enabled).
+   */
+  @Nullable
+  private String pubsubTopic;
+
+  /**
+   * Full path of the PubSub subscription (when PubSub is enabled).
+   */
+  @Nullable
+  private String pubsubSubscription;
+
+  @Nullable
+  private PubsubHelper pubsubHelper;
+
   public NexmarkLauncher(OptionT options) {
     this.options = options;
   }
@@ -457,7 +472,29 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
    * Invoke the builder with options suitable for running a publish-only child pipeline.
    */
   private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) {
-    builder.build(options);
+    String jobName = options.getJobName();
+    String appName = options.getAppName();
+    int numWorkers = options.getNumWorkers();
+    int maxNumWorkers = options.getMaxNumWorkers();
+
+    options.setJobName("p-" + jobName);
+    options.setAppName("p-" + appName);
+    int eventGeneratorWorkers = configuration.numEventGenerators;
+    // TODO: assign one generator per core rather than one per worker.
+    if (numWorkers > 0 && eventGeneratorWorkers > 0) {
+      options.setNumWorkers(Math.min(numWorkers, eventGeneratorWorkers));
+    }
+    if (maxNumWorkers > 0 && eventGeneratorWorkers > 0) {
+      options.setMaxNumWorkers(Math.min(maxNumWorkers, eventGeneratorWorkers));
+    }
+    try {
+      builder.build(options);
+    } finally {
+      options.setJobName(jobName);
+      options.setAppName(appName);
+      options.setNumWorkers(numWorkers);
+      options.setMaxNumWorkers(maxNumWorkers);
+    }
   }
 
   /**
@@ -498,6 +535,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
       if (endMsSinceEpoch >= 0 && now > endMsSinceEpoch && !waitingForShutdown) {
         NexmarkUtils.console("Reached end of test, cancelling job");
         try {
+          cancelJob = true;
           job.cancel();
         } catch (IOException e) {
           throw new RuntimeException("Unable to cancel main job: ", e);
@@ -745,11 +783,10 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
    * Return source of events from Pubsub.
    */
   private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) {
-    String shortSubscription = shortSubscription(now);
-    NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription);
+    NexmarkUtils.console("Reading events from Pubsub %s", pubsubSubscription);
 
     PubsubIO.Read<PubsubMessage> io =
-        PubsubIO.readMessagesWithAttributes().fromSubscription(shortSubscription)
+        PubsubIO.readMessagesWithAttributes().fromSubscription(pubsubSubscription)
             .withIdAttribute(NexmarkUtils.PUBSUB_ID);
     if (!configuration.usePubsubPublishTime) {
       io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
@@ -762,14 +799,9 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
   static final DoFn<Event, byte[]> EVENT_TO_BYTEARRAY =
           new DoFn<Event, byte[]>() {
             @ProcessElement
-            public void processElement(ProcessContext c) {
-              try {
-                byte[] encodedEvent = CoderUtils.encodeToByteArray(Event.CODER, c.element());
-                c.output(encodedEvent);
-              } catch (CoderException e1) {
-                LOG.error("Error while sending Event {} to Kafka: serialization error",
-                        c.element().toString());
-              }
+            public void processElement(ProcessContext c) throws IOException {
+              byte[] encodedEvent = CoderUtils.encodeToByteArray(Event.CODER, c.element());
+              c.output(encodedEvent);
             }
           };
 
@@ -787,18 +819,13 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
 
   }
 
-
   static final DoFn<KV<Long, byte[]>, Event> BYTEARRAY_TO_EVENT =
           new DoFn<KV<Long, byte[]>, Event>() {
             @ProcessElement
-            public void processElement(ProcessContext c) {
+            public void processElement(ProcessContext c) throws IOException {
               byte[] encodedEvent = c.element().getValue();
-              try {
-                Event event = CoderUtils.decodeFromByteArray(Event.CODER, encodedEvent);
-                c.output(event);
-              } catch (CoderException e) {
-                LOG.error("Error while decoding Event from Kafka message: serialization error");
-              }
+              Event event = CoderUtils.decodeFromByteArray(Event.CODER, encodedEvent);
+              c.output(event);
             }
           };
 
@@ -840,12 +867,12 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
   /**
    * Send {@code events} to Pubsub.
    */
-  private void sinkEventsToPubsub(PCollection<Event> events, long now) {
-    String shortTopic = shortTopic(now);
-    NexmarkUtils.console("Writing events to Pubsub %s", shortTopic);
+  private void sinkEventsToPubsub(PCollection<Event> events) {
+    checkState(pubsubTopic != null, "Pubsub topic needs to be set up before initializing sink");
+    NexmarkUtils.console("Writing events to Pubsub %s", pubsubTopic);
 
     PubsubIO.Write<PubsubMessage> io =
-        PubsubIO.writeMessages().to(shortTopic)
+        PubsubIO.writeMessages().to(pubsubTopic)
             .withIdAttribute(NexmarkUtils.PUBSUB_ID);
     if (!configuration.usePubsubPublishTime) {
       io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
@@ -965,6 +992,33 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
         .apply(queryName + ".WriteBigQueryResults", io);
   }
 
+  /**
+   * Creates or reuses PubSub topics and subscriptions as configured.
+   */
+  private void setupPubSubResources(long now) throws IOException {
+    String shortTopic = shortTopic(now);
+    String shortSubscription = shortSubscription(now);
+
+    if (!options.getManageResources() || configuration.pubSubMode == PubSubMode.SUBSCRIBE_ONLY) {
+      // The topic should already have been created by the user or
+      // a companion 'PUBLISH_ONLY' process.
+      pubsubTopic = pubsubHelper.reuseTopic(shortTopic).getPath();
+    } else {
+      // Create a fresh topic. It will be removed when the job is done.
+      pubsubTopic = pubsubHelper.createTopic(shortTopic).getPath();
+    }
+
+    // Create/confirm the subscription.
+    if (configuration.pubSubMode == PubSubMode.PUBLISH_ONLY) {
+      // Nothing to consume.
+    } else if (options.getManageResources()) {
+      pubsubSubscription = pubsubHelper.createSubscription(shortTopic, shortSubscription).getPath();
+    } else {
+      // The subscription should already have been created by the user.
+      pubsubSubscription = pubsubHelper.reuseSubscription(shortTopic, shortSubscription).getPath();
+    }
+  }
+
   // ================================================================================
   // Construct overall pipeline
   // ================================================================================
@@ -973,8 +1027,9 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
    * Return source of events for this run, or null if we are simply publishing events
    * to Pubsub.
    */
-  private PCollection<Event> createSource(Pipeline p, final long now) {
+  private PCollection<Event> createSource(Pipeline p, final long now) throws IOException {
     PCollection<Event> source = null;
+
     switch (configuration.sourceType) {
       case DIRECT:
         source = sourceEventsFromSynthetic(p);
@@ -986,6 +1041,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
         source = sourceEventsFromKafka(p);
         break;
       case PUBSUB:
+        setupPubSubResources(now);
         // Setup the sink for the publisher.
         switch (configuration.pubSubMode) {
           case SUBSCRIBE_ONLY:
@@ -995,8 +1051,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
             // Send synthesized events to Pubsub in this job.
             sinkEventsToPubsub(
                 sourceEventsFromSynthetic(p)
-                    .apply(queryName + ".Snoop", NexmarkUtils.snoop(queryName)),
-                now);
+                    .apply(queryName + ".Snoop", NexmarkUtils.snoop(queryName)));
             break;
           case COMBINED:
             // Send synthesized events to Pubsub in separate publisher job.
@@ -1009,9 +1064,9 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
                   publisherMonitor = new Monitor<>(queryName, "publisher");
                   sinkEventsToPubsub(
                       sourceEventsFromSynthetic(sp)
-                          .apply(queryName + ".Monitor", publisherMonitor.getTransform()),
-                      now);
+                          .apply(queryName + ".Monitor", publisherMonitor.getTransform()));
                   publisherResult = sp.run();
+                  NexmarkUtils.console("Publisher job is started.");
                 });
             break;
         }
@@ -1122,7 +1177,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
    * Run {@code configuration} and return its performance if possible.
    */
   @Nullable
-  public NexmarkPerf run(NexmarkConfiguration runConfiguration) {
+  public NexmarkPerf run(NexmarkConfiguration runConfiguration) throws IOException {
     if (options.getManageResources() && !options.getMonitorJobs()) {
       throw new RuntimeException("If using --manageResources then must also use --monitorJobs.");
     }
@@ -1133,6 +1188,9 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
     checkState(configuration == null);
     checkState(queryName == null);
     configuration = runConfiguration;
+    if (configuration.sourceType.equals(SourceType.PUBSUB)) {
+      pubsubHelper = PubsubHelper.create(options);
+    }
 
     try {
       NexmarkUtils.console("Running %s", configuration.toShortString());
@@ -1208,6 +1266,10 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
       mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout));
       return monitor(query);
     } finally {
+      if (pubsubHelper != null) {
+        pubsubHelper.cleanup();
+        pubsubHelper = null;
+      }
       configuration = null;
       queryName = null;
     }
@@ -1298,30 +1360,19 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
   }
 
   private static class PubsubMessageEventDoFn extends DoFn<PubsubMessage, Event> {
-
     @ProcessElement
-    public void processElement(ProcessContext c) {
+    public void processElement(ProcessContext c) throws IOException {
       byte[] payload = c.element().getPayload();
-      try {
-        Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
-        c.output(event);
-      } catch (CoderException e) {
-        LOG.error("Error while decoding Event from pusbSub message: serialization error");
-      }
+      Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
+      c.output(event);
     }
   }
 
   private static class EventPubsubMessageDoFn extends DoFn<Event, PubsubMessage> {
-
     @ProcessElement
-    public void processElement(ProcessContext c) {
-      try {
-        byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
-        c.output(new PubsubMessage(payload, new HashMap<>()));
-      } catch (CoderException e1) {
-        LOG.error(
-            "Error while sending Event {} to pusbSub: serialization error", c.element().toString());
-      }
+    public void processElement(ProcessContext c) throws IOException {
+      byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
+      c.output(new PubsubMessage(payload, Collections.emptyMap()));
     }
   }
 }
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index a3386b6..ac894d2 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.nexmark;
 
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -29,7 +30,7 @@ import org.apache.beam.sdk.options.StreamingOptions;
  * Command line flags.
  */
 public interface NexmarkOptions
-    extends ApplicationNameOptions, GcpOptions, PipelineOptions, StreamingOptions {
+    extends ApplicationNameOptions, GcpOptions, PipelineOptions, PubsubOptions, StreamingOptions {
   @Description("Which suite to run. Default is to use command line arguments for one job.")
   @Default.Enum("DEFAULT")
   NexmarkSuite getSuite();
@@ -426,4 +427,12 @@ public interface NexmarkOptions
   String getBootstrapServers();
 
   void setBootstrapServers(String value);
+
+  @Description("Same as --numWorkers in DataflowPipelineWorkerPoolOptions")
+  int getNumWorkers();
+  void setNumWorkers(int value);
+
+  @Description("Same as --maxNumWorkers in DataflowPipelineWorkerPoolOptions.")
+  int getMaxNumWorkers();
+  void setMaxNumWorkers(int value);
 }
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
index d38cb7b..a6e9f6b 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
@@ -45,7 +45,12 @@ public enum NexmarkSuite {
   /**
    * As for SMOKE, but with 1b/100m events.
    */
-  FULL_THROTTLE(fullThrottle());
+  FULL_THROTTLE(fullThrottle()),
+
+  /**
+   * Query 10, at high volume with no autoscaling.
+   */
+  LONG_RUNNING_LOGGER(longRunningLogger());
 
   private static List<NexmarkConfiguration> defaultConf() {
     List<NexmarkConfiguration> configurations = new ArrayList<>();
@@ -89,6 +94,30 @@ public enum NexmarkSuite {
     return configurations;
   }
 
+  private static List<NexmarkConfiguration> longRunningLogger() {
+    NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.copy();
+    configuration.numEventGenerators = 10;
+
+    configuration.query = 10;
+    configuration.isRateLimited = true;
+    configuration.sourceType = NexmarkUtils.SourceType.PUBSUB;
+    configuration.numEvents = 0; // as many as possible without overflow.
+    configuration.avgPersonByteSize = 500;
+    configuration.avgAuctionByteSize = 500;
+    configuration.avgBidByteSize = 500;
+    configuration.windowSizeSec = 300;
+    configuration.occasionalDelaySec = 360;
+    configuration.probDelayedEvent = 0.001;
+    configuration.useWallclockEventTime = true;
+    configuration.firstEventRate = 60000;
+    configuration.nextEventRate = 60000;
+    configuration.maxLogEvents = 15000;
+
+    List<NexmarkConfiguration> configurations = new ArrayList<>();
+    configurations.add(configuration);
+    return configurations;
+  }
+
   private final List<NexmarkConfiguration> configurations;
 
   NexmarkSuite(List<NexmarkConfiguration> configurations) {
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java
new file mode 100644
index 0000000..dd811d3
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.joda.time.Duration;
+
+/**
+ * Helper for creating, reusing and deleting Pubsub topics and subscriptions.
+ */
+public class PubsubHelper {
+  /** Underlying pub/sub client. */
+  private final PubsubClient pubsubClient;
+
+  /** Project id. */
+  private final String project;
+
+  /** Topics we should delete on close. */
+  private final List<TopicPath> createdTopics;
+
+  /** Subscriptions we should delete on close. */
+  private final List<SubscriptionPath> createdSubscriptions;
+
+  /** How to sleep in retry loops. */
+  private final Sleeper sleeper;
+
+  /** How to backoff in retry loops. */
+  private final BackOff backOff;
+
+  private PubsubHelper(PubsubClient pubsubClient, String project) {
+    this.pubsubClient = pubsubClient;
+    this.project = project;
+    createdTopics = new ArrayList<>();
+    createdSubscriptions = new ArrayList<>();
+    sleeper = Sleeper.DEFAULT;
+    backOff = FluentBackoff.DEFAULT
+      .withInitialBackoff(Duration.standardSeconds(1))
+      .withMaxRetries(3)
+      .backoff();
+  }
+
+  /** Create a helper. */
+  public static PubsubHelper create(PubsubOptions options) throws IOException {
+    return new PubsubHelper(
+      PubsubJsonClient.FACTORY.newClient(null, null, options),
+      options.getProject());
+  }
+
+  /**
+   * Create a topic from short name. Delete it if it already exists. Ensure the topic will be
+   * deleted on cleanup. Return full topic name.
+   */
+  public TopicPath createTopic(String shortTopic) throws IOException {
+    TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
+    while (true) {
+      try {
+        NexmarkUtils.console("create topic %s", topic);
+        pubsubClient.createTopic(topic);
+        createdTopics.add(topic);
+        return topic;
+      } catch (GoogleJsonResponseException ex) {
+        NexmarkUtils.console("attempting to cleanup topic %s", topic);
+        pubsubClient.deleteTopic(topic);
+        try {
+          if (!BackOffUtils.next(sleeper, backOff)) {
+            NexmarkUtils.console("too many retries for creating topic %s", topic);
+            throw ex;
+          }
+        } catch (InterruptedException in) {
+          throw new IOException(in);
+        }
+      }
+    }
+  }
+
+  /** Create a topic from short name if it does not already exist. The topic will not be
+   * deleted on cleanup. Return full topic name.
+   */
+  public TopicPath createOrReuseTopic(String shortTopic) throws IOException {
+    TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
+    while (true) {
+      try {
+        NexmarkUtils.console("create topic %s", topic);
+        pubsubClient.createTopic(topic);
+        return topic;
+      } catch (GoogleJsonResponseException ex) {
+        if (topicExists(shortTopic)) {
+          NexmarkUtils.console("topic %s already exists", topic);
+          return topic;
+        }
+        try {
+          if (!BackOffUtils.next(sleeper, backOff)) {
+            NexmarkUtils.console("too many retries for creating/reusing topic %s", topic);
+            throw ex;
+          }
+        } catch (InterruptedException in) {
+          throw new IOException(in);
+        }
+      }
+    }
+  }
+
+  /**
+   * Check a topic corresponding to short name exists, and throw exception if not. The
+   * topic will not be deleted on cleanup. Return full topic name.
+   */
+  public TopicPath reuseTopic(String shortTopic) throws IOException {
+    TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
+    if (topicExists(shortTopic)) {
+      NexmarkUtils.console("reusing existing topic %s", topic);
+      return topic;
+    }
+    throw new RuntimeException("topic '" + topic + "' does not already exist");
+  }
+
+  /** Does topic corresponding to short name exist? */
+  public boolean topicExists(String shortTopic) throws IOException {
+    TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
+    return pubsubClient
+      .listTopics(PubsubClient.projectPathFromId(project))
+      .stream()
+      .anyMatch(topic::equals);
+  }
+
+  /**
+   * Create subscription from short name. Ensure the subscription will be deleted
+   * on cleanup. Return full subscription name.
+   */
+  public SubscriptionPath createSubscription(String shortTopic, String shortSubscription)
+      throws IOException {
+    TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
+    SubscriptionPath subscription = PubsubClient.subscriptionPathFromName(project,
+                                                                          shortSubscription);
+    while (true) {
+      try {
+        NexmarkUtils.console("create subscription %s", subscription);
+        pubsubClient.createSubscription(topic, subscription, 60);
+        createdSubscriptions.add(subscription);
+        return subscription;
+      } catch (GoogleJsonResponseException ex) {
+        NexmarkUtils.console("attempting to cleanup subscription %s", subscription);
+        pubsubClient.deleteSubscription(subscription);
+        try {
+          if (!BackOffUtils.next(sleeper, backOff)) {
+            NexmarkUtils.console("too many retries for creating subscription %s", subscription);
+            throw ex;
+          }
+        } catch (InterruptedException in) {
+          throw new IOException(in);
+        }
+      }
+    }
+  }
+
+  /**
+   * Check a subscription corresponding to short name exists, and throw exception if not. The
+   * subscription will not be deleted on cleanup. Return full subscription name.
+   */
+  public SubscriptionPath reuseSubscription(String shortTopic,
+                                            String shortSubscription) throws IOException {
+    SubscriptionPath subscription = PubsubClient.subscriptionPathFromName(project,
+                                                                          shortSubscription);
+    if (subscriptionExists(shortTopic, shortSubscription)) {
+      NexmarkUtils.console("reusing existing subscription %s", subscription);
+      return subscription;
+    }
+    throw new RuntimeException("subscription'" + subscription + "' does not already exist");
+  }
+
+  /** Does subscription corresponding to short name exist? */
+  public boolean subscriptionExists(String shortTopic, String shortSubscription)
+      throws IOException {
+    TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
+    SubscriptionPath subscription = PubsubClient.subscriptionPathFromName(project,
+                                                                          shortSubscription);
+    return pubsubClient
+      .listSubscriptions(PubsubClient.projectPathFromId(project), topic)
+      .stream()
+      .anyMatch(subscription::equals);
+  }
+
+  /** Delete all the subscriptions and topics we created. */
+  public void cleanup() {
+    for (SubscriptionPath subscription : createdSubscriptions) {
+      try {
+        NexmarkUtils.console("delete subscription %s", subscription);
+        pubsubClient.deleteSubscription(subscription);
+      } catch (IOException ex) {
+        NexmarkUtils.console("could not delete subscription %s", subscription);
+      }
+    }
+    for (TopicPath topic : createdTopics) {
+      try {
+        NexmarkUtils.console("delete topic %s", topic);
+        pubsubClient.deleteTopic(topic);
+      } catch (IOException ex) {
+        NexmarkUtils.console("could not delete topic %s", topic);
+      }
+    }
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
altay@apache.org.