You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/05/09 23:33:14 UTC
[beam] branch master updated: Nexmark pubsub benchmarks. (#5247)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c4d32c0 Nexmark pubsub benchmarks. (#5247)
c4d32c0 is described below
commit c4d32c0cbecbf517c3bdfe527bf44e3354560f25
Author: Raghu Angadi <ra...@apache.org>
AuthorDate: Wed May 9 16:33:10 2018 -0700
Nexmark pubsub benchmarks. (#5247)
* Nexmark pubsub benchmarks.
---
.../beam/sdk/io/gcp/pubsub/PubsubClient.java | 4 +-
.../beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java | 2 +-
.../beam/sdk/io/gcp/pubsub/PubsubJsonClient.java | 2 +-
.../java/org/apache/beam/sdk/nexmark/Main.java | 4 +-
.../apache/beam/sdk/nexmark/NexmarkLauncher.java | 149 +++++++++-----
.../apache/beam/sdk/nexmark/NexmarkOptions.java | 11 +-
.../org/apache/beam/sdk/nexmark/NexmarkSuite.java | 31 ++-
.../org/apache/beam/sdk/nexmark/PubsubHelper.java | 229 +++++++++++++++++++++
8 files changed, 375 insertions(+), 57 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index f5a3ccb..82aae48 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -36,7 +36,7 @@ import javax.annotation.Nullable;
/**
* An (abstract) helper class for talking to Pubsub via an underlying transport.
*/
-abstract class PubsubClient implements Closeable {
+public abstract class PubsubClient implements Closeable {
/**
* Factory for creating clients.
*/
@@ -119,7 +119,7 @@ abstract class PubsubClient implements Closeable {
/**
* Path representing a cloud project id.
*/
- static class ProjectPath implements Serializable {
+ public static class ProjectPath implements Serializable {
private final String projectId;
/**
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
index 9778edf..2eb4a81 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
@@ -69,7 +69,7 @@ import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
* <p>CAUTION: Currently uses the application default credentials and does not respect any
* credentials-related arguments in {@link GcpOptions}.
*/
-class PubsubGrpcClient extends PubsubClient {
+public class PubsubGrpcClient extends PubsubClient {
private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
private static final int PUBSUB_PORT = 443;
private static final int LIST_BATCH_SIZE = 1000;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index 1c786d9..1981787 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -53,7 +53,7 @@ import org.apache.beam.sdk.util.Transport;
/**
* A Pubsub client using JSON transport.
*/
-class PubsubJsonClient extends PubsubClient {
+public class PubsubJsonClient extends PubsubClient {
private static class PubsubJsonClientFactory implements PubsubClientFactory {
private static HttpRequestInitializer chainHttpRequestInitializer(
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
index ab2284c..1cada92 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
@@ -56,7 +56,7 @@ public class Main<OptionT extends NexmarkOptions> {
/**
* Entry point.
*/
- void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) {
+ void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) throws IOException {
Instant start = Instant.now();
Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
@@ -293,7 +293,7 @@ public class Main<OptionT extends NexmarkOptions> {
NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
}
- public static void main(String[] args) {
+ public static void main(String[] args) throws IOException {
NexmarkOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(NexmarkOptions.class);
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index 8f02136..d277389 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -30,14 +30,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
@@ -49,6 +47,8 @@ import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.nexmark.NexmarkUtils.PubSubMode;
+import org.apache.beam.sdk.nexmark.NexmarkUtils.SourceType;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
@@ -176,6 +176,21 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
@Nullable
private String queryName;
+ /**
+ * Full path of the PubSub topic (when PubSub is enabled).
+ */
+ @Nullable
+ private String pubsubTopic;
+
+ /**
+ * Full path of the PubSub subscription (when PubSub is enabled).
+ */
+ @Nullable
+ private String pubsubSubscription;
+
+ @Nullable
+ private PubsubHelper pubsubHelper;
+
public NexmarkLauncher(OptionT options) {
this.options = options;
}
@@ -457,7 +472,29 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
* Invoke the builder with options suitable for running a publish-only child pipeline.
*/
private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) {
- builder.build(options);
+ String jobName = options.getJobName();
+ String appName = options.getAppName();
+ int numWorkers = options.getNumWorkers();
+ int maxNumWorkers = options.getMaxNumWorkers();
+
+ options.setJobName("p-" + jobName);
+ options.setAppName("p-" + appName);
+ int eventGeneratorWorkers = configuration.numEventGenerators;
+ // TODO: assign one generator per core rather than one per worker.
+ if (numWorkers > 0 && eventGeneratorWorkers > 0) {
+ options.setNumWorkers(Math.min(numWorkers, eventGeneratorWorkers));
+ }
+ if (maxNumWorkers > 0 && eventGeneratorWorkers > 0) {
+ options.setMaxNumWorkers(Math.min(maxNumWorkers, eventGeneratorWorkers));
+ }
+ try {
+ builder.build(options);
+ } finally {
+ options.setJobName(jobName);
+ options.setAppName(appName);
+ options.setNumWorkers(numWorkers);
+ options.setMaxNumWorkers(maxNumWorkers);
+ }
}
/**
@@ -498,6 +535,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
if (endMsSinceEpoch >= 0 && now > endMsSinceEpoch && !waitingForShutdown) {
NexmarkUtils.console("Reached end of test, cancelling job");
try {
+ cancelJob = true;
job.cancel();
} catch (IOException e) {
throw new RuntimeException("Unable to cancel main job: ", e);
@@ -745,11 +783,10 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
* Return source of events from Pubsub.
*/
private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) {
- String shortSubscription = shortSubscription(now);
- NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription);
+ NexmarkUtils.console("Reading events from Pubsub %s", pubsubSubscription);
PubsubIO.Read<PubsubMessage> io =
- PubsubIO.readMessagesWithAttributes().fromSubscription(shortSubscription)
+ PubsubIO.readMessagesWithAttributes().fromSubscription(pubsubSubscription)
.withIdAttribute(NexmarkUtils.PUBSUB_ID);
if (!configuration.usePubsubPublishTime) {
io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
@@ -762,14 +799,9 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
static final DoFn<Event, byte[]> EVENT_TO_BYTEARRAY =
new DoFn<Event, byte[]>() {
@ProcessElement
- public void processElement(ProcessContext c) {
- try {
- byte[] encodedEvent = CoderUtils.encodeToByteArray(Event.CODER, c.element());
- c.output(encodedEvent);
- } catch (CoderException e1) {
- LOG.error("Error while sending Event {} to Kafka: serialization error",
- c.element().toString());
- }
+ public void processElement(ProcessContext c) throws IOException {
+ byte[] encodedEvent = CoderUtils.encodeToByteArray(Event.CODER, c.element());
+ c.output(encodedEvent);
}
};
@@ -787,18 +819,13 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
}
-
static final DoFn<KV<Long, byte[]>, Event> BYTEARRAY_TO_EVENT =
new DoFn<KV<Long, byte[]>, Event>() {
@ProcessElement
- public void processElement(ProcessContext c) {
+ public void processElement(ProcessContext c) throws IOException {
byte[] encodedEvent = c.element().getValue();
- try {
- Event event = CoderUtils.decodeFromByteArray(Event.CODER, encodedEvent);
- c.output(event);
- } catch (CoderException e) {
- LOG.error("Error while decoding Event from Kafka message: serialization error");
- }
+ Event event = CoderUtils.decodeFromByteArray(Event.CODER, encodedEvent);
+ c.output(event);
}
};
@@ -840,12 +867,12 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
/**
* Send {@code events} to Pubsub.
*/
- private void sinkEventsToPubsub(PCollection<Event> events, long now) {
- String shortTopic = shortTopic(now);
- NexmarkUtils.console("Writing events to Pubsub %s", shortTopic);
+ private void sinkEventsToPubsub(PCollection<Event> events) {
+ checkState(pubsubTopic != null, "Pubsub topic needs to be set up before initializing sink");
+ NexmarkUtils.console("Writing events to Pubsub %s", pubsubTopic);
PubsubIO.Write<PubsubMessage> io =
- PubsubIO.writeMessages().to(shortTopic)
+ PubsubIO.writeMessages().to(pubsubTopic)
.withIdAttribute(NexmarkUtils.PUBSUB_ID);
if (!configuration.usePubsubPublishTime) {
io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
@@ -965,6 +992,33 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
.apply(queryName + ".WriteBigQueryResults", io);
}
+ /**
+ * Creates or reuses PubSub topics and subscriptions as configured.
+ */
+ private void setupPubSubResources(long now) throws IOException {
+ String shortTopic = shortTopic(now);
+ String shortSubscription = shortSubscription(now);
+
+ if (!options.getManageResources() || configuration.pubSubMode == PubSubMode.SUBSCRIBE_ONLY) {
+ // The topic should already have been created by the user or
+ // a companion 'PUBLISH_ONLY' process.
+ pubsubTopic = pubsubHelper.reuseTopic(shortTopic).getPath();
+ } else {
+ // Create a fresh topic. It will be removed when the job is done.
+ pubsubTopic = pubsubHelper.createTopic(shortTopic).getPath();
+ }
+
+ // Create/confirm the subscription.
+ if (configuration.pubSubMode == PubSubMode.PUBLISH_ONLY) {
+ // Nothing to consume.
+ } else if (options.getManageResources()) {
+ pubsubSubscription = pubsubHelper.createSubscription(shortTopic, shortSubscription).getPath();
+ } else {
+ // The subscription should already have been created by the user.
+ pubsubSubscription = pubsubHelper.reuseSubscription(shortTopic, shortSubscription).getPath();
+ }
+ }
+
// ================================================================================
// Construct overall pipeline
// ================================================================================
@@ -973,8 +1027,9 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
* Return source of events for this run, or null if we are simply publishing events
* to Pubsub.
*/
- private PCollection<Event> createSource(Pipeline p, final long now) {
+ private PCollection<Event> createSource(Pipeline p, final long now) throws IOException {
PCollection<Event> source = null;
+
switch (configuration.sourceType) {
case DIRECT:
source = sourceEventsFromSynthetic(p);
@@ -986,6 +1041,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
source = sourceEventsFromKafka(p);
break;
case PUBSUB:
+ setupPubSubResources(now);
// Setup the sink for the publisher.
switch (configuration.pubSubMode) {
case SUBSCRIBE_ONLY:
@@ -995,8 +1051,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
// Send synthesized events to Pubsub in this job.
sinkEventsToPubsub(
sourceEventsFromSynthetic(p)
- .apply(queryName + ".Snoop", NexmarkUtils.snoop(queryName)),
- now);
+ .apply(queryName + ".Snoop", NexmarkUtils.snoop(queryName)));
break;
case COMBINED:
// Send synthesized events to Pubsub in separate publisher job.
@@ -1009,9 +1064,9 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
publisherMonitor = new Monitor<>(queryName, "publisher");
sinkEventsToPubsub(
sourceEventsFromSynthetic(sp)
- .apply(queryName + ".Monitor", publisherMonitor.getTransform()),
- now);
+ .apply(queryName + ".Monitor", publisherMonitor.getTransform()));
publisherResult = sp.run();
+ NexmarkUtils.console("Publisher job is started.");
});
break;
}
@@ -1122,7 +1177,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
* Run {@code configuration} and return its performance if possible.
*/
@Nullable
- public NexmarkPerf run(NexmarkConfiguration runConfiguration) {
+ public NexmarkPerf run(NexmarkConfiguration runConfiguration) throws IOException {
if (options.getManageResources() && !options.getMonitorJobs()) {
throw new RuntimeException("If using --manageResources then must also use --monitorJobs.");
}
@@ -1133,6 +1188,9 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
checkState(configuration == null);
checkState(queryName == null);
configuration = runConfiguration;
+ if (configuration.sourceType.equals(SourceType.PUBSUB)) {
+ pubsubHelper = PubsubHelper.create(options);
+ }
try {
NexmarkUtils.console("Running %s", configuration.toShortString());
@@ -1208,6 +1266,10 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout));
return monitor(query);
} finally {
+ if (pubsubHelper != null) {
+ pubsubHelper.cleanup();
+ pubsubHelper = null;
+ }
configuration = null;
queryName = null;
}
@@ -1298,30 +1360,19 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
}
private static class PubsubMessageEventDoFn extends DoFn<PubsubMessage, Event> {
-
@ProcessElement
- public void processElement(ProcessContext c) {
+ public void processElement(ProcessContext c) throws IOException {
byte[] payload = c.element().getPayload();
- try {
- Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
- c.output(event);
- } catch (CoderException e) {
- LOG.error("Error while decoding Event from pusbSub message: serialization error");
- }
+ Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
+ c.output(event);
}
}
private static class EventPubsubMessageDoFn extends DoFn<Event, PubsubMessage> {
-
@ProcessElement
- public void processElement(ProcessContext c) {
- try {
- byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
- c.output(new PubsubMessage(payload, new HashMap<>()));
- } catch (CoderException e1) {
- LOG.error(
- "Error while sending Event {} to pusbSub: serialization error", c.element().toString());
- }
+ public void processElement(ProcessContext c) throws IOException {
+ byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
+ c.output(new PubsubMessage(payload, Collections.emptyMap()));
}
}
}
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index a3386b6..ac894d2 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.nexmark;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -29,7 +30,7 @@ import org.apache.beam.sdk.options.StreamingOptions;
* Command line flags.
*/
public interface NexmarkOptions
- extends ApplicationNameOptions, GcpOptions, PipelineOptions, StreamingOptions {
+ extends ApplicationNameOptions, GcpOptions, PipelineOptions, PubsubOptions, StreamingOptions {
@Description("Which suite to run. Default is to use command line arguments for one job.")
@Default.Enum("DEFAULT")
NexmarkSuite getSuite();
@@ -426,4 +427,12 @@ public interface NexmarkOptions
String getBootstrapServers();
void setBootstrapServers(String value);
+
+ @Description("Same as --numWorkers in DataflowPipelineWorkerPoolOptions")
+ int getNumWorkers();
+ void setNumWorkers(int value);
+
+ @Description("Same as --maxNumWorkers in DataflowPipelineWorkerPoolOptions.")
+ int getMaxNumWorkers();
+ void setMaxNumWorkers(int value);
}
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
index d38cb7b..a6e9f6b 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
@@ -45,7 +45,12 @@ public enum NexmarkSuite {
/**
* As for SMOKE, but with 1b/100m events.
*/
- FULL_THROTTLE(fullThrottle());
+ FULL_THROTTLE(fullThrottle()),
+
+ /**
+ * Query 10, at high volume with no autoscaling.
+ */
+ LONG_RUNNING_LOGGER(longRunningLogger());
private static List<NexmarkConfiguration> defaultConf() {
List<NexmarkConfiguration> configurations = new ArrayList<>();
@@ -89,6 +94,30 @@ public enum NexmarkSuite {
return configurations;
}
+ private static List<NexmarkConfiguration> longRunningLogger() {
+ NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.copy();
+ configuration.numEventGenerators = 10;
+
+ configuration.query = 10;
+ configuration.isRateLimited = true;
+ configuration.sourceType = NexmarkUtils.SourceType.PUBSUB;
+ configuration.numEvents = 0; // as many as possible without overflow.
+ configuration.avgPersonByteSize = 500;
+ configuration.avgAuctionByteSize = 500;
+ configuration.avgBidByteSize = 500;
+ configuration.windowSizeSec = 300;
+ configuration.occasionalDelaySec = 360;
+ configuration.probDelayedEvent = 0.001;
+ configuration.useWallclockEventTime = true;
+ configuration.firstEventRate = 60000;
+ configuration.nextEventRate = 60000;
+ configuration.maxLogEvents = 15000;
+
+ List<NexmarkConfiguration> configurations = new ArrayList<>();
+ configurations.add(configuration);
+ return configurations;
+ }
+
private final List<NexmarkConfiguration> configurations;
NexmarkSuite(List<NexmarkConfiguration> configurations) {
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java
new file mode 100644
index 0000000..dd811d3
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.joda.time.Duration;
+
+/**
+ * Helper for working with Pubsub topics and subscriptions.
+ */
+public class PubsubHelper {
+ /** Underlying pub/sub client. */
+ private final PubsubClient pubsubClient;
+
+ /** Project id. */
+ private final String project;
+
+ /** Topics we should delete on close. */
+ private final List<TopicPath> createdTopics;
+
+ /** Subscriptions we should delete on close. */
+ private final List<SubscriptionPath> createdSubscriptions;
+
+ /** How to sleep in retry loops. */
+ private final Sleeper sleeper;
+
+ /** How to backoff in retry loops. */
+ private final BackOff backOff;
+
+ private PubsubHelper(PubsubClient pubsubClient, String project) {
+ this.pubsubClient = pubsubClient;
+ this.project = project;
+ createdTopics = new ArrayList<>();
+ createdSubscriptions = new ArrayList<>();
+ sleeper = Sleeper.DEFAULT;
+ backOff = FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.standardSeconds(1))
+ .withMaxRetries(3)
+ .backoff();
+ }
+
+ /** Create a helper. */
+ public static PubsubHelper create(PubsubOptions options) throws IOException {
+ return new PubsubHelper(
+ PubsubJsonClient.FACTORY.newClient(null, null, options),
+ options.getProject());
+ }
+
+ /**
+ * Create a topic from short name. Delete it if it already exists. Ensure the topic will be
+ * deleted on cleanup. Return full topic name.
+ */
+ public TopicPath createTopic(String shortTopic) throws IOException {
+ TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
+ while (true) {
+ try {
+ NexmarkUtils.console("create topic %s", topic);
+ pubsubClient.createTopic(topic);
+ createdTopics.add(topic);
+ return topic;
+ } catch (GoogleJsonResponseException ex) {
+ NexmarkUtils.console("attempting to cleanup topic %s", topic);
+ pubsubClient.deleteTopic(topic);
+ try {
+ if (!BackOffUtils.next(sleeper, backOff)) {
+ NexmarkUtils.console("too many retries for creating topic %s", topic);
+ throw ex;
+ }
+ } catch (InterruptedException in) {
+ throw new IOException(in);
+ }
+ }
+ }
+ }
+
+ /** Create a topic from short name if it does not already exist. The topic will not be
+ * deleted on cleanup. Return full topic name.
+ */
+ public TopicPath createOrReuseTopic(String shortTopic) throws IOException {
+ TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
+ while (true) {
+ try {
+ NexmarkUtils.console("create topic %s", topic);
+ pubsubClient.createTopic(topic);
+ return topic;
+ } catch (GoogleJsonResponseException ex) {
+ if (topicExists(shortTopic)) {
+ NexmarkUtils.console("topic %s already exists", topic);
+ return topic;
+ }
+ try {
+ if (!BackOffUtils.next(sleeper, backOff)) {
+ NexmarkUtils.console("too many retries for creating/reusing topic %s", topic);
+ throw ex;
+ }
+ } catch (InterruptedException in) {
+ throw new IOException(in);
+ }
+ }
+ }
+ }
+
+ /**
+ * Check a topic corresponding to short name exists, and throw exception if not. The
+ * topic will not be deleted on cleanup. Return full topic name.
+ */
+ public TopicPath reuseTopic(String shortTopic) throws IOException {
+ TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
+ if (topicExists(shortTopic)) {
+ NexmarkUtils.console("reusing existing topic %s", topic);
+ return topic;
+ }
+ throw new RuntimeException("topic '" + topic + "' does not already exist");
+ }
+
+ /** Does topic corresponding to short name exist? */
+ public boolean topicExists(String shortTopic) throws IOException {
+ TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
+ return pubsubClient
+ .listTopics(PubsubClient.projectPathFromId(project))
+ .stream()
+ .anyMatch(topic::equals);
+ }
+
+ /**
+ * Create subscription from short name. Ensure the subscription will be deleted
+ * on cleanup. Return full subscription name.
+ */
+ public SubscriptionPath createSubscription(String shortTopic, String shortSubscription)
+ throws IOException {
+ TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
+ SubscriptionPath subscription = PubsubClient.subscriptionPathFromName(project,
+ shortSubscription);
+ while (true) {
+ try {
+ NexmarkUtils.console("create subscription %s", subscription);
+ pubsubClient.createSubscription(topic, subscription, 60);
+ createdSubscriptions.add(subscription);
+ return subscription;
+ } catch (GoogleJsonResponseException ex) {
+ NexmarkUtils.console("attempting to cleanup subscription %s", subscription);
+ pubsubClient.deleteSubscription(subscription);
+ try {
+ if (!BackOffUtils.next(sleeper, backOff)) {
+ NexmarkUtils.console("too many retries for creating subscription %s", subscription);
+ throw ex;
+ }
+ } catch (InterruptedException in) {
+ throw new IOException(in);
+ }
+ }
+ }
+ }
+
+ /**
+ * Check a subscription corresponding to short name exists, and throw exception if not. The
+ * subscription will not be deleted on cleanup. Return full subscription name.
+ */
+ public SubscriptionPath reuseSubscription(String shortTopic,
+ String shortSubscription) throws IOException {
+ SubscriptionPath subscription = PubsubClient.subscriptionPathFromName(project,
+ shortSubscription);
+ if (subscriptionExists(shortTopic, shortSubscription)) {
+ NexmarkUtils.console("reusing existing subscription %s", subscription);
+ return subscription;
+ }
+ throw new RuntimeException("subscription'" + subscription + "' does not already exist");
+ }
+
+ /** Does subscription corresponding to short name exist? */
+ public boolean subscriptionExists(String shortTopic, String shortSubscription)
+ throws IOException {
+ TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
+ SubscriptionPath subscription = PubsubClient.subscriptionPathFromName(project,
+ shortSubscription);
+ return pubsubClient
+ .listSubscriptions(PubsubClient.projectPathFromId(project), topic)
+ .stream()
+ .anyMatch(subscription::equals);
+ }
+
+ /** Delete all the subscriptions and topics we created. */
+ public void cleanup() {
+ for (SubscriptionPath subscription : createdSubscriptions) {
+ try {
+ NexmarkUtils.console("delete subscription %s", subscription);
+ pubsubClient.deleteSubscription(subscription);
+ } catch (IOException ex) {
+ NexmarkUtils.console("could not delete subscription %s", subscription);
+ }
+ }
+ for (TopicPath topic : createdTopics) {
+ try {
+ NexmarkUtils.console("delete topic %s", topic);
+ pubsubClient.deleteTopic(topic);
+ } catch (IOException ex) {
+ NexmarkUtils.console("could not delete topic %s", topic);
+ }
+ }
+ }
+}
--
To stop receiving notification emails like this one, please contact
altay@apache.org.