You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2019/07/05 12:46:17 UTC
[flink] 02/05: [FLINK-9311] [pubsub] Add unit and integration tests
for PubSub connectors
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a96277090358d2237aab7426ebde7fdf1eaccbe4
Author: Niels Basjes <nb...@bol.com>
AuthorDate: Wed Aug 15 14:05:39 2018 +0200
[FLINK-9311] [pubsub] Add unit and integration tests for PubSub connectors
---
flink-connectors/flink-connector-pubsub/pom.xml | 86 +++----
.../flink/streaming/connectors/pubsub/Bound.java | 17 ++
.../connectors/pubsub/BoundedPubSubSource.java | 33 ++-
.../streaming/connectors/pubsub/PubSubSink.java | 141 +++++------
.../streaming/connectors/pubsub/PubSubSource.java | 40 ++--
.../connectors/pubsub/SubscriberWrapper.java | 22 +-
.../common/SerializableCredentialsProvider.java | 11 +-
.../streaming/connectors/pubsub/BoundTest.java | 20 +-
.../connectors/pubsub/BoundedPubSubSourceTest.java | 25 +-
.../connectors/pubsub/PubSubSourceTest.java | 5 +-
.../src/test/resources/log4j-test.properties | 24 ++
.../flink-connector-pubsub-emulator-tests}/pom.xml | 112 +++++----
.../connectors/pubsub/CheckPubSubEmulatorTest.java | 115 +++++++++
.../connectors/pubsub/EmulatedPubSubSinkTest.java | 109 +++++++++
.../pubsub/EmulatedPubSubSourceTest.java | 116 +++++++++
.../pubsub/emulator/GCloudEmulatorManager.java | 264 +++++++++++++++++++++
.../pubsub/emulator/GCloudUnitTestBase.java | 84 +++++++
.../connectors/pubsub/emulator/PubsubHelper.java | 232 ++++++++++++++++++
.../src/test/resources/log4j-test.properties | 24 ++
flink-end-to-end-tests/pom.xml | 1 +
flink-end-to-end-tests/run-nightly-tests.sh | 2 +
.../test-scripts/test_streaming_pubsub.sh | 22 ++
.../streaming/examples/pubsub/PubSubExample.java | 7 +-
23 files changed, 1309 insertions(+), 203 deletions(-)
diff --git a/flink-connectors/flink-connector-pubsub/pom.xml b/flink-connectors/flink-connector-pubsub/pom.xml
index a6e8d72..f50cbdd 100644
--- a/flink-connectors/flink-connector-pubsub/pom.xml
+++ b/flink-connectors/flink-connector-pubsub/pom.xml
@@ -35,12 +35,20 @@ under the License.
<packaging>jar</packaging>
- <properties>
- <pubsub.version>1.37.1</pubsub.version>
- </properties>
+ <!-- This is the way we get a consistent set of versions of the Google tools -->
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-bom</artifactId>
+ <version>0.53.0-alpha</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
<dependencies>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
@@ -51,7 +59,29 @@ under the License.
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
- <version>${pubsub.version}</version>
+ <!-- Version is pulled from google-cloud-bom -->
+ <exclusions>
+ <!-- Exclude an old version of guava that is being pulled
+ in by a transitive dependency of google-api-client -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
@@ -69,51 +99,5 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
-
</dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <id>shade-flink</id>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <shadeTestJar>false</shadeTestJar>
- <artifactSet>
- <includes>
- <include>*:*</include>
- </includes>
- </artifactSet>
- <relocations>
- <relocation>
- <pattern>com.google.guava</pattern>
- <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.guava</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.google.common.base</pattern>
- <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.common.base</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.grpc.auth</pattern>
- <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.auth</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.grpc.protobuf</pattern>
- <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.protobuf</shadedPattern>
- </relocation>
- </relocations>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
</project>
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
index b37cc45..727f32e 100644
--- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.flink.streaming.connectors.pubsub;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
index 5f829ae..83fc15e 100644
--- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.flink.streaming.connectors.pubsub;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
@@ -5,7 +22,11 @@ import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
-class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
+/**
+ * A bounded PubSub Source, similar to {@link PubSubSource}, but one that stops at some point: for example, after a period of inactivity and/or after a given number of messages have been received.
+ *
+ */
+public class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
private Bound<OUT> bound;
private BoundedPubSubSource() {
@@ -28,11 +49,19 @@ class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
bound.receivedMessage();
}
+ /**
+ * Creates a {@link BoundedPubSubSourceBuilder}.
+ * @param <OUT> Type of Object which will be read by the produced {@link BoundedPubSubSource}
+ */
@SuppressWarnings("unchecked")
public static <OUT> BoundedPubSubSourceBuilder<OUT, ? extends PubSubSource, ? extends BoundedPubSubSourceBuilder> newBuilder() {
return new BoundedPubSubSourceBuilder<>(new BoundedPubSubSource<OUT>());
}
+ /**
+ * Builder to create BoundedPubSubSource.
+ * @param <OUT> Type of Object which will be read by the BoundedPubSubSource
+ */
@SuppressWarnings("unchecked")
public static class BoundedPubSubSourceBuilder<OUT, PSS extends BoundedPubSubSource<OUT>, BUILDER extends BoundedPubSubSourceBuilder<OUT, PSS, BUILDER>> extends PubSubSourceBuilder<OUT, PSS, BUILDER> {
private Long boundedByAmountOfMessages;
@@ -52,7 +81,7 @@ class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
return (BUILDER) this;
}
- private Bound <OUT> createBound() {
+ private Bound<OUT> createBound() {
if (boundedByAmountOfMessages != null && boundedByTimeSinceLastMessage != null) {
return Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(boundedByAmountOfMessages, boundedByTimeSinceLastMessage);
}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
index 92c9c6c..e6ac53e 100644
--- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.connectors.pubsub;
-import com.google.api.gax.core.NoCredentialsProvider;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -40,22 +39,35 @@ import java.io.IOException;
/**
* A sink function that outputs to PubSub.
+ *
* @param <IN> type of PubSubSink messages to write
*/
public class PubSubSink<IN> extends RichSinkFunction<IN> {
- private final SerializableCredentialsProvider serializableCredentialsProvider;
- private final SerializationSchema<IN> serializationSchema;
- private final String projectName;
- private final String topicName;
- private String hostAndPort = null;
+ private SerializableCredentialsProvider serializableCredentialsProvider;
+ private SerializationSchema<IN> serializationSchema;
+ private String projectName;
+ private String topicName;
+ private String hostAndPort = null;
private transient Publisher publisher;
- public PubSubSink(SerializableCredentialsProvider serializableCredentialsProvider, SerializationSchema<IN> serializationSchema, String projectName, String topicName) {
+ private PubSubSink() {
+ }
+
+ void setSerializableCredentialsProvider(SerializableCredentialsProvider serializableCredentialsProvider) {
this.serializableCredentialsProvider = serializableCredentialsProvider;
+ }
+
+ void setSerializationSchema(SerializationSchema<IN> serializationSchema) {
this.serializationSchema = serializationSchema;
+ }
+
+ void setProjectName(String projectName) {
this.projectName = projectName;
+ }
+
+ void setTopicName(String topicName) {
this.topicName = topicName;
}
@@ -64,15 +76,29 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> {
* The ONLY reason to use this is during tests with the emulator provided by Google.
*
* @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
- * @return The current instance
*/
- public PubSubSink<IN> withHostAndPort(String hostAndPort) {
+ void withHostAndPort(String hostAndPort) {
this.hostAndPort = hostAndPort;
- return this;
}
- private transient ManagedChannel managedChannel = null;
- private transient TransportChannel channel = null;
+ void initialize() throws IOException {
+ if (serializableCredentialsProvider == null) {
+ serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables();
+ }
+ if (serializationSchema == null) {
+ throw new IllegalArgumentException("The serializationSchema has not been specified.");
+ }
+ if (projectName == null) {
+ throw new IllegalArgumentException("The projectName has not been specified.");
+ }
+ if (topicName == null) {
+ throw new IllegalArgumentException("The topicName has not been specified.");
+ }
+ }
+
+
+ private transient ManagedChannel managedChannel = null;
+ private transient TransportChannel channel = null;
@Override
public void open(Configuration configuration) throws Exception {
@@ -114,127 +140,110 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> {
/**
* Create a builder for a new PubSubSink.
+ *
* @param <IN> The generic of the type that is to be written into the sink.
* @return a new PubSubSinkBuilder instance
*/
- public static <IN> PubSubSinkBuilder<IN> newBuilder() {
- return new PubSubSinkBuilder<>();
+ public static <IN> PubSubSinkBuilder<IN, ? extends PubSubSink<IN>, ? extends PubSubSinkBuilder<IN, ?, ?>> newBuilder() {
+ return new PubSubSinkBuilder<>(new PubSubSink<>());
}
/**
* PubSubSinkBuilder to create a PubSubSink.
+ *
* @param <IN> Type of PubSubSink to create.
*/
- public static class PubSubSinkBuilder<IN> {
- private SerializableCredentialsProvider serializableCredentialsProvider = null;
- private SerializationSchema<IN> serializationSchema = null;
- private String projectName = null;
- private String topicName = null;
- private String hostAndPort = null;
-
- private PubSubSinkBuilder() {
+ @SuppressWarnings("unchecked")
+ public static class PubSubSinkBuilder<IN, PSS extends PubSubSink<IN>, BUILDER extends PubSubSinkBuilder<IN, PSS, BUILDER>> {
+ protected PSS sinkUnderConstruction;
+
+ private PubSubSinkBuilder(PSS sinkUnderConstruction) {
+ this.sinkUnderConstruction = sinkUnderConstruction;
}
/**
* Set the credentials.
* If this is not used then the credentials are picked up from the environment variables.
+ *
* @param credentials the Credentials needed to connect.
* @return The current PubSubSinkBuilder instance
*/
- public PubSubSinkBuilder<IN> withCredentials(Credentials credentials) {
- this.serializableCredentialsProvider = new SerializableCredentialsProvider(credentials);
- return this;
+ public BUILDER withCredentials(Credentials credentials) {
+ sinkUnderConstruction.setSerializableCredentialsProvider(new SerializableCredentialsProvider(credentials));
+ return (BUILDER) this;
}
/**
* Set the CredentialsProvider.
* If this is not used then the credentials are picked up from the environment variables.
+ *
* @param credentialsProvider the custom SerializableCredentialsProvider instance.
* @return The current PubSubSinkBuilder instance
*/
- public PubSubSinkBuilder<IN> withCredentialsProvider(CredentialsProvider credentialsProvider) throws IOException {
+ public BUILDER withCredentialsProvider(CredentialsProvider credentialsProvider) throws IOException {
return withCredentials(credentialsProvider.getCredentials());
}
/**
* Set the credentials to be absent.
* This means that no credentials are to be used at all.
+ *
* @return The current PubSubSinkBuilder instance
*/
- public PubSubSinkBuilder<IN> withoutCredentials() {
- this.serializableCredentialsProvider = SerializableCredentialsProvider.withoutCredentials();
- return this;
+ public BUILDER withoutCredentials() {
+ sinkUnderConstruction.setSerializableCredentialsProvider(SerializableCredentialsProvider.withoutCredentials());
+ return (BUILDER) this;
}
/**
* @param serializationSchema Instance of a SerializationSchema that converts the IN into a byte[]
* @return The current PubSubSinkBuilder instance
*/
- public PubSubSinkBuilder<IN> withSerializationSchema(SerializationSchema<IN> serializationSchema) {
- this.serializationSchema = serializationSchema;
- return this;
+ public BUILDER withSerializationSchema(SerializationSchema<IN> serializationSchema) {
+ sinkUnderConstruction.setSerializationSchema(serializationSchema);
+ return (BUILDER) this;
}
/**
* @param projectName The name of the project in PubSub
* @return The current PubSubSinkBuilder instance
*/
- public PubSubSinkBuilder<IN> withProjectName (String projectName) {
- this.projectName = projectName;
- return this;
+ public BUILDER withProjectName(String projectName) {
+ sinkUnderConstruction.setProjectName(projectName);
+ return (BUILDER) this;
}
/**
* @param topicName The name of the topic in PubSub
* @return The current PubSubSinkBuilder instance
*/
- public PubSubSinkBuilder<IN> withTopicName (String topicName) {
- this.topicName = topicName;
- return this;
+ public BUILDER withTopicName(String topicName) {
+ sinkUnderConstruction.setTopicName(topicName);
+ return (BUILDER) this;
}
-
/**
* Set the custom hostname/port combination of PubSub.
* The ONLY reason to use this is during tests with the emulator provided by Google.
+ *
* @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
* @return The current PubSubSinkBuilder instance
*/
- public PubSubSinkBuilder<IN> withHostAndPort(String hostAndPort) {
- this.hostAndPort = hostAndPort;
- return this;
+ public BUILDER withHostAndPort(String hostAndPort) {
+ sinkUnderConstruction.withHostAndPort(hostAndPort);
+ return (BUILDER) this;
}
/**
* Actually builder the desired instance of the PubSubSink.
+ *
* @return a brand new PubSubSink
- * @throws IOException incase of a problem getting the credentials
+ * @throws IOException in case of a problem getting the credentials
* @throws IllegalArgumentException incase required fields were not specified.
*/
public PubSubSink<IN> build() throws IOException {
- if (serializableCredentialsProvider == null) {
- serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables();
- }
- if (serializationSchema == null) {
- throw new IllegalArgumentException("The serializationSchema has not been specified.");
- }
- if (projectName == null) {
- throw new IllegalArgumentException("The projectName has not been specified.");
- }
- if (topicName == null) {
- throw new IllegalArgumentException("The topicName has not been specified.");
- }
-
- PubSubSink<IN> pubSubSink = new PubSubSink<>(
- serializableCredentialsProvider,
- serializationSchema,
- projectName, topicName);
-
- if (hostAndPort != null) {
- pubSubSink.withHostAndPort(hostAndPort);
- }
-
- return pubSubSink;
+ sinkUnderConstruction.initialize();
+ return sinkUnderConstruction;
}
}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
index 8f8c689..2d93998 100644
--- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
@@ -43,7 +43,7 @@ import java.util.List;
*/
public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, AckReplyConsumer> implements MessageReceiver, ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT> {
private DeserializationSchema<OUT> deserializationSchema;
- private SubscriberWrapper subscriberWrapper;
+ private SubscriberWrapper subscriberWrapper;
protected transient SourceContext<OUT> sourceContext = null;
@@ -64,7 +64,8 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
super.open(configuration);
subscriberWrapper.initialize(this);
if (hasNoCheckpointingEnabled(getRuntimeContext())) {
- throw new IllegalArgumentException("Checkpointing needs to be enabled to support: PubSub ATLEAST_ONCE");
+ throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and " +
+ "the checkpointing frequency must be MUCH lower than the PubSub timeout for it to retry a message.");
}
}
@@ -123,26 +124,27 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
return deserializationSchema.getProducedType();
}
- @SuppressWarnings("unchecked")
- public static <OUT> PubSubSourceBuilder<OUT, ? extends PubSubSource, ? extends PubSubSourceBuilder> newBuilder() {
- return new PubSubSourceBuilder<>(new PubSubSource<OUT>());
+ public static <OUT> PubSubSourceBuilder<OUT, ? extends PubSubSource<OUT>, ? extends PubSubSourceBuilder<OUT, ?, ?>> newBuilder() {
+ return new PubSubSourceBuilder<>(new PubSubSource<>());
}
/**
* Builder to create PubSubSource.
- * @param <OUT> The type of objects which will be read
- * @param <PSS> The type of PubSubSource
+ *
+ * @param <OUT> The type of objects which will be read
+ * @param <PSS> The type of PubSubSource
* @param <BUILDER> The type of Builder to create the PubSubSource
*/
+ @SuppressWarnings("unchecked")
public static class PubSubSourceBuilder<OUT, PSS extends PubSubSource<OUT>, BUILDER extends PubSubSourceBuilder<OUT, PSS, BUILDER>> {
- protected PSS sourceUnderConstruction;
+ protected PSS sourceUnderConstruction;
- private SubscriberWrapper subscriberWrapper = null;
+ private SubscriberWrapper subscriberWrapper = null;
private SerializableCredentialsProvider serializableCredentialsProvider;
- private DeserializationSchema<OUT> deserializationSchema;
- private String projectName;
- private String subscriptionName;
- private String hostAndPort;
+ private DeserializationSchema<OUT> deserializationSchema;
+ private String projectName;
+ private String subscriptionName;
+ private String hostAndPort;
protected PubSubSourceBuilder(PSS sourceUnderConstruction) {
this.sourceUnderConstruction = sourceUnderConstruction;
@@ -151,6 +153,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
/**
* Set the credentials.
* If this is not used then the credentials are picked up from the environment variables.
+ *
* @param credentials the Credentials needed to connect.
* @return The current PubSubSourceBuilder instance
*/
@@ -162,6 +165,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
/**
* Set the CredentialsProvider.
* If this is not used then the credentials are picked up from the environment variables.
+ *
* @param credentialsProvider the custom SerializableCredentialsProvider instance.
* @return The current PubSubSourceBuilder instance
*/
@@ -172,6 +176,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
/**
* Set the credentials to be absent.
* This means that no credentials are to be used at all.
+ *
* @return The current PubSubSourceBuilder instance
*/
public BUILDER withoutCredentials() {
@@ -183,13 +188,13 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
* @param deserializationSchema Instance of a DeserializationSchema that converts the OUT into a byte[]
* @return The current PubSubSourceBuilder instance
*/
- public BUILDER withDeserializationSchema(DeserializationSchema <OUT> deserializationSchema) {
+ public BUILDER withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
this.deserializationSchema = deserializationSchema;
return (BUILDER) this;
}
/**
- * @param projectName The name of the project in GoogleCloudPlatform
+ * @param projectName The name of the project in GoogleCloudPlatform
* @param subscriptionName The name of the subscription in PubSub
* @return The current PubSubSourceBuilder instance
*/
@@ -202,6 +207,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
/**
* Set the custom hostname/port combination of PubSub.
* The ONLY reason to use this is during tests with the emulator provided by Google.
+ *
* @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
* @return The current PubSubSourceBuilder instance
*/
@@ -213,6 +219,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
/**
* Set a complete SubscriberWrapper.
* The ONLY reason to use this is during tests.
+ *
* @param subscriberWrapper The fully instantiated SubscriberWrapper
* @return The current PubSubSourceBuilder instance
*/
@@ -223,8 +230,9 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
/**
* Actually build the desired instance of the PubSubSourceBuilder.
+ *
* @return a brand new SourceFunction
- * @throws IOException incase of a problem getting the credentials
+ * @throws IOException in case of a problem getting the credentials
* @throws IllegalArgumentException incase required fields were not specified.
*/
public PSS build() throws IOException {
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
index fb75f43..0595877 100644
--- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
@@ -33,13 +33,13 @@ import java.io.Serializable;
class SubscriberWrapper implements Serializable {
private final SerializableCredentialsProvider serializableCredentialsProvider;
- private final String projectId;
- private final String subscriptionId;
- private String hostAndPort = null;
+ private final String projectId;
+ private final String subscriptionId;
+ private String hostAndPort = null;
- private transient Subscriber subscriber;
- private transient ManagedChannel managedChannel = null;
- private transient TransportChannel channel = null;
+ private transient Subscriber subscriber;
+ private transient ManagedChannel managedChannel = null;
+ private transient TransportChannel channel = null;
SubscriberWrapper(SerializableCredentialsProvider serializableCredentialsProvider, ProjectSubscriptionName projectSubscriptionName) {
this.serializableCredentialsProvider = serializableCredentialsProvider;
@@ -49,14 +49,14 @@ class SubscriberWrapper implements Serializable {
void initialize(MessageReceiver messageReceiver) {
Subscriber.Builder builder = Subscriber
- .newBuilder(ProjectSubscriptionName.of(projectId, subscriptionId), messageReceiver)
- .setCredentialsProvider(serializableCredentialsProvider);
+ .newBuilder(ProjectSubscriptionName.of(projectId, subscriptionId), messageReceiver)
+ .setCredentialsProvider(serializableCredentialsProvider);
if (hostAndPort != null) {
managedChannel = ManagedChannelBuilder
- .forTarget(hostAndPort)
- .usePlaintext(true) // This is 'Ok' because this is ONLY used for testing.
- .build();
+ .forTarget(hostAndPort)
+ .usePlaintext(true) // This is 'Ok' because this is ONLY used for testing.
+ .build();
channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
builder.setChannelProvider(FixedTransportChannelProvider.create(channel));
}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
index bd04058..44b1fa0 100644
--- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
@@ -40,7 +40,9 @@ public class SerializableCredentialsProvider implements CredentialsProvider, Ser
}
/**
- * Creates a SerializableCredentialsProvider for a PubSubSubscription based on environment variables. {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings}
+ * Creates a SerializableCredentialsProvider for a PubSubSubscription based on environment variables.
+ * {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings}
+ *
* @return serializableCredentialsProvider
* @throws IOException thrown by {@link Credentials}
*/
@@ -50,11 +52,12 @@ public class SerializableCredentialsProvider implements CredentialsProvider, Ser
}
/**
- * Creates a SerializableCredentialsProvider for a PubSubSubscription without any credentials. {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings}
+ * Creates a SerializableCredentialsProvider for a PubSubSubscription without any credentials.
+ * {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings}
* This is ONLY useful when running tests locally against Mockito or the Google PubSub emulator
- * @see <a href="https://cloud.google.com/pubsub/docs/emulator" target="_top">https://cloud.google.com/pubsub/docs/emulator</a>.
+ * @see <a href="https://cloud.google.com/pubsub/docs/emulator" target="_top">https://cloud.google.com/pubsub/docs/emulator</a>
* @return serializableCredentialsProvider
- * @throws IOException thrown by {@link Credentials}
+ * @see <a href="https://cloud.google.com/pubsub/docs/emulator" target="_top">https://cloud.google.com/pubsub/docs/emulator</a>
*/
public static SerializableCredentialsProvider withoutCredentials() {
return new SerializableCredentialsProvider(NoCredentials.getInstance());
diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
index f98731b..a340ae9 100644
--- a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
+++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.flink.streaming.connectors.pubsub;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -5,9 +22,9 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.junit.Test;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
/**
* Test for {@link Bound}.
@@ -107,6 +124,7 @@ public class BoundTest {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
+ // Ignore any exceptions
}
}
}
diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
index 805f823..5f938fd 100644
--- a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
+++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.flink.streaming.connectors.pubsub;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -13,9 +30,9 @@ import org.junit.Test;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
/**
* Tests for {@link BoundedPubSubSource}.
@@ -63,8 +80,8 @@ public class BoundedPubSubSourceTest {
private PubsubMessage pubSubMessage() {
return PubsubMessage.newBuilder()
- .setMessageId("message-id")
- .setData(ByteString.copyFrom("some-message".getBytes()))
- .build();
+ .setMessageId("message-id")
+ .setData(ByteString.copyFrom("some-message".getBytes()))
+ .build();
}
}
diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java
index 73ca53b..9db5d7d 100644
--- a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java
+++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java
@@ -42,7 +42,6 @@ import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;
-
/**
* Test for {@link SourceFunction}.
*/
@@ -142,7 +141,7 @@ public class PubSubSourceTest {
private PubsubMessage pubSubMessage() {
return PubsubMessage.newBuilder()
- .setData(ByteString.copyFrom(SERIALIZED_MESSAGE))
- .build();
+ .setData(ByteString.copyFrom(SERIALIZED_MESSAGE))
+ .build();
}
}
diff --git a/flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..b316a9a
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties
@@ -0,0 +1,24 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+log4j.rootLogger=INFO, testlogger
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.out
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
diff --git a/flink-connectors/flink-connector-pubsub/pom.xml b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml
similarity index 50%
copy from flink-connectors/flink-connector-pubsub/pom.xml
copy to flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml
index a6e8d72..7dd0d15 100644
--- a/flink-connectors/flink-connector-pubsub/pom.xml
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml
@@ -25,33 +25,75 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connectors</artifactId>
+ <artifactId>flink-end-to-end-tests</artifactId>
<version>1.7-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
- <artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
- <name>flink-connector-pubsub</name>
+ <artifactId>flink-connector-pubsub-emulator-tests</artifactId>
+ <name>flink-connector-pubsub-emulator-tests</name>
<packaging>jar</packaging>
- <properties>
- <pubsub.version>1.37.1</pubsub.version>
- </properties>
+ <!-- This is the way we get a consistent set of versions of the Google tools -->
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-bom</artifactId>
+ <version>0.53.0-alpha</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
<dependencies>
+ <!--All dependencies are <scope>test</scope> -->
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
- <scope>provided</scope>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
- <version>${pubsub.version}</version>
+ <!-- Version is pulled from google-cloud-bom -->
+ <exclusions>
+ <!-- Exclude an old version of guava that is being pulled
+ in by a transitive dependency of google-api-client -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- This is used to run the local PubSub -->
+ <dependency>
+ <groupId>com.spotify</groupId>
+ <artifactId>docker-client</artifactId>
+ <version>8.11.7</version>
+ <scope>test</scope>
</dependency>
<dependency>
@@ -69,51 +111,39 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
-
</dependencies>
- <build>
- <plugins>
+ <!-- ONLY run the tests when explicitly told to do so -->
+ <properties>
+ <skipTests>true</skipTests>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.12.4</version>
+ <configuration>
+ <skipTests>${skipTests}</skipTests>
+ </configuration>
+ </plugin>
+ <!-- Disabling convergence check because there are multiple problems within the used pubsub dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
+ <artifactId>maven-enforcer-plugin</artifactId>
<executions>
<execution>
- <id>shade-flink</id>
- <phase>package</phase>
+ <id>dependency-convergence</id>
<goals>
- <goal>shade</goal>
+ <goal>enforce</goal>
</goals>
<configuration>
- <shadeTestJar>false</shadeTestJar>
- <artifactSet>
- <includes>
- <include>*:*</include>
- </includes>
- </artifactSet>
- <relocations>
- <relocation>
- <pattern>com.google.guava</pattern>
- <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.guava</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.google.common.base</pattern>
- <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.common.base</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.grpc.auth</pattern>
- <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.auth</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.grpc.protobuf</pattern>
- <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.protobuf</shadedPattern>
- </relocation>
- </relocations>
+ <skip>true</skip>
</configuration>
</execution>
</executions>
</plugin>
- </plugins>
- </build>
+ </plugins>
+ </build>
</project>
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/CheckPubSubEmulatorTest.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/CheckPubSubEmulatorTest.java
new file mode 100644
index 0000000..a54d47b
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/CheckPubSubEmulatorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.connectors.pubsub.emulator.GCloudUnitTestBase;
+import org.apache.flink.streaming.connectors.pubsub.emulator.PubsubHelper;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests to ensure the docker image with PubSub is working correctly.
+ */
+public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CheckPubSubEmulatorTest.class);
+
+ private static final String PROJECT_NAME = "Project";
+ private static final String TOPIC_NAME = "Topic";
+ private static final String SUBSCRIPTION_NAME = "Subscription";
+
+ // Assigned in setUp() instead of a static initializer: a static initializer runs when the
+ // class is loaded, i.e. before the @BeforeClass method of GCloudUnitTestBase has launched
+ // the emulator. This also matches EmulatedPubSubSinkTest and EmulatedPubSubSourceTest.
+ private static PubsubHelper pubsubHelper;
+
+ /**
+  * Obtains the PubSub helper and creates the topic and subscription used by all tests
+  * in this class.
+  */
+ @BeforeClass
+ public static void setUp() throws Exception {
+     pubsubHelper = getPubsubHelper();
+     pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
+     pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME);
+ pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
+ }
+
+ /**
+  * Publishes a single message and verifies that a synchronous pull on the subscription
+  * returns exactly that message.
+  */
+ @Test
+ public void testPull() throws Exception {
+     Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
+     try {
+         publisher
+             .publish(PubsubMessage
+                 .newBuilder()
+                 .setData(ByteString.copyFromUtf8("Hello World PULL"))
+                 .build())
+             .get();
+
+         List<ReceivedMessage> receivedMessages = pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 10);
+         assertEquals(1, receivedMessages.size());
+         assertEquals("Hello World PULL", receivedMessages.get(0).getMessage().getData().toStringUtf8());
+     } finally {
+         // Release the publisher even when an assertion above fails.
+         publisher.shutdown();
+     }
+ }
+
+ /**
+  * Publishes a single message and verifies that a streaming subscriber receives exactly
+  * that message.
+  */
+ @Test
+ public void testPush() throws Exception {
+     // The receiver callback runs on a subscriber thread, so every access to this list is
+     // guarded by the list's own monitor.
+     final List<PubsubMessage> receivedMessages = new ArrayList<>();
+     Subscriber subscriber = pubsubHelper.
+         subscribeToSubscription(
+             PROJECT_NAME,
+             SUBSCRIPTION_NAME,
+             (message, consumer) -> {
+                 synchronized (receivedMessages) {
+                     receivedMessages.add(message);
+                 }
+                 // Ack so the message is not redelivered to whichever test uses the
+                 // subscription next.
+                 consumer.ack();
+             }
+         );
+
+     Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
+     try {
+         publisher
+             .publish(PubsubMessage
+                 .newBuilder()
+                 .setData(ByteString.copyFromUtf8("Hello World"))
+                 .build())
+             .get();
+
+         LOG.info("Waiting a while to receive the message...");
+         Thread.sleep(1000);
+
+         synchronized (receivedMessages) {
+             assertEquals(1, receivedMessages.size());
+             assertEquals("Hello World", receivedMessages.get(0).getData().toStringUtf8());
+         }
+     } finally {
+         // Always stop the subscriber and publisher; a subscriber left running after a failed
+         // assertion would keep consuming messages meant for other tests.
+         try {
+             subscriber.stopAsync().awaitTerminated(100, MILLISECONDS);
+         } catch (TimeoutException tme) {
+             // A clean shutdown within the timeout is not required for this test.
+         }
+         publisher.shutdown();
+     }
+ }
+
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSinkTest.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSinkTest.java
new file mode 100644
index 0000000..165579f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSinkTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.pubsub.emulator.GCloudUnitTestBase;
+import org.apache.flink.streaming.connectors.pubsub.emulator.PubsubHelper;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test of the PubSub SINK with the Google PubSub emulator.
+ */
+public class EmulatedPubSubSinkTest extends GCloudUnitTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EmulatedPubSubSinkTest.class);
+
+ private static final String PROJECT_NAME = "FLProject";
+ private static final String TOPIC_NAME = "FLTopic";
+ private static final String SUBSCRIPTION_NAME = "FLSubscription";
+
+ private static PubsubHelper pubsubHelper;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ pubsubHelper = getPubsubHelper();
+ pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
+ pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME);
+ pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
+ }
+
+ /**
+  * Runs a small Flink job that reverses ten strings and writes them to the emulated PubSub
+  * topic, then pulls the subscription and checks that every reversed string arrived.
+  */
+ @Test
+ public void testFlinkSink() throws Exception {
+     final List<String> words = Arrays.asList("One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eigth", "Nine", "Ten");
+
+     final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+     // Source stream: the input words, each reversed by the map step.
+     final DataStream<String> reversedWords = environment
+         .fromCollection(words)
+         .name("Test input")
+         .map((MapFunction<String, String>) StringUtils::reverse);
+
+     // Attach the PubSub sink; credentials and host/port point at the emulator.
+     reversedWords
+         .addSink(PubSubSink.<String>newBuilder()
+             .withProjectName(PROJECT_NAME)
+             .withTopicName(TOPIC_NAME)
+             .withSerializationSchema(new SimpleStringSchema())
+             .withCredentialsProvider(getPubsubHelper().getCredentialsProvider())
+             .withHostAndPort(getPubSubHostPort())
+             .build())
+         .name("PubSub sink");
+
+     environment.execute();
+
+     // Read back what the job published.
+     final List<ReceivedMessage> pulled = pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 100);
+     assertEquals("Wrong number of elements", words.size(), pulled.size());
+
+     final List<String> payloads = new ArrayList<>();
+     for (ReceivedMessage received : pulled) {
+         payloads.add(received.getMessage().getData().toStringUtf8());
+     }
+
+     // Order is not checked, only that each reversed word is present.
+     for (String word : words) {
+         final String expected = StringUtils.reverse(word);
+         assertTrue("Missing " + word, payloads.contains(expected));
+     }
+ }
+
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSourceTest.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSourceTest.java
new file mode 100644
index 0000000..aaa4fc3
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSourceTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.pubsub.emulator.GCloudUnitTestBase;
+import org.apache.flink.streaming.connectors.pubsub.emulator.PubsubHelper;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test of the PubSub SOURCE with the Google PubSub emulator.
+ */
+public class EmulatedPubSubSourceTest extends GCloudUnitTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EmulatedPubSubSourceTest.class);
+
+ private static final String PROJECT_NAME = "FLProject";
+ private static final String TOPIC_NAME = "FLTopic";
+ private static final String SUBSCRIPTION_NAME = "FLSubscription";
+
+ private static PubsubHelper pubsubHelper;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ pubsubHelper = getPubsubHelper();
+ pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
+ pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME);
+ pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
+ }
+
+ /**
+  * Publishes ten messages to the emulated PubSub topic and checks that a bounded PubSub
+  * source delivers every one of them to a Flink job.
+  */
+ @Test
+ public void testFlinkSource() throws Exception {
+     // Create some messages and put them into pubsub
+     List<String> input = Arrays.asList("One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eigth", "Nine", "Ten");
+
+     // Publish the messages into PubSub
+     Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
+     try {
+         input.forEach(s -> {
+             try {
+                 publisher
+                     .publish(PubsubMessage
+                         .newBuilder()
+                         .setData(ByteString.copyFromUtf8(s))
+                         .build())
+                     .get();
+             } catch (InterruptedException e) {
+                 Thread.currentThread().interrupt();
+                 throw new IllegalStateException("Interrupted while publishing \"" + s + "\"", e);
+             } catch (ExecutionException e) {
+                 // Fail here, at the cause, instead of later with a misleading
+                 // "Wrong number of elements" assertion.
+                 throw new IllegalStateException("Failed to publish \"" + s + "\"", e);
+             }
+         });
+     } finally {
+         // The publisher is only needed to seed the topic.
+         publisher.shutdown();
+     }
+
+     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+     env.enableCheckpointing(100);
+
+     DataStream<String> fromPubSub = env
+         .addSource(BoundedPubSubSource.<String>newBuilder()
+             .withDeserializationSchema(new SimpleStringSchema())
+             .withProjectSubscriptionName(PROJECT_NAME, SUBSCRIPTION_NAME)
+             // Specific for emulator
+             .withCredentialsProvider(getPubsubHelper().getCredentialsProvider())
+             .withHostAndPort(getPubSubHostPort())
+             // Make sure the test topology self terminates
+             .boundedByTimeSinceLastMessage(1000)
+             .build())
+         .name("PubSub source");
+
+     List<String> output = new ArrayList<>();
+     fromPubSub.writeUsingOutputFormat(new LocalCollectionOutputFormat<>(output));
+
+     env.execute();
+
+     assertEquals("Wrong number of elements", input.size(), output.size());
+     for (String test : input) {
+         assertTrue("Missing " + test, output.contains(test));
+     }
+ }
+
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudEmulatorManager.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudEmulatorManager.java
new file mode 100644
index 0000000..27a658a
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudEmulatorManager.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub.emulator;
+
+import com.spotify.docker.client.DefaultDockerClient;
+import com.spotify.docker.client.DockerClient;
+import com.spotify.docker.client.exceptions.ContainerNotFoundException;
+import com.spotify.docker.client.exceptions.DockerCertificateException;
+import com.spotify.docker.client.exceptions.DockerException;
+import com.spotify.docker.client.exceptions.ImageNotFoundException;
+import com.spotify.docker.client.messages.ContainerConfig;
+import com.spotify.docker.client.messages.ContainerCreation;
+import com.spotify.docker.client.messages.ContainerInfo;
+import com.spotify.docker.client.messages.HostConfig;
+import com.spotify.docker.client.messages.PortBinding;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * The class that handles the starting and stopping of the emulator docker image.
+ */
+public class GCloudEmulatorManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GCloudEmulatorManager.class);
+
+ private static DockerClient docker;
+
+ private static String dockerIpAddress = "127.0.0.1";
+
+ public static final String INTERNAL_PUBSUB_PORT = "22222";
+ public static final String DOCKER_IMAGE_NAME = "google/cloud-sdk:latest";
+
+ private static String pubsubPort;
+
+ /**
+  * Returns the IP address under which the emulator container is reachable.
+  *
+  * @throws IllegalStateException if the address is currently unset
+  */
+ public static String getDockerIpAddress() {
+     if (dockerIpAddress != null) {
+         return dockerIpAddress;
+     }
+     throw new IllegalStateException("The docker has not yet been started (yet) so you cannot get the IP address yet.");
+ }
+
+ /**
+  * Returns the host port that is mapped to the PubSub emulator inside the container.
+  *
+  * @throws IllegalStateException if the port is currently unset
+  */
+ public static String getDockerPubSubPort() {
+     if (pubsubPort != null) {
+         return pubsubPort;
+     }
+     throw new IllegalStateException("The docker has not yet been started (yet) so you cannot get the port information yet.");
+ }
+
+ public static final String UNITTEST_PROJECT_ID = "running-from-junit-for-flink";
+ private static final String CONTAINER_NAME_JUNIT = (DOCKER_IMAGE_NAME + "_" + UNITTEST_PROJECT_ID).replaceAll("[^a-zA-Z0-9_]", "_");
+
+ /**
+ * Starts a fresh emulator container and records how to reach it.
+ *
+ * <p>Steps: build a docker client from the environment, remove any container that already
+ * uses the JUnit container name, pull the image if it is not present locally, create and
+ * start a container that runs the PubSub emulator on INTERNAL_PUBSUB_PORT bound to a random
+ * host port, read back the mapped host port and finally poll the emulator until it reports "Ok".
+ *
+ * <p>If the emulator does not report "Ok" in time this method does NOT throw: it removes the
+ * container and resets the stored address and port to null, so the getters of this class throw
+ * an IllegalStateException afterwards.
+ */
+ public static void launchDocker() throws DockerException, InterruptedException, DockerCertificateException {
+ // Create a client based on DOCKER_HOST and DOCKER_CERT_PATH env vars
+ docker = DefaultDockerClient.fromEnv().build();
+
+ // Remove a container with the same name that may have been left behind by an earlier run.
+ terminateAndDiscardAnyExistingContainers(true);
+
+ LOG.info("");
+ LOG.info("/===========================================");
+ LOG.info("| GCloud Emulator");
+
+ ContainerInfo containerInfo;
+ String id;
+
+ try {
+ docker.inspectImage(DOCKER_IMAGE_NAME);
+ } catch (ImageNotFoundException e) {
+ // No such image so we must download it first.
+ LOG.info("| - Getting docker image \"{}\"", DOCKER_IMAGE_NAME);
+ docker.pull(DOCKER_IMAGE_NAME, message -> {
+ // Only log progress messages that carry both an id and a progress value.
+ if (message.id() != null && message.progress() != null) {
+ LOG.info("| - Downloading > {} : {}", message.id(), message.progress());
+ }
+ });
+ }
+
+ // Any old container was removed above, so a new one is always created here.
+ LOG.info("| - Creating new container");
+
+ // Bind container ports to host ports
+ final Map<String, List<PortBinding>> portBindings = new HashMap<>();
+ portBindings.put(INTERNAL_PUBSUB_PORT, Collections.singletonList(PortBinding.randomPort("0.0.0.0")));
+
+ final HostConfig hostConfig = HostConfig.builder().portBindings(portBindings).build();
+
+ // Create new container with exposed ports
+ final ContainerConfig containerConfig = ContainerConfig.builder()
+ .hostConfig(hostConfig)
+ .exposedPorts(INTERNAL_PUBSUB_PORT)
+ .image(DOCKER_IMAGE_NAME)
+ .cmd("sh", "-c", "mkdir -p /opt/data/pubsub ; gcloud beta emulators pubsub start --data-dir=/opt/data/pubsub --host-port=0.0.0.0:" + INTERNAL_PUBSUB_PORT)
+ .build();
+
+ final ContainerCreation creation = docker.createContainer(containerConfig, CONTAINER_NAME_JUNIT);
+ id = creation.id();
+
+ containerInfo = docker.inspectContainer(id);
+
+ if (!containerInfo.state().running()) {
+ LOG.warn("| - Starting it up ....");
+ docker.startContainer(id);
+ // Fixed pause after starting, before the container is inspected again.
+ Thread.sleep(1000);
+ }
+
+ // Inspect again: the port mappings are read from this second inspection.
+ containerInfo = docker.inspectContainer(id);
+
+ dockerIpAddress = "127.0.0.1";
+
+ Map<String, List<PortBinding>> ports = containerInfo.networkSettings().ports();
+
+ assertNotNull("Unable to retrieve the ports where to connect to the emulators", ports);
+ assertEquals("We expect 1 port to be mapped", 1, ports.size());
+
+ // NOTE(review): getPort returns null when no mapping exists; that null is passed on unchecked.
+ pubsubPort = getPort(ports, INTERNAL_PUBSUB_PORT, "PubSub");
+
+ LOG.info("| Waiting for the emulators to be running");
+
+ // PubSub exposes an "Ok" at the root url when running.
+ if (!waitForOkStatus("PubSub", pubsubPort)) {
+ // No "Ok" within the retry timeout: remove the container and wipe the connection info.
+ startHasFailedKillEverything();
+ }
+ LOG.info("\\===========================================");
+ LOG.info("");
+ }
+
+ private static void startHasFailedKillEverything() throws DockerException, InterruptedException {
+ LOG.error("|");
+ LOG.error("| ==================== ");
+ LOG.error("| YOUR TESTS WILL FAIL ");
+ LOG.error("| ==================== ");
+ LOG.error("|");
+
+ // Kill this container and wipe all connection information
+ dockerIpAddress = null;
+ pubsubPort = null;
+ terminateAndDiscardAnyExistingContainers(false);
+ }
+
+ private static final long MAX_RETRY_TIMEOUT = 10000; // Milliseconds
+
+ private static boolean waitForOkStatus(String label, String port) {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ URL url = new URL("http://" + dockerIpAddress + ":" + port + "/");
+ HttpURLConnection con = (HttpURLConnection) url.openConnection();
+ con.setRequestMethod("GET");
+ con.setConnectTimeout(50);
+ con.setReadTimeout(50);
+
+ BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
+ String inputLine;
+ StringBuilder content = new StringBuilder();
+ while ((inputLine = in.readLine()) != null) {
+ content.append(inputLine);
+ }
+ in.close();
+ con.disconnect();
+ if (content.toString().contains("Ok")) {
+ LOG.info("| - {} Emulator is running at {}:{}", label, dockerIpAddress, port);
+ return true;
+ }
+ } catch (IOException e) {
+ long now = System.currentTimeMillis();
+ if (now - start > MAX_RETRY_TIMEOUT) {
+ LOG.error("| - PubSub Emulator at {}:{} FAILED to return an Ok status within {} ms ", dockerIpAddress, port, MAX_RETRY_TIMEOUT);
+ return false;
+ }
+ try {
+ Thread.sleep(100); // Sleep a very short time
+ } catch (InterruptedException e1) {
+ // Ignore
+ }
+ }
+ }
+ }
+
+ /**
+  * Looks up the host port that docker mapped to the given container TCP port.
+  *
+  * @param ports port mappings as reported for the container, keyed by "port/tcp"
+  * @param internalTCPPort port number inside the container, without the "/tcp" suffix
+  * @param label name of the emulator, used in the log message only
+  * @return the host port of the first mapping, or null if the port is not mapped
+  */
+ private static String getPort(Map<String, List<PortBinding>> ports, String internalTCPPort, String label) {
+     List<PortBinding> portMappings = ports.get(internalTCPPort + "/tcp");
+     if (portMappings == null || portMappings.isEmpty()) {
+         // Two placeholders for two arguments (the original message had a third, unfilled "{}").
+         LOG.info("| {} Emulator --> NOTHING CONNECTED TO {}", label, internalTCPPort + "/tcp");
+         return null;
+     }
+
+     return portMappings.get(0).hostPort();
+ }
+
+ /**
+ * Kills (if running) and removes the container with the JUnit container name.
+ * Does nothing when no such container exists.
+ *
+ * @param warnAboutExisting whether to log a warning that an old emulator instance was found
+ */
+ private static void terminateAndDiscardAnyExistingContainers(boolean warnAboutExisting) throws DockerException, InterruptedException {
+ ContainerInfo containerInfo;
+ try {
+ // Throws ContainerNotFoundException when there is nothing to clean up (handled below).
+ containerInfo = docker.inspectContainer(CONTAINER_NAME_JUNIT);
+ // A container with this name exists.
+
+ assertNotNull("We should either we get containerInfo or we get an exception", containerInfo);
+
+ LOG.info("");
+ LOG.info("/===========================================");
+ if (warnAboutExisting) {
+ LOG.warn("| >>> FOUND OLD EMULATOR INSTANCE RUNNING <<< ");
+ LOG.warn("| Destroying that one to keep tests running smoothly.");
+ }
+ LOG.info("| Cleanup of GCloud Emulator");
+
+ // We REQUIRE 100% accurate side effect free unit tests
+ // So we completely discard this one.
+
+ String id = containerInfo.id();
+ // Kill container
+ if (containerInfo.state().running()) {
+ docker.killContainer(id);
+ LOG.info("| - Killed");
+ }
+
+ // Remove container
+ docker.removeContainer(id);
+
+ LOG.info("| - Removed");
+ LOG.info("\\===========================================");
+ LOG.info("");
+
+ } catch (ContainerNotFoundException cnfe) {
+ // No such container. Good !
+ // This catch also covers the kill and remove calls above.
+ }
+ }
+
+ /**
+  * Removes the emulator container and closes the docker client.
+  *
+  * <p>Safe to call when launchDocker() failed before the client was created; in that case
+  * there is nothing to clean up and the method returns immediately.
+  */
+ public static void terminateDocker() throws DockerException, InterruptedException {
+     if (docker == null) {
+         // The client is assigned by the first statement of launchDocker(); if that threw,
+         // no container can have been created through this class.
+         return;
+     }
+
+     try {
+         terminateAndDiscardAnyExistingContainers(false);
+     } finally {
+         // Close the docker client even when removing the container failed.
+         docker.close();
+     }
+ }
+
+ // ====================================================================================
+
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudUnitTestBase.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudUnitTestBase.java
new file mode 100644
index 0000000..b6a011a
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudUnitTestBase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub.emulator;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.spotify.docker.client.exceptions.DockerException;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.Serializable;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.streaming.connectors.pubsub.emulator.GCloudEmulatorManager.getDockerIpAddress;
+import static org.apache.flink.streaming.connectors.pubsub.emulator.GCloudEmulatorManager.getDockerPubSubPort;
+
+/**
+ * The base class from which unit tests should inherit if they need to use the Google cloud emulators.
+ *
+ * <p>The emulator is launched before the tests of a class run and terminated afterwards.
+ * The gRPC channel to the emulator is created on the first call to {@link #getPubsubHelper()}
+ * and shared by all helpers handed out until it is cleaned up.
+ */
+public class GCloudUnitTestBase implements Serializable {
+    /**
+     * Launches the emulator via {@link GCloudEmulatorManager#launchDocker()}.
+     *
+     * @throws Exception if launching the emulator fails
+     */
+    @BeforeClass
+    public static void launchGCloudEmulator() throws Exception {
+        // Kept in a separate class so that the entire test class can be serializable
+        GCloudEmulatorManager.launchDocker();
+    }
+
+    /**
+     * Shuts down the Pub/Sub channel and then terminates the emulator.
+     *
+     * <p>JUnit does not define the order in which several {@code @AfterClass} methods of the
+     * same class run, so the channel is closed here explicitly before the emulator is stopped.
+     * {@link #cleanupPubsubChannel()} does nothing when the channel is already closed, so it is
+     * harmless that JUnit invokes it as well.
+     *
+     * @throws DockerException if terminating the emulator fails
+     * @throws InterruptedException if the shutdown is interrupted
+     */
+    @AfterClass
+    public static void terminateGCloudEmulator() throws DockerException, InterruptedException {
+        try {
+            cleanupPubsubChannel();
+        } finally {
+            // Always stop the emulator, even when closing the channel failed.
+            GCloudEmulatorManager.terminateDocker();
+        }
+    }
+
+    // ====================================================================================
+    // Pubsub helpers
+
+    private static ManagedChannel channel = null;
+    private static TransportChannelProvider channelProvider = null;
+    private static CredentialsProvider credentialsProvider = null;
+
+    /**
+     * Creates a new {@link PubsubHelper} that talks to the emulator.
+     *
+     * <p>The underlying channel and providers are created on the first call
+     * (or the first call after {@link #cleanupPubsubChannel()}) and reused afterwards.
+     *
+     * @return a new helper using the shared channel and no credentials
+     */
+    public static PubsubHelper getPubsubHelper() {
+        if (channel == null) {
+            //noinspection deprecation
+            channel = ManagedChannelBuilder
+                .forTarget(getPubSubHostPort())
+                .usePlaintext(true)
+                .build();
+            channelProvider = FixedTransportChannelProvider
+                .create(GrpcTransportChannel.create(channel));
+            credentialsProvider = NoCredentialsProvider.create();
+        }
+        return new PubsubHelper(channelProvider, credentialsProvider);
+    }
+
+    /**
+     * @return the emulator endpoint in the form {@code <docker ip address>:<pubsub port>}
+     */
+    public static String getPubSubHostPort() {
+        return getDockerIpAddress() + ":" + getDockerPubSubPort();
+    }
+
+    /**
+     * Shuts down the shared channel, waiting up to one second for its termination,
+     * and clears the cached channel and providers. Does nothing when no channel is open.
+     *
+     * @throws InterruptedException if interrupted while waiting for the termination
+     */
+    @AfterClass
+    public static void cleanupPubsubChannel() throws InterruptedException {
+        if (channel != null) {
+            channel.shutdownNow().awaitTermination(1, SECONDS);
+            channel = null;
+            // The providers wrap the channel that was just shut down; getPubsubHelper() recreates them.
+            channelProvider = null;
+            credentialsProvider = null;
+        }
+    }
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/PubsubHelper.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/PubsubHelper.java
new file mode 100644
index 0000000..f08576b
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/PubsubHelper.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub.emulator;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.rpc.NotFoundException;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.PushConfig;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A helper class to make managing the testing topics a bit easier.
+ *
+ * <p>The topic and subscription admin clients are created on first use and
+ * then reused for the lifetime of this helper.
+ */
+public class PubsubHelper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PubsubHelper.class);
+
+    private final TransportChannelProvider channelProvider;
+    private final CredentialsProvider credentialsProvider;
+
+    // Created lazily; always access these through their getters.
+    private TopicAdminClient topicClient;
+    private SubscriptionAdminClient subscriptionAdminClient;
+
+    /**
+     * Creates a helper that uses the default transport channel provider and
+     * the default credentials provider of {@link TopicAdminSettings}.
+     */
+    public PubsubHelper() {
+        this(TopicAdminSettings.defaultTransportChannelProvider(),
+            TopicAdminSettings.defaultCredentialsProviderBuilder().build());
+    }
+
+    /**
+     * Creates a helper that uses the given providers for every client it creates.
+     *
+     * @param channelProvider the transport channel provider to use
+     * @param credentialsProvider the credentials provider to use
+     */
+    public PubsubHelper(TransportChannelProvider channelProvider, CredentialsProvider credentialsProvider) {
+        this.channelProvider = channelProvider;
+        this.credentialsProvider = credentialsProvider;
+    }
+
+    public TransportChannelProvider getChannelProvider() {
+        return channelProvider;
+    }
+
+    public CredentialsProvider getCredentialsProvider() {
+        return credentialsProvider;
+    }
+
+    /**
+     * Returns the topic admin client, creating it on the first call.
+     *
+     * @return the (cached) topic admin client
+     * @throws IOException if the client cannot be created
+     */
+    public TopicAdminClient getTopicAdminClient() throws IOException {
+        if (topicClient == null) {
+            TopicAdminSettings topicAdminSettings = TopicAdminSettings.newBuilder()
+                .setTransportChannelProvider(channelProvider)
+                .setCredentialsProvider(credentialsProvider)
+                .build();
+            topicClient = TopicAdminClient.create(topicAdminSettings);
+        }
+        return topicClient;
+    }
+
+    /**
+     * Creates the topic. An already existing topic with the same name is deleted
+     * first, together with its subscriptions.
+     *
+     * @param project the project of the topic
+     * @param topic the name of the topic
+     * @return the newly created topic
+     * @throws IOException if an admin client cannot be created
+     */
+    public Topic createTopic(String project, String topic) throws IOException {
+        deleteTopic(project, topic);
+        ProjectTopicName topicName = ProjectTopicName.of(project, topic);
+        TopicAdminClient adminClient = getTopicAdminClient();
+        LOG.info("CreateTopic {}", topicName);
+        return adminClient.createTopic(topicName);
+    }
+
+    /**
+     * Deletes the topic and its subscriptions. Does nothing if the topic does not exist.
+     *
+     * @param project the project of the topic
+     * @param topic the name of the topic
+     * @throws IOException if an admin client cannot be created
+     */
+    public void deleteTopic(String project, String topic) throws IOException {
+        deleteTopic(ProjectTopicName.of(project, topic));
+    }
+
+    /**
+     * Deletes the topic and its subscriptions. Does nothing if the topic does not exist.
+     *
+     * @param topicName the topic to delete
+     * @throws IOException if an admin client cannot be created
+     */
+    public void deleteTopic(ProjectTopicName topicName) throws IOException {
+        TopicAdminClient adminClient = getTopicAdminClient();
+        try {
+            // Existence check: the NotFoundException ends up in the catch below.
+            adminClient.getTopic(topicName);
+
+            // If it exists we delete all subscriptions and the topic itself.
+            LOG.info("DeleteTopic {} first delete old subscriptions.", topicName);
+            // Go through the getter: the field is still null when no subscription
+            // method has been called on this helper before.
+            SubscriptionAdminClient subscriptionClient = getSubscriptionAdminClient();
+            adminClient
+                .listTopicSubscriptions(topicName)
+                .iterateAllAsProjectSubscriptionName()
+                .forEach(subscriptionClient::deleteSubscription);
+            LOG.info("DeleteTopic {}", topicName);
+            adminClient
+                .deleteTopic(topicName);
+        } catch (NotFoundException e) {
+            // Doesn't exist. Good.
+        }
+    }
+
+    /**
+     * Returns the subscription admin client, creating it on the first call.
+     *
+     * @return the (cached) subscription admin client
+     * @throws IOException if the client cannot be created
+     */
+    public SubscriptionAdminClient getSubscriptionAdminClient() throws IOException {
+        if (subscriptionAdminClient == null) {
+            SubscriptionAdminSettings subscriptionAdminSettings =
+                SubscriptionAdminSettings
+                    .newBuilder()
+                    .setTransportChannelProvider(channelProvider)
+                    .setCredentialsProvider(credentialsProvider)
+                    .build();
+            subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings);
+        }
+        return subscriptionAdminClient;
+    }
+
+    /**
+     * Creates a subscription on the given topic. An already existing subscription
+     * with the same name is deleted first.
+     *
+     * @param subscriptionProject the project of the subscription
+     * @param subscription the name of the subscription
+     * @param topicProject the project of the topic
+     * @param topic the name of the topic
+     * @throws IOException if the admin client cannot be created
+     */
+    public void createSubscription(String subscriptionProject, String subscription, String topicProject, String topic) throws IOException {
+        ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.newBuilder()
+            .setProject(subscriptionProject)
+            .setSubscription(subscription)
+            .build();
+
+        deleteSubscription(subscriptionName);
+
+        SubscriptionAdminClient adminClient = getSubscriptionAdminClient();
+
+        ProjectTopicName topicName = ProjectTopicName.of(topicProject, topic);
+
+        PushConfig pushConfig = PushConfig.getDefaultInstance();
+
+        LOG.info("CreateSubscription {}", subscriptionName);
+        // The last argument is the ack deadline in seconds.
+        adminClient.createSubscription(subscriptionName, topicName, pushConfig, 1);
+    }
+
+    /**
+     * Deletes the subscription. Does nothing if it does not exist.
+     *
+     * @param subscriptionProject the project of the subscription
+     * @param subscription the name of the subscription
+     * @throws IOException if the admin client cannot be created
+     */
+    public void deleteSubscription(String subscriptionProject, String subscription) throws IOException {
+        deleteSubscription(ProjectSubscriptionName
+            .newBuilder()
+            .setProject(subscriptionProject)
+            .setSubscription(subscription)
+            .build());
+    }
+
+    /**
+     * Deletes the subscription. Does nothing if it does not exist.
+     *
+     * @param subscriptionName the subscription to delete
+     * @throws IOException if the admin client cannot be created
+     */
+    public void deleteSubscription(ProjectSubscriptionName subscriptionName) throws IOException {
+        SubscriptionAdminClient adminClient = getSubscriptionAdminClient();
+        try {
+            // Existence check: the NotFoundException ends up in the catch below.
+            adminClient.getSubscription(subscriptionName);
+            // If it already exists we must first delete it.
+            LOG.info("DeleteSubscription {}", subscriptionName);
+            adminClient.deleteSubscription(subscriptionName);
+        } catch (NotFoundException e) {
+            // Doesn't exist. Good.
+        }
+    }
+
+    /**
+     * Pulls messages from the subscription with one synchronous pull request and
+     * acknowledges every message that was received.
+     *
+     * <p>Mostly copied from the example on https://cloud.google.com/pubsub/docs/pull
+     *
+     * @param projectId the project of the subscription
+     * @param subscriptionId the name of the subscription
+     * @param maxNumberOfMessages the maximum number of messages to pull
+     * @return the received (and already acknowledged) messages
+     * @throws Exception if creating the subscriber stub or one of the calls fails
+     */
+    public List<ReceivedMessage> pullMessages(String projectId, String subscriptionId, int maxNumberOfMessages) throws Exception {
+        SubscriberStubSettings subscriberStubSettings =
+            SubscriberStubSettings.newBuilder()
+                .setTransportChannelProvider(channelProvider)
+                .setCredentialsProvider(credentialsProvider)
+                .build();
+        try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
+            String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
+            PullRequest pullRequest =
+                PullRequest.newBuilder()
+                    .setMaxMessages(maxNumberOfMessages)
+                    .setReturnImmediately(false) // false: the service may wait for messages instead of answering right away
+                    .setSubscription(subscriptionName)
+                    .build();
+
+            // use pullCallable().futureCall to asynchronously perform this operation
+            PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
+            List<ReceivedMessage> receivedMessages = pullResponse.getReceivedMessagesList();
+
+            List<String> ackIds = new ArrayList<>(receivedMessages.size());
+            for (ReceivedMessage message : receivedMessages) {
+                ackIds.add(message.getAckId());
+            }
+
+            // acknowledge received messages; an acknowledge request needs at
+            // least one ack id, so skip the call when nothing was received
+            if (!ackIds.isEmpty()) {
+                AcknowledgeRequest acknowledgeRequest =
+                    AcknowledgeRequest.newBuilder()
+                        .setSubscription(subscriptionName)
+                        .addAllAckIds(ackIds)
+                        .build();
+                // use acknowledgeCallable().futureCall to asynchronously perform this operation
+                subscriber.acknowledgeCallable().call(acknowledgeRequest);
+            }
+            return receivedMessages;
+        }
+    }
+
+    /**
+     * Builds a {@link Subscriber} for the subscription and starts it asynchronously.
+     *
+     * @param project the project of the subscription
+     * @param subscription the name of the subscription
+     * @param messageReceiver the receiver handed to the subscriber
+     * @return the subscriber, on which {@code startAsync()} has already been called
+     */
+    public Subscriber subscribeToSubscription(String project, String subscription, MessageReceiver messageReceiver) {
+        ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(project, subscription);
+        Subscriber subscriber =
+            Subscriber
+                .newBuilder(subscriptionName, messageReceiver)
+                .setChannelProvider(channelProvider)
+                .setCredentialsProvider(credentialsProvider)
+                .build();
+        subscriber.startAsync();
+        return subscriber;
+    }
+
+    /**
+     * Builds a {@link Publisher} for the topic using the providers of this helper.
+     *
+     * @param project the project of the topic
+     * @param topic the name of the topic
+     * @return a new publisher
+     * @throws IOException if the publisher cannot be created
+     */
+    public Publisher createPublisher(String project, String topic) throws IOException {
+        return Publisher
+            .newBuilder(ProjectTopicName.of(project, topic))
+            .setChannelProvider(channelProvider)
+            .setCredentialsProvider(credentialsProvider)
+            .build();
+    }
+
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..b316a9a
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties
@@ -0,0 +1,24 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+log4j.rootLogger=INFO, testlogger
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.out
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 0950e2f..b17dc70 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -61,6 +61,7 @@ under the License.
<module>flink-metrics-availability-test</module>
<module>flink-metrics-reporter-prometheus-test</module>
<module>flink-heavy-deployment-stress-test</module>
+ <module>flink-connector-pubsub-emulator-tests</module>
<module>flink-streaming-kafka-test-base</module>
<module>flink-streaming-kafka-test</module>
<module>flink-streaming-kafka011-test</module>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index af21512..957b3c6 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -136,6 +136,8 @@ run_test "Elasticsearch (v6.3.1) sink end-to-end test" "$END_TO_END_DIR/test-scr
run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
+run_test "Test PubSub connector with Docker based Google PubSub Emulator" "$END_TO_END_DIR/test-scripts/test_streaming_pubsub.sh"
+
run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh"
run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file"
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_pubsub.sh b/flink-end-to-end-tests/test-scripts/test_streaming_pubsub.sh
new file mode 100755
index 0000000..8e08385
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_pubsub.sh
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+cd "${END_TO_END_DIR}/flink-connector-pubsub-emulator-tests"
+
+mvn test -DskipTests=false
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java
index 612da41..c5791bd 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.examples.pubsub;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pubsub.PubSubSink;
-import org.apache.flink.streaming.connectors.pubsub.PubSubSourceBuilder;
+import org.apache.flink.streaming.connectors.pubsub.PubSubSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +45,7 @@ public class PubSubExample {
System.out.println("Missing parameters!\n" +
"Usage: flink run PubSub.jar --input-subscription <subscription> --input-topicName <topic> --output-topicName " +
"--google-project <google project name> ");
- //return;
+ return;
}
String projectName = parameterTool.getRequired("google-project");
@@ -62,10 +62,9 @@ public class PubSubExample {
private static void runFlinkJob(String projectName, String subscriptionName, String outputTopicName) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.addSource(PubSubSourceBuilder.<Integer>builder()
+ env.addSource(PubSubSource.<Integer>newBuilder()
.withProjectSubscriptionName(projectName, subscriptionName)
.withDeserializationSchema(new IntegerSerializer())
- .withMode(PubSubSourceBuilder.Mode.NONE)
.build())
.map(PubSubExample::printAndReturn).disableChaining()
.addSink(PubSubSink.<Integer>newBuilder()