You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:33 UTC
[25/55] [abbrv] beam git commit: Change Nexmark pom structure to
mirror other modules on Beam
Change Nexmark pom structure to mirror other modules on Beam
Fix compile after PubsubIO refactor
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8098bb1d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8098bb1d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8098bb1d
Branch: refs/heads/master
Commit: 8098bb1dbcc22153960d9b4483327e2977641148
Parents: 7ef49dc
Author: Ismaël Mejía <ie...@apache.org>
Authored: Thu Apr 13 10:47:54 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200
----------------------------------------------------------------------
integration/java/nexmark/pom.xml | 48 +-
.../beam/integration/nexmark/NexmarkRunner.java | 2 +-
.../integration/nexmark/io/PubsubClient.java | 543 +++++++++++++++++++
.../integration/nexmark/io/PubsubHelper.java | 2 -
.../nexmark/io/PubsubJsonClient.java | 318 +++++++++++
.../nexmark/io/PubsubTestClient.java | 436 +++++++++++++++
integration/java/pom.xml | 37 ++
integration/pom.xml | 37 ++
pom.xml | 2 +-
9 files changed, 1401 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 7cd7d39..67d6117 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -22,19 +22,17 @@
<parent>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-parent</artifactId>
+ <artifactId>beam-integration-java-parent</artifactId>
<version>0.7.0-SNAPSHOT</version>
- <relativePath>../../../pom.xml</relativePath>
+ <relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>beam-integration-java</artifactId>
+ <artifactId>beam-integration-java-nexmark</artifactId>
<name>Apache Beam :: Integration Tests :: Java :: Nexmark</name>
<packaging>jar</packaging>
<properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<flink.version>1.2.0</flink.version>
<spark.version>1.6.3</spark.version>
<apex.codehaus.jackson.version>1.9.3</apex.codehaus.jackson.version>
@@ -253,11 +251,36 @@
</dependency>
<dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-pubsub</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auth</groupId>
+ <artifactId>google-auth-library-credentials</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auth</groupId>
+ <artifactId>google-auth-library-oauth2-http</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcsio</artifactId>
</dependency>
<dependency>
+ <groupId>com.google.cloud.bigdataoss</groupId>
+ <artifactId>util</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
@@ -288,13 +311,6 @@
<scope>compile</scope>
</dependency>
- <!--<dependency>-->
- <!--<groupId>org.slf4j</groupId>-->
- <!--<artifactId>slf4j-jdk14</artifactId>-->
- <!--<!– When loaded at runtime this will wire up slf4j to the JUL backend –>-->
- <!--<scope>runtime</scope>-->
- <!--</dependency>-->
-
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
@@ -305,13 +321,5 @@
<artifactId>junit</artifactId>
<scope>compile</scope>
</dependency>
-
- <!--<dependency>-->
- <!--<groupId>io.netty</groupId>-->
- <!--<artifactId>netty-tcnative-boringssl-static</artifactId>-->
- <!--<version>1.1.33.Fork13</version>-->
- <!--<classifier>${os.detected.classifier}</classifier>-->
- <!--<scope>runtime</scope>-->
- <!--</dependency>-->
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index df1000a..3a0452f 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -67,9 +67,9 @@ import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
new file mode 100644
index 0000000..687aa35
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.client.util.DateTime;
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * An (abstract) helper class for talking to Pubsub via an underlying transport.
+ */
+abstract class PubsubClient implements Closeable {
+ /**
+ * Factory for creating clients.
+ */
+ public interface PubsubClientFactory extends Serializable {
+ /**
+ * Construct a new Pubsub client. It should be closed via {@link #close} in order
+ * to ensure tidy cleanup of underlying netty resources (or use the try-with-resources
+ * construct). Uses {@code options} to derive pubsub endpoints and application credentials.
+ * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel} to store custom
+ * timestamps/ids within message metadata.
+ */
+ PubsubClient newClient(@Nullable String timestampLabel,
+ @Nullable String idLabel, PubsubOptions options) throws IOException;
+
+ /**
+ * Return the display name for this factory. Eg "Json", "gRPC".
+ */
+ String getKind();
+ }
+
+ /**
+ * Return timestamp as ms-since-unix-epoch corresponding to {@code timestamp}.
+ * Return {@literal null} if {@code timestamp} is {@literal null} or empty. Throw
+ * {@link IllegalArgumentException} if a non-empty timestamp cannot be recognized.
+ */
+ @Nullable
+ private static Long asMsSinceEpoch(@Nullable String timestamp) {
+ if (Strings.isNullOrEmpty(timestamp)) {
+ return null;
+ }
+ try {
+ // First try a plain integer: milliseconds since the unix epoch. Long.parseLong
+ // cannot parse an RFC 3339 string; it throws NumberFormatException (a subclass
+ // of IllegalArgumentException) for one, which routes such input to the
+ // RFC 3339 fallback in the catch block.
+ return Long.parseLong(timestamp);
+ } catch (IllegalArgumentException e1) {
+ // Not an integer, so parse as an RFC 3339 string. DateTime.parseRfc3339 throws an
+ // IllegalArgumentException if that fails too; it propagates to the caller.
+ return DateTime.parseRfc3339(timestamp).getValue();
+ }
+ }
+
+ /**
+ * Return the timestamp (in ms since unix epoch) to use for a Pubsub message with {@code
+ * attributes} and {@code pubsubTimestamp}.
+ *
+ * <p>If {@code timestampLabel} is non-{@literal null} then the message attributes must contain
+ * that label, and the value of that label will be taken as the timestamp.
+ * Otherwise the timestamp will be taken from the Pubsub publish timestamp {@code
+ * pubsubTimestamp}.
+ *
+ * @throws IllegalArgumentException if the timestamp cannot be recognized as a ms-since-unix-epoch
+ * or RFC3339 time.
+ */
+ protected static long extractTimestamp(
+ @Nullable String timestampLabel,
+ @Nullable String pubsubTimestamp,
+ @Nullable Map<String, String> attributes) {
+ Long timestampMsSinceEpoch;
+ if (Strings.isNullOrEmpty(timestampLabel)) {
+ timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp);
+ checkArgument(timestampMsSinceEpoch != null,
+ "Cannot interpret PubSub publish timestamp: %s",
+ pubsubTimestamp);
+ } else {
+ String value = attributes == null ? null : attributes.get(timestampLabel);
+ checkArgument(value != null,
+ "PubSub message is missing a value for timestamp label %s",
+ timestampLabel);
+ timestampMsSinceEpoch = asMsSinceEpoch(value);
+ checkArgument(timestampMsSinceEpoch != null,
+ "Cannot interpret value of label %s as timestamp: %s",
+ timestampLabel, value);
+ }
+ return timestampMsSinceEpoch;
+ }
+
+ /**
+ * Path representing a cloud project id.
+ */
+ static class ProjectPath implements Serializable {
+ private final String projectId;
+
+ /**
+ * Creates a {@link ProjectPath} from a {@link String} representation, which
+ * must be of the form {@code "projects/" + projectId}.
+ */
+ ProjectPath(String path) {
+ String[] splits = path.split("/");
+ checkArgument(
+ splits.length == 2 && splits[0].equals("projects"),
+ "Malformed project path \"%s\": must be of the form \"projects/\" + <project id>",
+ path);
+ this.projectId = splits[1];
+ }
+
+ public String getPath() {
+ return String.format("projects/%s", projectId);
+ }
+
+ public String getId() {
+ return projectId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ProjectPath that = (ProjectPath) o;
+
+ return projectId.equals(that.projectId);
+ }
+
+ @Override
+ public int hashCode() {
+ return projectId.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return getPath();
+ }
+ }
+
+ public static ProjectPath projectPathFromPath(String path) {
+ return new ProjectPath(path);
+ }
+
+ public static ProjectPath projectPathFromId(String projectId) {
+ return new ProjectPath(String.format("projects/%s", projectId));
+ }
+
+  /**
+   * Path representing a Pubsub subscription; rejected with {@link IllegalStateException} unless
+   * of the form {@code projects/<project id>/subscriptions/<subscription name>}.
+   */
+  public static class SubscriptionPath implements Serializable {
+    private final String projectId;
+    private final String subscriptionName;
+
+    SubscriptionPath(String path) {
+      String[] splits = path.split("/");
+      checkState(
+          splits.length == 4 && splits[0].equals("projects") && splits[2].equals("subscriptions"),
+          "Malformed subscription path %s: must be of the form "
+              + "\"projects/\" + <project id> + \"/subscriptions/\" + <subscription name>", path);
+      this.projectId = splits[1];
+      this.subscriptionName = splits[3];
+    }
+
+    public String getPath() {
+      return String.format("projects/%s/subscriptions/%s", projectId, subscriptionName);
+    }
+
+    public String getName() {
+      return subscriptionName;
+    }
+
+    public String getV1Beta1Path() {
+      return String.format("/subscriptions/%s/%s", projectId, subscriptionName);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SubscriptionPath that = (SubscriptionPath) o;
+      return subscriptionName.equals(that.subscriptionName) && projectId.equals(that.projectId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(projectId, subscriptionName);
+    }
+
+    @Override
+    public String toString() {
+      return getPath();
+    }
+  }
+
+ public static SubscriptionPath subscriptionPathFromPath(String path) {
+ return new SubscriptionPath(path);
+ }
+
+ public static SubscriptionPath subscriptionPathFromName(
+ String projectId, String subscriptionName) {
+ return new SubscriptionPath(String.format("projects/%s/subscriptions/%s",
+ projectId, subscriptionName));
+ }
+
+ /**
+ * Path representing a Pubsub topic.
+ */
+ public static class TopicPath implements Serializable {
+ private final String path;
+
+ TopicPath(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getName() {
+ String[] splits = path.split("/");
+ checkState(splits.length == 4, "Malformed topic path %s", path);
+ return splits[3];
+ }
+
+ public String getV1Beta1Path() {
+ String[] splits = path.split("/");
+ checkState(splits.length == 4, "Malformed topic path %s", path);
+ return String.format("/topics/%s/%s", splits[1], splits[3]);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TopicPath topicPath = (TopicPath) o;
+ return path.equals(topicPath.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return path;
+ }
+ }
+
+ public static TopicPath topicPathFromPath(String path) {
+ return new TopicPath(path);
+ }
+
+ public static TopicPath topicPathFromName(String projectId, String topicName) {
+ return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName));
+ }
+
+ /**
+ * A message to be sent to Pubsub.
+ *
+ * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}.
+ * Java serialization is never used for non-test clients.
+ */
+ static class OutgoingMessage implements Serializable {
+ /**
+ * Underlying (encoded) element.
+ */
+ public final byte[] elementBytes;
+
+ public final Map<String, String> attributes;
+
+ /**
+ * Timestamp for element (ms since epoch).
+ */
+ public final long timestampMsSinceEpoch;
+
+ /**
+ * If using an id label, the record id to associate with this record's metadata so the receiver
+ * can reject duplicates. Otherwise {@literal null}.
+ */
+ @Nullable
+ public final String recordId;
+
+ public OutgoingMessage(byte[] elementBytes, Map<String, String> attributes,
+ long timestampMsSinceEpoch, @Nullable String recordId) {
+ this.elementBytes = elementBytes;
+ this.attributes = attributes;
+ this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+ this.recordId = recordId;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("OutgoingMessage(%db, %dms)",
+ elementBytes.length, timestampMsSinceEpoch);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ OutgoingMessage that = (OutgoingMessage) o;
+
+ return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+ && Arrays.equals(elementBytes, that.elementBytes)
+ && Objects.equal(attributes, that.attributes)
+ && Objects.equal(recordId, that.recordId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch,
+ recordId);
+ }
+ }
+
+  /**
+   * A message received from Pubsub.
+   *
+   * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}.
+   * Java serialization is never used for non-test clients.
+   */
+  static class IncomingMessage implements Serializable {
+    /**
+     * Underlying (encoded) element.
+     */
+    public final byte[] elementBytes;
+
+    /** Attributes attached to the message. May be {@literal null}. */
+    public Map<String, String> attributes;
+
+    /**
+     * Timestamp for element (ms since epoch). Either Pubsub's processing time,
+     * or the custom timestamp associated with the message.
+     */
+    public final long timestampMsSinceEpoch;
+
+    /**
+     * Timestamp (in system time) at which we requested the message (ms since epoch).
+     */
+    public final long requestTimeMsSinceEpoch;
+
+    /**
+     * Id to pass back to Pubsub to acknowledge receipt of this message.
+     */
+    public final String ackId;
+
+    /**
+     * Id to pass to the runner to distinguish this message from all others.
+     */
+    public final String recordId;
+
+    public IncomingMessage(byte[] elementBytes, Map<String, String> attributes,
+        long timestampMsSinceEpoch, long requestTimeMsSinceEpoch, String ackId, String recordId) {
+      this.elementBytes = elementBytes;
+      this.attributes = attributes;
+      this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+      this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
+      this.ackId = ackId;
+      this.recordId = recordId;
+    }
+
+    /**
+     * Return a copy of this message with request time {@code requestTimeMsSinceEpoch}.
+     */
+    public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) {
+      return new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+          requestTimeMsSinceEpoch, ackId, recordId);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("IncomingMessage(%db, %dms)",
+          elementBytes.length, timestampMsSinceEpoch);
+    }
+
+    /**
+     * Field-by-field equality. Object fields are compared null-safely so that a null
+     * {@code ackId} or {@code recordId} cannot throw here while {@link #hashCode} accepts it.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      IncomingMessage that = (IncomingMessage) o;
+      return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+          && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch
+          && Objects.equal(ackId, that.ackId)
+          && Objects.equal(recordId, that.recordId)
+          && Arrays.equals(elementBytes, that.elementBytes)
+          && Objects.equal(attributes, that.attributes);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch,
+          requestTimeMsSinceEpoch, ackId, recordId);
+    }
+  }
+
+ /**
+ * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages
+ * published.
+ */
+ public abstract int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+ throws IOException;
+
+ /**
+ * Request the next batch of up to {@code batchSize} messages from {@code subscription}.
+ * Return the received messages, or empty collection if none were available. Does not
+ * wait for messages to arrive if {@code returnImmediately} is {@literal true}.
+ * Returned messages will record their request time as {@code requestTimeMsSinceEpoch}.
+ */
+ public abstract List<IncomingMessage> pull(
+ long requestTimeMsSinceEpoch,
+ SubscriptionPath subscription,
+ int batchSize,
+ boolean returnImmediately)
+ throws IOException;
+
+ /**
+ * Acknowledge messages from {@code subscription} with {@code ackIds}.
+ */
+ public abstract void acknowledge(SubscriptionPath subscription, List<String> ackIds)
+ throws IOException;
+
+ /**
+ * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to
+ * be {@code deadlineSeconds} from now.
+ */
+ public abstract void modifyAckDeadline(
+ SubscriptionPath subscription, List<String> ackIds,
+ int deadlineSeconds) throws IOException;
+
+ /**
+ * Create {@code topic}.
+ */
+ public abstract void createTopic(TopicPath topic) throws IOException;
+
+ /**
+ * Delete {@code topic}.
+ */
+ public abstract void deleteTopic(TopicPath topic) throws IOException;
+
+ /**
+ * Return a list of topics for {@code project}.
+ */
+ public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException;
+
+ /**
+ * Create {@code subscription} to {@code topic}.
+ */
+ public abstract void createSubscription(
+ TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
+
+  /**
+   * Create a subscription to {@code topic} under a randomized name in {@code project} and
+   * return its {@link SubscriptionPath}. The caller is responsible for deleting it later.
+   */
+  public SubscriptionPath createRandomSubscription(
+      ProjectPath project, TopicPath topic, int ackDeadlineSeconds) throws IOException {
+    // The randomized name is the topic name, then "_beam_", then a random long.
+    String namePrefix = topic.getName() + "_beam_";
+    String randomName = namePrefix + ThreadLocalRandom.current().nextLong();
+    SubscriptionPath randomSubscription =
+        subscriptionPathFromName(project.getId(), randomName);
+    createSubscription(topic, randomSubscription, ackDeadlineSeconds);
+    return randomSubscription;
+  }
+
+ /**
+ * Delete {@code subscription}.
+ */
+ public abstract void deleteSubscription(SubscriptionPath subscription) throws IOException;
+
+ /**
+ * Return a list of subscriptions for {@code topic} in {@code project}.
+ */
+ public abstract List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+ throws IOException;
+
+ /**
+ * Return the ack deadline, in seconds, for {@code subscription}.
+ */
+ public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException;
+
+ /**
+ * Return {@literal true} if {@link #pull} will always return empty list. Actual clients
+ * will return {@literal false}. Test clients may return {@literal true} to signal that all
+ * expected messages have been pulled and the test may complete.
+ */
+ public abstract boolean isEOF();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
index f5cfc2b..15401b7 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
@@ -24,8 +24,6 @@ import java.util.List;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.util.PubsubClient;
-import org.apache.beam.sdk.util.PubsubJsonClient;
/**
* Helper for working with pubsub.
http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
new file mode 100644
index 0000000..b778a09
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark.io;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.Pubsub.Builder;
+import com.google.api.services.pubsub.model.AcknowledgeRequest;
+import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
+import com.google.api.services.pubsub.model.ListTopicsResponse;
+import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PublishResponse;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.api.services.pubsub.model.PullRequest;
+import com.google.api.services.pubsub.model.PullResponse;
+import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.api.services.pubsub.model.Subscription;
+import com.google.api.services.pubsub.model.Topic;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.util.Transport;
+
+/**
+ * A Pubsub client using JSON transport.
+ */
+class PubsubJsonClient extends PubsubClient {
+
+ private static class PubsubJsonClientFactory implements PubsubClientFactory {
+ private static HttpRequestInitializer chainHttpRequestInitializer(
+ Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+ if (credential == null) {
+ return httpRequestInitializer;
+ } else {
+ return new ChainingHttpRequestInitializer(
+ new HttpCredentialsAdapter(credential),
+ httpRequestInitializer);
+ }
+ }
+
+ @Override
+ public PubsubClient newClient(
+ @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+ throws IOException {
+ Pubsub pubsub = new Builder(
+ Transport.getTransport(),
+ Transport.getJsonFactory(),
+ chainHttpRequestInitializer(
+ options.getGcpCredential(),
+ // Do not log 404. It clutters the output and is possibly even required by the caller.
+ new RetryHttpRequestInitializer(ImmutableList.of(404))))
+ .setRootUrl(options.getPubsubRootUrl())
+ .setApplicationName(options.getAppName())
+ .setGoogleClientRequestInitializer(options.getGoogleApiTrace())
+ .build();
+ return new PubsubJsonClient(timestampLabel, idLabel, pubsub);
+ }
+
+ @Override
+ public String getKind() {
+ return "Json";
+ }
+ }
+
+ /**
+ * Factory for creating Pubsub clients using Json transport.
+ */
+ public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory();
+
+ /**
+ * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
+ * instead.
+ */
+ @Nullable
+ private final String timestampLabel;
+
+ /**
+ * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
+ */
+ @Nullable
+ private final String idLabel;
+
+ /**
+ * Underlying JSON transport.
+ */
+ private Pubsub pubsub;
+
+ @VisibleForTesting PubsubJsonClient(
+ @Nullable String timestampLabel,
+ @Nullable String idLabel,
+ Pubsub pubsub) {
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ this.pubsub = pubsub;
+ }
+
+ @Override
+ public void close() {
+ // Nothing to close.
+ }
+
+  @Override
+  public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+      throws IOException {
+    List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
+    for (OutgoingMessage outgoingMessage : outgoingMessages) {
+      PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
+      Map<String, String> attributes = outgoingMessage.attributes;
+      if (timestampLabel != null || idLabel != null) {
+        // Add labels to a private copy; the caller's map may be shared or immutable.
+        Map<String, String> labelled = new TreeMap<>();
+        if (attributes != null) {
+          labelled.putAll(attributes);
+        }
+        if (timestampLabel != null) {
+          labelled.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+        }
+        if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+          labelled.put(idLabel, outgoingMessage.recordId);
+        }
+        attributes = labelled;
+      }
+      if (attributes != null) {
+        pubsubMessage.setAttributes(attributes);
+      }
+      pubsubMessages.add(pubsubMessage);
+    }
+    PublishResponse response = pubsub.projects().topics()
+        .publish(topic.getPath(), new PublishRequest().setMessages(pubsubMessages)).execute();
+    // A response without message ids is reported as zero published messages.
+    List<String> messageIds = response.getMessageIds();
+    return messageIds == null ? 0 : messageIds.size();
+  }
+
+ @Override
+ public List<IncomingMessage> pull(
+ long requestTimeMsSinceEpoch,
+ SubscriptionPath subscription,
+ int batchSize,
+ boolean returnImmediately) throws IOException {
+ PullRequest request = new PullRequest()
+ .setReturnImmediately(returnImmediately)
+ .setMaxMessages(batchSize);
+ PullResponse response = pubsub.projects()
+ .subscriptions()
+ .pull(subscription.getPath(), request)
+ .execute();
+ if (response.getReceivedMessages() == null || response.getReceivedMessages().size() == 0) {
+ return ImmutableList.of();
+ }
+ List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessages().size());
+ for (ReceivedMessage message : response.getReceivedMessages()) {
+ PubsubMessage pubsubMessage = message.getMessage();
+ @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
+
+ // Payload.
+ byte[] elementBytes = pubsubMessage.decodeData();
+
+ // Timestamp.
+ long timestampMsSinceEpoch =
+ extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes);
+
+ // Ack id.
+ String ackId = message.getAckId();
+ checkState(!Strings.isNullOrEmpty(ackId));
+
+ // Record id, if any.
+ @Nullable String recordId = null;
+ if (idLabel != null && attributes != null) {
+ recordId = attributes.get(idLabel);
+ }
+ if (Strings.isNullOrEmpty(recordId)) {
+ // Fall back to the Pubsub provided message id.
+ recordId = pubsubMessage.getMessageId();
+ }
+
+ incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+ requestTimeMsSinceEpoch, ackId, recordId));
+ }
+
+ return incomingMessages;
+ }
+
+ @Override
+ public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException {
+ AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds);
+ pubsub.projects()
+ .subscriptions()
+ .acknowledge(subscription.getPath(), request)
+ .execute(); // ignore Empty result.
+ }
+
+ @Override
+ public void modifyAckDeadline(
+ SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
+ throws IOException {
+ ModifyAckDeadlineRequest request =
+ new ModifyAckDeadlineRequest().setAckIds(ackIds)
+ .setAckDeadlineSeconds(deadlineSeconds);
+ pubsub.projects()
+ .subscriptions()
+ .modifyAckDeadline(subscription.getPath(), request)
+ .execute(); // ignore Empty result.
+ }
+
+ @Override
+ public void createTopic(TopicPath topic) throws IOException {
+ pubsub.projects()
+ .topics()
+ .create(topic.getPath(), new Topic())
+ .execute(); // ignore Topic result.
+ }
+
+ @Override
+ public void deleteTopic(TopicPath topic) throws IOException {
+ pubsub.projects()
+ .topics()
+ .delete(topic.getPath())
+ .execute(); // ignore Empty result.
+ }
+
+ @Override
+ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+ ListTopicsResponse response = pubsub.projects()
+ .topics()
+ .list(project.getPath())
+ .execute();
+ if (response.getTopics() == null || response.getTopics().isEmpty()) {
+ return ImmutableList.of();
+ }
+ List<TopicPath> topics = new ArrayList<>(response.getTopics().size());
+ for (Topic topic : response.getTopics()) {
+ topics.add(topicPathFromPath(topic.getName()));
+ }
+ return topics;
+ }
+
+ @Override
+ public void createSubscription(
+ TopicPath topic, SubscriptionPath subscription,
+ int ackDeadlineSeconds) throws IOException {
+ Subscription request = new Subscription()
+ .setTopic(topic.getPath())
+ .setAckDeadlineSeconds(ackDeadlineSeconds);
+ pubsub.projects()
+ .subscriptions()
+ .create(subscription.getPath(), request)
+ .execute(); // ignore Subscription result.
+ }
+
+ @Override
+ public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+ pubsub.projects()
+ .subscriptions()
+ .delete(subscription.getPath())
+ .execute(); // ignore Empty result.
+ }
+
+ @Override
+ public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
+ throws IOException {
+ ListSubscriptionsResponse response = pubsub.projects()
+ .subscriptions()
+ .list(project.getPath())
+ .execute();
+ if (response.getSubscriptions() == null || response.getSubscriptions().isEmpty()) {
+ return ImmutableList.of();
+ }
+ List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptions().size());
+ for (Subscription subscription : response.getSubscriptions()) {
+ if (subscription.getTopic().equals(topic.getPath())) {
+ subscriptions.add(subscriptionPathFromPath(subscription.getName()));
+ }
+ }
+ return subscriptions;
+ }
+
+ @Override
+ public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+ Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute();
+ return response.getAckDeadlineSeconds();
+ }
+
+ @Override
+ public boolean isEOF() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
new file mode 100644
index 0000000..125a8d6
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark.io;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.client.util.Clock;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
+ * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
+ * methods. Relies on statics to mimic the Pubsub service, though we try to hide that.
+ */
+class PubsubTestClient extends PubsubClient implements Serializable {
+ /**
+ * Mimic the state of the simulated Pubsub 'service'.
+ *
+ * <p>Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running
+ * test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created
+ * from the same client factory and run in parallel. Thus we can't enforce aliasing of the
+ * following data structures over all clients and must resort to a static.
+ */
+ private static class State {
+ /**
+ * True if the state has been primed for a test but not yet validated.
+ */
+ boolean isActive;
+
+ /**
+ * Publish mode only: Only publish calls for this topic are allowed.
+ */
+ @Nullable
+ TopicPath expectedTopic;
+
+ /**
+ * Publish mode only: Messages yet to be seen in a {@link #publish} call.
+ */
+ @Nullable
+ Set<OutgoingMessage> remainingExpectedOutgoingMessages;
+
+ /**
+ * Publish mode only: Messages which should throw when first sent to simulate transient publish
+ * failure.
+ */
+ @Nullable
+ Set<OutgoingMessage> remainingFailingOutgoingMessages;
+
+ /**
+ * Pull mode only: Clock from which to get current time.
+ */
+ @Nullable
+ Clock clock;
+
+ /**
+ * Pull mode only: Only pull calls for this subscription are allowed.
+ */
+ @Nullable
+ SubscriptionPath expectedSubscription;
+
+ /**
+ * Pull mode only: ACK timeout to simulate, in seconds.
+ */
+ int ackTimeoutSec;
+
+ /**
+ * Pull mode only: Messages waiting to be received by a {@link #pull} call.
+ */
+ @Nullable
+ List<IncomingMessage> remainingPendingIncomingMessages;
+
+ /**
+ * Pull mode only: Messages which have been returned from a {@link #pull} call and
+ * not yet ACKed by an {@link #acknowledge} call. Keyed by ACK id.
+ */
+ @Nullable
+ Map<String, IncomingMessage> pendingAckIncomingMessages;
+
+ /**
+ * Pull mode only: When the above messages are due to have their ACK deadlines expire, as
+ * milliseconds on the simulated clock. Keyed by ACK id.
+ */
+ @Nullable
+ Map<String, Long> ackDeadline;
+ }
+
+ private static final State STATE = new State();
+
+ /**
+ * Factory for test clients. Closing the factory validates the expectations the factory was
+ * created with, e.g. that all expected messages were processed.
+ */
+ public interface PubsubTestClientFactory
+ extends PubsubClientFactory, Closeable, Serializable {
+ }
+
+  /**
+   * Return a factory for testing publishers. Only one factory may be in-flight at a time.
+   * The factory must be closed when the test is complete, at which point final validation will
+   * occur.
+   *
+   * @param expectedTopic the only topic {@link #publish} calls may target
+   * @param expectedOutgoingMessages messages which must all be published before closing
+   * @param failingOutgoingMessages messages which throw the first time they are published
+   * @throws IllegalStateException if another test is still in flight
+   */
+  static PubsubTestClientFactory createFactoryForPublish(
+      final TopicPath expectedTopic,
+      final Iterable<OutgoingMessage> expectedOutgoingMessages,
+      final Iterable<OutgoingMessage> failingOutgoingMessages) {
+    synchronized (STATE) {
+      checkState(!STATE.isActive, "Test still in flight");
+      // Drop pull-mode leftovers from an earlier test so inPullMode() cannot report true here.
+      STATE.clock = null;
+      STATE.expectedSubscription = null;
+      STATE.expectedTopic = expectedTopic;
+      STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
+      STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
+      STATE.isActive = true;
+    }
+    return new PubsubTestClientFactory() {
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient();
+      }
+
+      @Override
+      public String getKind() {
+        return "PublishTest";
+      }
+
+      @Override
+      public void close() {
+        synchronized (STATE) {
+          checkState(STATE.isActive, "No test still in flight");
+          checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(),
+              "Still waiting for %s messages to be published",
+              STATE.remainingExpectedOutgoingMessages.size());
+          STATE.isActive = false;
+          // Clear every publish-mode field, not just the expected messages, so that a later
+          // test does not appear to be in publish mode.
+          STATE.expectedTopic = null;
+          STATE.remainingExpectedOutgoingMessages = null;
+          STATE.remainingFailingOutgoingMessages = null;
+        }
+      }
+    };
+  }
+
+  /**
+   * Return a factory for testing subscribers. Only one factory may be in-flight at a time.
+   * The factory must be closed when the test is complete, at which point it validates that
+   * every message was pulled and ACKed.
+   *
+   * @param clock source of the simulated current time
+   * @param expectedSubscription the only subscription pull-mode calls may target
+   * @param ackTimeoutSec ACK timeout to simulate, in seconds
+   * @param expectedIncomingMessages messages to hand out from {@link #pull}
+   * @throws IllegalStateException if another test is still in flight
+   */
+  public static PubsubTestClientFactory createFactoryForPull(
+      final Clock clock,
+      final SubscriptionPath expectedSubscription,
+      final int ackTimeoutSec,
+      final Iterable<IncomingMessage> expectedIncomingMessages) {
+    synchronized (STATE) {
+      checkState(!STATE.isActive, "Test still in flight");
+      // Drop publish-mode leftovers from an earlier test so inPublishMode() cannot report true.
+      STATE.expectedTopic = null;
+      STATE.remainingExpectedOutgoingMessages = null;
+      STATE.remainingFailingOutgoingMessages = null;
+      STATE.clock = clock;
+      STATE.expectedSubscription = expectedSubscription;
+      STATE.ackTimeoutSec = ackTimeoutSec;
+      STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages);
+      STATE.pendingAckIncomingMessages = new HashMap<>();
+      STATE.ackDeadline = new HashMap<>();
+      STATE.isActive = true;
+    }
+    return new PubsubTestClientFactory() {
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient();
+      }
+
+      @Override
+      public String getKind() {
+        return "PullTest";
+      }
+
+      @Override
+      public void close() {
+        synchronized (STATE) {
+          checkState(STATE.isActive, "No test still in flight");
+          checkState(STATE.remainingPendingIncomingMessages.isEmpty(),
+              "Still waiting for %s messages to be pulled",
+              STATE.remainingPendingIncomingMessages.size());
+          checkState(STATE.pendingAckIncomingMessages.isEmpty(),
+              "Still waiting for %s messages to be ACKed",
+              STATE.pendingAckIncomingMessages.size());
+          checkState(STATE.ackDeadline.isEmpty(),
+              "Still waiting for %s messages to be ACKed",
+              STATE.ackDeadline.size());
+          STATE.isActive = false;
+          // Clear every pull-mode field, not just the message collections, so that a later
+          // test does not appear to be in pull mode.
+          STATE.clock = null;
+          STATE.expectedSubscription = null;
+          STATE.remainingPendingIncomingMessages = null;
+          STATE.pendingAckIncomingMessages = null;
+          STATE.ackDeadline = null;
+        }
+      }
+    };
+  }
+
+  /**
+   * Return a factory whose clients accept {@link #createSubscription} calls and count them in
+   * the factory. Closing the factory checks that exactly one such call was made.
+   */
+  public static PubsubTestClientFactory createFactoryForCreateSubscription() {
+    return new PubsubTestClientFactory() {
+      /** Number of createSubscription calls made on clients produced by this factory. */
+      int numCalls = 0;
+
+      @Override
+      public String getKind() {
+        return "CreateSubscriptionTest";
+      }
+
+      @Override
+      public PubsubClient newClient(
+          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
+          throws IOException {
+        return new PubsubTestClient() {
+          @Override
+          public void createSubscription(
+              TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds)
+              throws IOException {
+            // A second call is rejected before the counter moves.
+            checkState(numCalls == 0, "Expected at most one subscription to be created");
+            numCalls += 1;
+          }
+        };
+      }
+
+      @Override
+      public void close() throws IOException {
+        checkState(
+            numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls);
+      }
+    };
+  }
+
+  /**
+   * Report whether the active test has an expected subscription, i.e. is a pull test.
+   *
+   * @throws IllegalStateException if no test is active
+   */
+  private boolean inPullMode() {
+    checkState(STATE.isActive, "No test is active");
+    SubscriptionPath primedSubscription = STATE.expectedSubscription;
+    return primedSubscription != null;
+  }
+
+  /**
+   * Report whether the active test has an expected topic, i.e. is a publish test.
+   *
+   * @throws IllegalStateException if no test is active
+   */
+  private boolean inPublishMode() {
+    checkState(STATE.isActive, "No test is active");
+    TopicPath primedTopic = STATE.expectedTopic;
+    return primedTopic != null;
+  }
+
+ /**
+ * For subscription mode only:
+ * Track progression of time according to the {@link Clock} passed . This will simulate Pubsub
+ * expiring
+ * outstanding ACKs.
+ */
+ public void advance() {
+ synchronized (STATE) {
+ checkState(inPullMode(), "Can only advance in pull mode");
+ // Any messages who's ACKs timed out are available for re-pulling.
+ Iterator<Map.Entry<String, Long>> deadlineItr = STATE.ackDeadline.entrySet().iterator();
+ while (deadlineItr.hasNext()) {
+ Map.Entry<String, Long> entry = deadlineItr.next();
+ if (entry.getValue() <= STATE.clock.currentTimeMillis()) {
+ STATE.remainingPendingIncomingMessages.add(
+ STATE.pendingAckIncomingMessages.remove(entry.getKey()));
+ deadlineItr.remove();
+ }
+ }
+ }
+ }
+
+ /** Does nothing; the method body is intentionally empty. */
+ @Override
+ public void close() {
+ }
+
+  /**
+   * Simulate publishing {@code outgoingMessages} to {@code topic}.
+   *
+   * <p>The batch is treated atomically with respect to simulated failures: if any message in
+   * it is still marked as failing, the call throws before any expectation is consumed, so the
+   * caller can retry the whole batch.
+   *
+   * @return the number of messages in {@code outgoingMessages}
+   * @throws IllegalStateException if no publish test is active, the topic is not the expected
+   *     one, or a message is not among the remaining expected messages
+   * @throws RuntimeException to simulate a transient failure for a failing message
+   */
+  @Override
+  public int publish(
+      TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
+    synchronized (STATE) {
+      checkState(inPublishMode(), "Can only publish in publish mode");
+      checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic,
+          STATE.expectedTopic);
+      // First pass: fail the batch before touching the expectations. Otherwise messages ahead
+      // of the failing one would already be consumed and a retry would see them as unexpected.
+      for (OutgoingMessage outgoingMessage : outgoingMessages) {
+        if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) {
+          throw new RuntimeException("Simulating failure for " + outgoingMessage);
+        }
+      }
+      // Second pass: the batch succeeds, so every message must have been expected.
+      for (OutgoingMessage outgoingMessage : outgoingMessages) {
+        checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage),
+            "Unexpected outgoing message %s", outgoingMessage);
+      }
+      return outgoingMessages.size();
+    }
+  }
+
+  /**
+   * Simulate pulling up to {@code batchSize} pending messages from {@code subscription}.
+   *
+   * <p>Each returned message is stamped with {@code requestTimeMsSinceEpoch}, recorded as
+   * awaiting an ACK, and given an ACK deadline of the request time plus the simulated timeout.
+   * A non-positive {@code batchSize} yields an empty list.
+   *
+   * @param requestTimeMsSinceEpoch must equal the simulated clock's current time
+   * @param returnImmediately must be {@code true}; blocking pulls are not simulated
+   * @return the pulled messages, at most {@code batchSize} of them
+   * @throws IllegalStateException if no pull test is active or an argument check fails
+   */
+  @Override
+  public List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize,
+      boolean returnImmediately) throws IOException {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only pull in pull mode");
+      long now = STATE.clock.currentTimeMillis();
+      checkState(requestTimeMsSinceEpoch == now,
+          "Simulated time %s does not match request time %s", now, requestTimeMsSinceEpoch);
+      checkState(subscription.equals(STATE.expectedSubscription),
+          "Subscription %s does not match expected %s", subscription,
+          STATE.expectedSubscription);
+      checkState(returnImmediately, "Pull only supported if returning immediately");
+
+      List<IncomingMessage> incomingMessages = new ArrayList<>();
+      Iterator<IncomingMessage> pendItr = STATE.remainingPendingIncomingMessages.iterator();
+      // Bounding the loop up front means a batchSize of zero or less returns nothing rather
+      // than one message.
+      while (pendItr.hasNext() && incomingMessages.size() < batchSize) {
+        IncomingMessage incomingMessage = pendItr.next();
+        pendItr.remove();
+        IncomingMessage incomingMessageWithRequestTime =
+            incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
+        incomingMessages.add(incomingMessageWithRequestTime);
+        STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId,
+            incomingMessageWithRequestTime);
+        // 1000L forces long arithmetic; int multiplication would overflow for large timeouts.
+        STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId,
+            requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000L);
+      }
+      return incomingMessages;
+    }
+  }
+
+  /**
+   * Simulate ACKing the messages with the given {@code ackIds}: each one stops being tracked
+   * as outstanding and loses its ACK deadline.
+   *
+   * @throws IllegalStateException if no pull test is active, the subscription is not the
+   *     expected one, or an ACK id does not belong to an outstanding message
+   */
+  @Override
+  public void acknowledge(
+      SubscriptionPath subscription,
+      List<String> ackIds) throws IOException {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only acknowledge in pull mode");
+      checkState(subscription.equals(STATE.expectedSubscription),
+          "Subscription %s does not match expected %s", subscription,
+          STATE.expectedSubscription);
+
+      for (String ackId : ackIds) {
+        Long droppedDeadline = STATE.ackDeadline.remove(ackId);
+        checkState(droppedDeadline != null,
+            "No message with ACK id %s is waiting for an ACK", ackId);
+        IncomingMessage ackedMessage = STATE.pendingAckIncomingMessages.remove(ackId);
+        checkState(ackedMessage != null,
+            "No message with ACK id %s is waiting for an ACK", ackId);
+      }
+    }
+  }
+
+  /**
+   * Simulate changing the ACK deadline of the outstanding messages with the given
+   * {@code ackIds}.
+   *
+   * <p>A positive {@code deadlineSeconds} moves the deadline to that many seconds after the
+   * simulated clock's current time. Otherwise the message stops being outstanding and is
+   * queued to be pulled again.
+   *
+   * @throws IllegalStateException if no pull test is active, the subscription is not the
+   *     expected one, or an ACK id does not belong to an outstanding message
+   */
+  @Override
+  public void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only modify ack deadline in pull mode");
+      checkState(subscription.equals(STATE.expectedSubscription),
+          "Subscription %s does not match expected %s", subscription,
+          STATE.expectedSubscription);
+
+      for (String ackId : ackIds) {
+        // Both outcomes start by dropping the current deadline, which must exist.
+        checkState(STATE.ackDeadline.remove(ackId) != null,
+            "No message with ACK id %s is waiting for an ACK", ackId);
+        if (deadlineSeconds > 0) {
+          checkState(STATE.pendingAckIncomingMessages.containsKey(ackId),
+              "No message with ACK id %s is waiting for an ACK", ackId);
+          // 1000L forces long arithmetic; int multiplication would overflow for large values.
+          STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + deadlineSeconds * 1000L);
+        } else {
+          IncomingMessage message = STATE.pendingAckIncomingMessages.remove(ackId);
+          checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId);
+          STATE.remainingPendingIncomingMessages.add(message);
+        }
+      }
+    }
+  }
+
+ /** Not simulated by this test client; always throws {@link UnsupportedOperationException}. */
+ @Override
+ public void createTopic(TopicPath topic) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not simulated by this test client; always throws {@link UnsupportedOperationException}. */
+ @Override
+ public void deleteTopic(TopicPath topic) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not simulated by this test client; always throws {@link UnsupportedOperationException}. */
+ @Override
+ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not simulated by this test client; always throws {@link UnsupportedOperationException}. */
+ @Override
+ public void createSubscription(
+ TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not simulated by this test client; always throws {@link UnsupportedOperationException}. */
+ @Override
+ public void deleteSubscription(SubscriptionPath subscription) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not simulated by this test client; always throws {@link UnsupportedOperationException}. */
+ @Override
+ public List<SubscriptionPath> listSubscriptions(
+ ProjectPath project, TopicPath topic) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+  /**
+   * Return the simulated ACK timeout, in seconds. The {@code subscription} argument is not
+   * consulted.
+   */
+  @Override
+  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
+    final int simulatedTimeoutSec;
+    synchronized (STATE) {
+      simulatedTimeoutSec = STATE.ackTimeoutSec;
+    }
+    return simulatedTimeoutSec;
+  }
+
+  /**
+   * Report whether no messages remain to be pulled.
+   *
+   * @throws IllegalStateException if no pull test is active
+   */
+  @Override
+  public boolean isEOF() {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only check EOF in pull mode");
+      List<IncomingMessage> stillPending = STATE.remainingPendingIncomingMessages;
+      return stillPending.isEmpty();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/java/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/pom.xml b/integration/java/pom.xml
new file mode 100644
index 0000000..dcad4c3
--- /dev/null
+++ b/integration/java/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-integration-parent</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-integration-java-parent</artifactId>
+ <packaging>pom</packaging>
+ <name>Apache Beam :: Integration Tests :: Java</name>
+
+ <!-- Child modules built by this aggregator POM; paths are relative to this directory. -->
+ <modules>
+ <module>nexmark</module>
+ </modules>
+
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/integration/pom.xml
----------------------------------------------------------------------
diff --git a/integration/pom.xml b/integration/pom.xml
new file mode 100644
index 0000000..4839da5
--- /dev/null
+++ b/integration/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-parent</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-integration-parent</artifactId>
+ <packaging>pom</packaging>
+ <name>Apache Beam :: Integration Tests</name>
+
+ <!-- Child modules built by this aggregator POM; paths are relative to this directory. -->
+ <modules>
+ <module>java</module>
+ </modules>
+
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/8098bb1d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c92d391..bddbf1f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -187,7 +187,7 @@
<module>sdks</module>
<module>runners</module>
<module>examples</module>
- <module>integration/java/nexmark</module>
+ <module>integration</module>
<!-- sdks/java/javadoc builds project-wide Javadoc. It has to run last. -->
<module>sdks/java/javadoc</module>
</modules>