You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2019/07/05 12:46:17 UTC

[flink] 02/05: [FLINK-9311] [pubsub] Add unit and integration tests for PubSub connectors

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a96277090358d2237aab7426ebde7fdf1eaccbe4
Author: Niels Basjes <nb...@bol.com>
AuthorDate: Wed Aug 15 14:05:39 2018 +0200

    [FLINK-9311] [pubsub] Add unit and integration tests for PubSub connectors
---
 flink-connectors/flink-connector-pubsub/pom.xml    |  86 +++----
 .../flink/streaming/connectors/pubsub/Bound.java   |  17 ++
 .../connectors/pubsub/BoundedPubSubSource.java     |  33 ++-
 .../streaming/connectors/pubsub/PubSubSink.java    | 141 +++++------
 .../streaming/connectors/pubsub/PubSubSource.java  |  40 ++--
 .../connectors/pubsub/SubscriberWrapper.java       |  22 +-
 .../common/SerializableCredentialsProvider.java    |  11 +-
 .../streaming/connectors/pubsub/BoundTest.java     |  20 +-
 .../connectors/pubsub/BoundedPubSubSourceTest.java |  25 +-
 .../connectors/pubsub/PubSubSourceTest.java        |   5 +-
 .../src/test/resources/log4j-test.properties       |  24 ++
 .../flink-connector-pubsub-emulator-tests}/pom.xml | 112 +++++----
 .../connectors/pubsub/CheckPubSubEmulatorTest.java | 115 +++++++++
 .../connectors/pubsub/EmulatedPubSubSinkTest.java  | 109 +++++++++
 .../pubsub/EmulatedPubSubSourceTest.java           | 116 +++++++++
 .../pubsub/emulator/GCloudEmulatorManager.java     | 264 +++++++++++++++++++++
 .../pubsub/emulator/GCloudUnitTestBase.java        |  84 +++++++
 .../connectors/pubsub/emulator/PubsubHelper.java   | 232 ++++++++++++++++++
 .../src/test/resources/log4j-test.properties       |  24 ++
 flink-end-to-end-tests/pom.xml                     |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh        |   2 +
 .../test-scripts/test_streaming_pubsub.sh          |  22 ++
 .../streaming/examples/pubsub/PubSubExample.java   |   7 +-
 23 files changed, 1309 insertions(+), 203 deletions(-)

diff --git a/flink-connectors/flink-connector-pubsub/pom.xml b/flink-connectors/flink-connector-pubsub/pom.xml
index a6e8d72..f50cbdd 100644
--- a/flink-connectors/flink-connector-pubsub/pom.xml
+++ b/flink-connectors/flink-connector-pubsub/pom.xml
@@ -35,12 +35,20 @@ under the License.
 
 	<packaging>jar</packaging>
 
-	<properties>
-		<pubsub.version>1.37.1</pubsub.version>
-	</properties>
+	<!-- This is the way we get a consistent set of versions of the Google tools -->
+	<dependencyManagement>
+		<dependencies>
+			<dependency>
+				<groupId>com.google.cloud</groupId>
+				<artifactId>google-cloud-bom</artifactId>
+				<version>0.53.0-alpha</version>
+				<type>pom</type>
+				<scope>import</scope>
+			</dependency>
+		</dependencies>
+	</dependencyManagement>
 
 	<dependencies>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
@@ -51,7 +59,29 @@ under the License.
 		<dependency>
 			<groupId>com.google.cloud</groupId>
 			<artifactId>google-cloud-pubsub</artifactId>
-			<version>${pubsub.version}</version>
+			<!-- Version is pulled from google-cloud-bom -->
+			<exclusions>
+				<!-- Exclude an old version of guava that is being pulled
+                in by a transitive dependency of google-api-client -->
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava-jdk5</artifactId>
+				</exclusion>
+			</exclusions>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>${slf4j.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>${slf4j.version}</version>
+			<scope>test</scope>
 		</dependency>
 
 		<dependency>
@@ -69,51 +99,5 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
-
 	</dependencies>
-
-    <build>
-        <plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>shade-flink</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<shadeTestJar>false</shadeTestJar>
-							<artifactSet>
-								<includes>
-									<include>*:*</include>
-								</includes>
-							</artifactSet>
-							<relocations>
-								<relocation>
-									<pattern>com.google.guava</pattern>
-									<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.guava</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>com.google.common.base</pattern>
-									<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.common.base</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>io.grpc.auth</pattern>
-									<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.auth</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>io.grpc.protobuf</pattern>
-									<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.protobuf</shadedPattern>
-								</relocation>
-							</relocations>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-        </plugins>
-    </build>
-
 </project>
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
index b37cc45..727f32e 100644
--- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.streaming.connectors.pubsub;
 
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
index 5f829ae..83fc15e 100644
--- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.streaming.connectors.pubsub;
 
 import com.google.cloud.pubsub.v1.AckReplyConsumer;
@@ -5,7 +22,11 @@ import com.google.pubsub.v1.PubsubMessage;
 
 import java.io.IOException;
 
-class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
+/**
+ * A bounded PubSub Source, similar to {@link PubSubSource} but this will stop at some point. For example after an idle period and/or after n messages have been received.
+ *
+ */
+public class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
 	private Bound<OUT> bound;
 
 	private BoundedPubSubSource() {
@@ -28,11 +49,19 @@ class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
 		bound.receivedMessage();
 	}
 
+	/**
+	 * Creates a {@link BoundedPubSubSourceBuilder}.
+	 * @param <OUT> Type of Object which will be read by the produced {@link BoundedPubSubSource}
+	 */
 	@SuppressWarnings("unchecked")
 	public static <OUT> BoundedPubSubSourceBuilder<OUT, ? extends PubSubSource, ? extends BoundedPubSubSourceBuilder> newBuilder() {
 		return new BoundedPubSubSourceBuilder<>(new BoundedPubSubSource<OUT>());
 	}
 
+	/**
+	 * Builder to create BoundedPubSubSource.
+	 * @param <OUT> Type of Object which will be read by the BoundedPubSubSource
+	 */
 	@SuppressWarnings("unchecked")
 	public static class BoundedPubSubSourceBuilder<OUT, PSS extends BoundedPubSubSource<OUT>, BUILDER extends BoundedPubSubSourceBuilder<OUT, PSS, BUILDER>> extends PubSubSourceBuilder<OUT, PSS, BUILDER> {
 		private Long boundedByAmountOfMessages;
@@ -52,7 +81,7 @@ class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
 			return (BUILDER) this;
 		}
 
-		private Bound <OUT> createBound() {
+		private Bound<OUT> createBound() {
 			if (boundedByAmountOfMessages != null && boundedByTimeSinceLastMessage != null) {
 				return Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(boundedByAmountOfMessages, boundedByTimeSinceLastMessage);
 			}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
index 92c9c6c..e6ac53e 100644
--- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.connectors.pubsub;
 
-import com.google.api.gax.core.NoCredentialsProvider;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -40,22 +39,35 @@ import java.io.IOException;
 
 /**
  * A sink function that outputs to PubSub.
+ *
  * @param <IN> type of PubSubSink messages to write
  */
 public class PubSubSink<IN> extends RichSinkFunction<IN> {
 
-	private final SerializableCredentialsProvider serializableCredentialsProvider;
-	private final SerializationSchema<IN>         serializationSchema;
-	private final String                          projectName;
-	private final String                          topicName;
-	private       String                          hostAndPort = null;
+	private SerializableCredentialsProvider serializableCredentialsProvider;
+	private SerializationSchema<IN> serializationSchema;
+	private String projectName;
+	private String topicName;
+	private String hostAndPort = null;
 
 	private transient Publisher publisher;
 
-	public PubSubSink(SerializableCredentialsProvider serializableCredentialsProvider, SerializationSchema<IN> serializationSchema, String projectName, String topicName) {
+	private PubSubSink() {
+	}
+
+	void setSerializableCredentialsProvider(SerializableCredentialsProvider serializableCredentialsProvider) {
 		this.serializableCredentialsProvider = serializableCredentialsProvider;
+	}
+
+	void setSerializationSchema(SerializationSchema<IN> serializationSchema) {
 		this.serializationSchema = serializationSchema;
+	}
+
+	void setProjectName(String projectName) {
 		this.projectName = projectName;
+	}
+
+	void setTopicName(String topicName) {
 		this.topicName = topicName;
 	}
 
@@ -64,15 +76,29 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> {
 	 * The ONLY reason to use this is during tests with the emulator provided by Google.
 	 *
 	 * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
-	 * @return The current instance
 	 */
-	public PubSubSink<IN> withHostAndPort(String hostAndPort) {
+	void withHostAndPort(String hostAndPort) {
 		this.hostAndPort = hostAndPort;
-		return this;
 	}
 
-	private transient ManagedChannel   managedChannel = null;
-	private transient TransportChannel channel        = null;
+	void initialize() throws IOException {
+		if (serializableCredentialsProvider == null) {
+			serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables();
+		}
+		if (serializationSchema == null) {
+			throw new IllegalArgumentException("The serializationSchema has not been specified.");
+		}
+		if (projectName == null) {
+			throw new IllegalArgumentException("The projectName has not been specified.");
+		}
+		if (topicName == null) {
+			throw new IllegalArgumentException("The topicName has not been specified.");
+		}
+	}
+
+
+	private transient ManagedChannel managedChannel = null;
+	private transient TransportChannel channel = null;
 
 	@Override
 	public void open(Configuration configuration) throws Exception {
@@ -114,127 +140,110 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> {
 
 	/**
 	 * Create a builder for a new PubSubSink.
+	 *
 	 * @param <IN> The generic of the type that is to be written into the sink.
 	 * @return a new PubSubSinkBuilder instance
 	 */
-	public static <IN> PubSubSinkBuilder<IN> newBuilder() {
-		return new PubSubSinkBuilder<>();
+	public static <IN> PubSubSinkBuilder<IN, ? extends PubSubSink<IN>, ? extends PubSubSinkBuilder<IN, ?, ?>> newBuilder() {
+		return new PubSubSinkBuilder<>(new PubSubSink<>());
 	}
 
 	/**
 	 * PubSubSinkBuilder to create a PubSubSink.
+	 *
 	 * @param <IN> Type of PubSubSink to create.
 	 */
-	public static class PubSubSinkBuilder<IN> {
-		private SerializableCredentialsProvider serializableCredentialsProvider = null;
-		private SerializationSchema<IN>         serializationSchema             = null;
-		private String                          projectName                     = null;
-		private String                          topicName                       = null;
-		private String                          hostAndPort                     = null;
-
-		private PubSubSinkBuilder() {
+	@SuppressWarnings("unchecked")
+	public static class PubSubSinkBuilder<IN, PSS extends PubSubSink<IN>, BUILDER extends PubSubSinkBuilder<IN, PSS, BUILDER>> {
+		protected PSS sinkUnderConstruction;
+
+		private PubSubSinkBuilder(PSS sinkUnderConstruction) {
+			this.sinkUnderConstruction = sinkUnderConstruction;
 		}
 
 		/**
 		 * Set the credentials.
 		 * If this is not used then the credentials are picked up from the environment variables.
+		 *
 		 * @param credentials the Credentials needed to connect.
 		 * @return The current PubSubSinkBuilder instance
 		 */
-		public PubSubSinkBuilder<IN> withCredentials(Credentials credentials) {
-			this.serializableCredentialsProvider = new SerializableCredentialsProvider(credentials);
-			return this;
+		public BUILDER withCredentials(Credentials credentials) {
+			sinkUnderConstruction.setSerializableCredentialsProvider(new SerializableCredentialsProvider(credentials));
+			return (BUILDER) this;
 		}
 
 		/**
 		 * Set the CredentialsProvider.
 		 * If this is not used then the credentials are picked up from the environment variables.
+		 *
 		 * @param credentialsProvider the custom SerializableCredentialsProvider instance.
 		 * @return The current PubSubSinkBuilder instance
 		 */
-		public PubSubSinkBuilder<IN> withCredentialsProvider(CredentialsProvider credentialsProvider) throws IOException {
+		public BUILDER withCredentialsProvider(CredentialsProvider credentialsProvider) throws IOException {
 			return withCredentials(credentialsProvider.getCredentials());
 		}
 
 		/**
 		 * Set the credentials to be absent.
 		 * This means that no credentials are to be used at all.
+		 *
 		 * @return The current PubSubSinkBuilder instance
 		 */
-		public PubSubSinkBuilder<IN> withoutCredentials() {
-			this.serializableCredentialsProvider = SerializableCredentialsProvider.withoutCredentials();
-			return this;
+		public BUILDER withoutCredentials() {
+			sinkUnderConstruction.setSerializableCredentialsProvider(SerializableCredentialsProvider.withoutCredentials());
+			return (BUILDER) this;
 		}
 
 		/**
 		 * @param serializationSchema Instance of a SerializationSchema that converts the IN into a byte[]
 		 * @return The current PubSubSinkBuilder instance
 		 */
-		public PubSubSinkBuilder<IN> withSerializationSchema(SerializationSchema<IN> serializationSchema) {
-			this.serializationSchema = serializationSchema;
-			return this;
+		public BUILDER withSerializationSchema(SerializationSchema<IN> serializationSchema) {
+			sinkUnderConstruction.setSerializationSchema(serializationSchema);
+			return (BUILDER) this;
 		}
 
 		/**
 		 * @param projectName The name of the project in PubSub
 		 * @return The current PubSubSinkBuilder instance
 		 */
-		public PubSubSinkBuilder<IN> withProjectName (String projectName) {
-			this.projectName = projectName;
-			return this;
+		public BUILDER withProjectName(String projectName) {
+			sinkUnderConstruction.setProjectName(projectName);
+			return (BUILDER) this;
 		}
 
 		/**
 		 * @param topicName The name of the topic in PubSub
 		 * @return The current PubSubSinkBuilder instance
 		 */
-		public PubSubSinkBuilder<IN> withTopicName (String topicName) {
-			this.topicName = topicName;
-			return this;
+		public BUILDER withTopicName(String topicName) {
+			sinkUnderConstruction.setTopicName(topicName);
+			return (BUILDER) this;
 		}
 
-
 		/**
 		 * Set the custom hostname/port combination of PubSub.
 		 * The ONLY reason to use this is during tests with the emulator provided by Google.
+		 *
 		 * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
 		 * @return The current PubSubSinkBuilder instance
 		 */
-		public PubSubSinkBuilder<IN> withHostAndPort(String hostAndPort) {
-			this.hostAndPort = hostAndPort;
-			return this;
+		public BUILDER withHostAndPort(String hostAndPort) {
+			sinkUnderConstruction.withHostAndPort(hostAndPort);
+			return (BUILDER) this;
 		}
 
 		/**
 		 * Actually builder the desired instance of the PubSubSink.
+		 *
 		 * @return a brand new PubSubSink
-		 * @throws IOException incase of a problem getting the credentials
+		 * @throws IOException              in case of a problem getting the credentials
 		 * @throws IllegalArgumentException incase required fields were not specified.
 		 */
 		public PubSubSink<IN> build() throws IOException {
-			if (serializableCredentialsProvider == null) {
-				serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables();
-			}
-			if (serializationSchema == null) {
-				throw new IllegalArgumentException("The serializationSchema has not been specified.");
-			}
-			if (projectName == null) {
-				throw new IllegalArgumentException("The projectName has not been specified.");
-			}
-			if (topicName == null) {
-				throw new IllegalArgumentException("The topicName has not been specified.");
-			}
-
-			PubSubSink<IN> pubSubSink = new PubSubSink<>(
-				serializableCredentialsProvider,
-				serializationSchema,
-				projectName, topicName);
-
-			if (hostAndPort != null) {
-				pubSubSink.withHostAndPort(hostAndPort);
-			}
-
-			return pubSubSink;
+			sinkUnderConstruction.initialize();
+			return sinkUnderConstruction;
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
index 8f8c689..2d93998 100644
--- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
@@ -43,7 +43,7 @@ import java.util.List;
  */
 public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, AckReplyConsumer> implements MessageReceiver, ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT> {
 	private DeserializationSchema<OUT> deserializationSchema;
-	private SubscriberWrapper          subscriberWrapper;
+	private SubscriberWrapper subscriberWrapper;
 
 	protected transient SourceContext<OUT> sourceContext = null;
 
@@ -64,7 +64,8 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
 		super.open(configuration);
 		subscriberWrapper.initialize(this);
 		if (hasNoCheckpointingEnabled(getRuntimeContext())) {
-			throw new IllegalArgumentException("Checkpointing needs to be enabled to support: PubSub ATLEAST_ONCE");
+			throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and " +
+				"the checkpointing frequency must be MUCH lower than the PubSub timeout for it to retry a message.");
 		}
 	}
 
@@ -123,26 +124,27 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
 		return deserializationSchema.getProducedType();
 	}
 
-	@SuppressWarnings("unchecked")
-	public static <OUT> PubSubSourceBuilder<OUT, ? extends PubSubSource, ? extends PubSubSourceBuilder> newBuilder() {
-		return new PubSubSourceBuilder<>(new PubSubSource<OUT>());
+	public static <OUT> PubSubSourceBuilder<OUT, ? extends PubSubSource<OUT>, ? extends PubSubSourceBuilder<OUT, ?, ?>> newBuilder() {
+		return new PubSubSourceBuilder<>(new PubSubSource<>());
 	}
 
 	/**
 	 * Builder to create PubSubSource.
-	 * @param <OUT> The type of objects which will be read
-	 * @param <PSS> The type of PubSubSource
+	 *
+	 * @param <OUT>     The type of objects which will be read
+	 * @param <PSS>     The type of PubSubSource
 	 * @param <BUILDER> The type of Builder to create the PubSubSource
 	 */
+	@SuppressWarnings("unchecked")
 	public static class PubSubSourceBuilder<OUT, PSS extends PubSubSource<OUT>, BUILDER extends PubSubSourceBuilder<OUT, PSS, BUILDER>> {
-		protected PSS 							sourceUnderConstruction;
+		protected PSS sourceUnderConstruction;
 
-		private SubscriberWrapper               subscriberWrapper = null;
+		private SubscriberWrapper subscriberWrapper = null;
 		private SerializableCredentialsProvider serializableCredentialsProvider;
-		private DeserializationSchema<OUT>      deserializationSchema;
-		private String                          projectName;
-		private String                          subscriptionName;
-		private String                          hostAndPort;
+		private DeserializationSchema<OUT> deserializationSchema;
+		private String projectName;
+		private String subscriptionName;
+		private String hostAndPort;
 
 		protected PubSubSourceBuilder(PSS sourceUnderConstruction) {
 			this.sourceUnderConstruction = sourceUnderConstruction;
@@ -151,6 +153,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
 		/**
 		 * Set the credentials.
 		 * If this is not used then the credentials are picked up from the environment variables.
+		 *
 		 * @param credentials the Credentials needed to connect.
 		 * @return The current PubSubSourceBuilder instance
 		 */
@@ -162,6 +165,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
 		/**
 		 * Set the CredentialsProvider.
 		 * If this is not used then the credentials are picked up from the environment variables.
+		 *
 		 * @param credentialsProvider the custom SerializableCredentialsProvider instance.
 		 * @return The current PubSubSourceBuilder instance
 		 */
@@ -172,6 +176,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
 		/**
 		 * Set the credentials to be absent.
 		 * This means that no credentials are to be used at all.
+		 *
 		 * @return The current PubSubSourceBuilder instance
 		 */
 		public BUILDER withoutCredentials() {
@@ -183,13 +188,13 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
 		 * @param deserializationSchema Instance of a DeserializationSchema that converts the OUT into a byte[]
 		 * @return The current PubSubSourceBuilder instance
 		 */
-		public BUILDER withDeserializationSchema(DeserializationSchema <OUT> deserializationSchema) {
+		public BUILDER withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
 			this.deserializationSchema = deserializationSchema;
 			return (BUILDER) this;
 		}
 
 		/**
-		 * @param projectName The name of the project in GoogleCloudPlatform
+		 * @param projectName      The name of the project in GoogleCloudPlatform
 		 * @param subscriptionName The name of the subscription in PubSub
 		 * @return The current PubSubSourceBuilder instance
 		 */
@@ -202,6 +207,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
 		/**
 		 * Set the custom hostname/port combination of PubSub.
 		 * The ONLY reason to use this is during tests with the emulator provided by Google.
+		 *
 		 * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
 		 * @return The current PubSubSourceBuilder instance
 		 */
@@ -213,6 +219,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
 		/**
 		 * Set a complete SubscriberWrapper.
 		 * The ONLY reason to use this is during tests.
+		 *
 		 * @param subscriberWrapper The fully instantiated SubscriberWrapper
 		 * @return The current PubSubSourceBuilder instance
 		 */
@@ -223,8 +230,9 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase
 
 		/**
 		 * Actually build the desired instance of the PubSubSourceBuilder.
+		 *
 		 * @return a brand new SourceFunction
-		 * @throws IOException incase of a problem getting the credentials
+		 * @throws IOException              in case of a problem getting the credentials
 		 * @throws IllegalArgumentException incase required fields were not specified.
 		 */
 		public PSS build() throws IOException {
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
index fb75f43..0595877 100644
--- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
@@ -33,13 +33,13 @@ import java.io.Serializable;
 
 class SubscriberWrapper implements Serializable {
 	private final SerializableCredentialsProvider serializableCredentialsProvider;
-	private final String                          projectId;
-	private final String                          subscriptionId;
-	private       String                          hostAndPort = null;
+	private final String projectId;
+	private final String subscriptionId;
+	private String hostAndPort = null;
 
-	private transient Subscriber       subscriber;
-	private transient ManagedChannel   managedChannel = null;
-	private transient TransportChannel channel        = null;
+	private transient Subscriber subscriber;
+	private transient ManagedChannel managedChannel = null;
+	private transient TransportChannel channel = null;
 
 	SubscriberWrapper(SerializableCredentialsProvider serializableCredentialsProvider, ProjectSubscriptionName projectSubscriptionName) {
 		this.serializableCredentialsProvider = serializableCredentialsProvider;
@@ -49,14 +49,14 @@ class SubscriberWrapper implements Serializable {
 
 	void initialize(MessageReceiver messageReceiver) {
 		Subscriber.Builder builder = Subscriber
-				.newBuilder(ProjectSubscriptionName.of(projectId, subscriptionId), messageReceiver)
-				.setCredentialsProvider(serializableCredentialsProvider);
+			.newBuilder(ProjectSubscriptionName.of(projectId, subscriptionId), messageReceiver)
+			.setCredentialsProvider(serializableCredentialsProvider);
 
 		if (hostAndPort != null) {
 			managedChannel = ManagedChannelBuilder
-					.forTarget(hostAndPort)
-					.usePlaintext(true) // This is 'Ok' because this is ONLY used for testing.
-					.build();
+				.forTarget(hostAndPort)
+				.usePlaintext(true) // This is 'Ok' because this is ONLY used for testing.
+				.build();
 			channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
 			builder.setChannelProvider(FixedTransportChannelProvider.create(channel));
 		}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
index bd04058..44b1fa0 100644
--- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
@@ -40,7 +40,9 @@ public class SerializableCredentialsProvider implements CredentialsProvider, Ser
 	}
 
 	/**
-	 * Creates a SerializableCredentialsProvider for a PubSubSubscription based on environment variables. {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings}
+	 * Creates a SerializableCredentialsProvider for a PubSubSubscription based on environment variables.
+	 * {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings}
+	 *
 	 * @return serializableCredentialsProvider
 	 * @throws IOException thrown by {@link Credentials}
 	 */
@@ -50,11 +52,12 @@ public class SerializableCredentialsProvider implements CredentialsProvider, Ser
 	}
 
 	/**
-	 * Creates a SerializableCredentialsProvider for a PubSubSubscription without any credentials. {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings}
+	 * Creates a SerializableCredentialsProvider for a PubSubSubscription without any credentials.
+	 * {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings}
 	 * This is ONLY useful when running tests locally against Mockito or the Google PubSub emulator
-	 * @see <a href="https://cloud.google.com/pubsub/docs/emulator" target="_top">https://cloud.google.com/pubsub/docs/emulator</a>.
+	 * @see <a href="https://cloud.google.com/pubsub/docs/emulator" target="_top">https://cloud.google.com/pubsub/docs/emulator</a>
 	 * @return serializableCredentialsProvider
-	 * @throws IOException thrown by {@link Credentials}
+	 * @see <a href="https://cloud.google.com/pubsub/docs/emulator" target="_top">https://cloud.google.com/pubsub/docs/emulator</a>
 	 */
 	public static SerializableCredentialsProvider withoutCredentials() {
 		return new SerializableCredentialsProvider(NoCredentials.getInstance());
diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
index f98731b..a340ae9 100644
--- a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
+++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.streaming.connectors.pubsub;
 
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -5,9 +22,9 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.junit.Test;
 
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 /**
  * Test for {@link Bound}.
@@ -107,6 +124,7 @@ public class BoundTest {
 		try {
 			Thread.sleep(sleepTime);
 		} catch (InterruptedException e) {
+			// Deliberately ignored: an interrupt only cuts the sleep short.
 		}
 	}
 }
diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
index 805f823..5f938fd 100644
--- a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
+++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.streaming.connectors.pubsub;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -13,9 +30,9 @@ import org.junit.Test;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 /**
  * Tests for {@link BoundedPubSubSource}.
@@ -63,8 +80,8 @@ public class BoundedPubSubSourceTest {
 
 	private PubsubMessage pubSubMessage() {
 		return PubsubMessage.newBuilder()
-				.setMessageId("message-id")
-				.setData(ByteString.copyFrom("some-message".getBytes()))
-				.build();
+			.setMessageId("message-id")
+			.setData(ByteString.copyFrom("some-message".getBytes()))
+			.build();
 	}
 }
diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java
index 73ca53b..9db5d7d 100644
--- a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java
+++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java
@@ -42,7 +42,6 @@ import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 
-
 /**
  * Test for {@link SourceFunction}.
  */
@@ -142,7 +141,7 @@ public class PubSubSourceTest {
 
 	private PubsubMessage pubSubMessage() {
 		return PubsubMessage.newBuilder()
-							.setData(ByteString.copyFrom(SERIALIZED_MESSAGE))
-							.build();
+			.setData(ByteString.copyFrom(SERIALIZED_MESSAGE))
+			.build();
 	}
 }
diff --git a/flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..b316a9a
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+log4j.rootLogger=INFO, testlogger
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.out
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
diff --git a/flink-connectors/flink-connector-pubsub/pom.xml b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml
similarity index 50%
copy from flink-connectors/flink-connector-pubsub/pom.xml
copy to flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml
index a6e8d72..7dd0d15 100644
--- a/flink-connectors/flink-connector-pubsub/pom.xml
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml
@@ -25,33 +25,75 @@ under the License.
 
 	<parent>
 		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-connectors</artifactId>
+		<artifactId>flink-end-to-end-tests</artifactId>
 		<version>1.7-SNAPSHOT</version>
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
-	<name>flink-connector-pubsub</name>
+	<artifactId>flink-connector-pubsub-emulator-tests</artifactId>
+	<name>flink-connector-pubsub-emulator-tests</name>
 
 	<packaging>jar</packaging>
 
-	<properties>
-		<pubsub.version>1.37.1</pubsub.version>
-	</properties>
+	<!-- This is the way we get a consistent set of versions of the Google tools -->
+	<dependencyManagement>
+		<dependencies>
+			<dependency>
+				<groupId>com.google.cloud</groupId>
+				<artifactId>google-cloud-bom</artifactId>
+				<version>0.53.0-alpha</version>
+				<type>pom</type>
+				<scope>import</scope>
+			</dependency>
+		</dependencies>
+	</dependencyManagement>
 
 	<dependencies>
 
+		<!-- All dependencies use <scope>test</scope> -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>com.google.cloud</groupId>
 			<artifactId>google-cloud-pubsub</artifactId>
-			<version>${pubsub.version}</version>
+			<!-- Version is pulled from google-cloud-bom -->
+			<exclusions>
+				<!-- Exclude an old version of guava that is being pulled
+                in by a transitive dependency of google-api-client -->
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava-jdk5</artifactId>
+				</exclusion>
+			</exclusions>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>${slf4j.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- Docker client used to start the local PubSub emulator container -->
+		<dependency>
+			<groupId>com.spotify</groupId>
+			<artifactId>docker-client</artifactId>
+			<version>8.11.7</version>
+			<scope>test</scope>
 		</dependency>
 
 		<dependency>
@@ -69,51 +111,39 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
-
 	</dependencies>
 
-    <build>
-        <plugins>
+	<!-- ONLY run the tests when explicitly told to do so, e.g. with -DskipTests=false -->
+	<properties>
+		<skipTests>true</skipTests>
+	</properties>
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.12.4</version>
+				<configuration>
+					<skipTests>${skipTests}</skipTests>
+				</configuration>
+			</plugin>
+			<!-- Disabling convergence check because there are multiple problems within the used pubsub dependencies -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
+				<artifactId>maven-enforcer-plugin</artifactId>
 				<executions>
 					<execution>
-						<id>shade-flink</id>
-						<phase>package</phase>
+						<id>dependency-convergence</id>
 						<goals>
-							<goal>shade</goal>
+							<goal>enforce</goal>
 						</goals>
 						<configuration>
-							<shadeTestJar>false</shadeTestJar>
-							<artifactSet>
-								<includes>
-									<include>*:*</include>
-								</includes>
-							</artifactSet>
-							<relocations>
-								<relocation>
-									<pattern>com.google.guava</pattern>
-									<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.guava</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>com.google.common.base</pattern>
-									<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.common.base</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>io.grpc.auth</pattern>
-									<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.auth</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>io.grpc.protobuf</pattern>
-									<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.protobuf</shadedPattern>
-								</relocation>
-							</relocations>
+							<skip>true</skip>
 						</configuration>
 					</execution>
 				</executions>
 			</plugin>
-        </plugins>
-    </build>
+		</plugins>
+	</build>
 
 </project>
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/CheckPubSubEmulatorTest.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/CheckPubSubEmulatorTest.java
new file mode 100644
index 0000000..a54d47b
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/CheckPubSubEmulatorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.connectors.pubsub.emulator.GCloudUnitTestBase;
+import org.apache.flink.streaming.connectors.pubsub.emulator.PubsubHelper;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests to ensure the docker image with PubSub is working correctly.
+ */
+public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
+
+	private static final Logger LOG = LoggerFactory.getLogger(CheckPubSubEmulatorTest.class);
+
+	private static final String PROJECT_NAME = "Project";
+	private static final String TOPIC_NAME = "Topic";
+	private static final String SUBSCRIPTION_NAME = "Subscription";
+
+	private static PubsubHelper pubsubHelper = getPubsubHelper();
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
+		pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME);
+		pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
+	}
+
+	@Test
+	public void testPull() throws Exception {
+		Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
+		publisher
+			.publish(PubsubMessage
+				.newBuilder()
+				.setData(ByteString.copyFromUtf8("Hello World PULL"))
+				.build())
+			.get();
+
+		List<ReceivedMessage> receivedMessages = pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 10);
+		assertEquals(1, receivedMessages.size());
+		assertEquals("Hello World PULL", receivedMessages.get(0).getMessage().getData().toStringUtf8());
+
+		publisher.shutdown();
+	}
+
+	@Test
+	public void testPush() throws Exception {
+		List<PubsubMessage> receivedMessages = new ArrayList<>();
+		Subscriber subscriber = pubsubHelper.
+			subscribeToSubscription(
+				PROJECT_NAME,
+				SUBSCRIPTION_NAME,
+				(message, consumer) -> receivedMessages.add(message)
+			);
+
+		Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
+		publisher
+			.publish(PubsubMessage
+				.newBuilder()
+				.setData(ByteString.copyFromUtf8("Hello World"))
+				.build())
+			.get();
+
+		LOG.info("Waiting a while to receive the message...");
+		Thread.sleep(1000);
+
+		assertEquals(1, receivedMessages.size());
+		assertEquals("Hello World", receivedMessages.get(0).getData().toStringUtf8());
+
+		try {
+			subscriber.stopAsync().awaitTerminated(100, MILLISECONDS);
+		} catch (TimeoutException tme) {
+			// Ignored on purpose: this test does not need a clean subscriber shutdown.
+		}
+		publisher.shutdown();
+	}
+
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSinkTest.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSinkTest.java
new file mode 100644
index 0000000..165579f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSinkTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.pubsub.emulator.GCloudUnitTestBase;
+import org.apache.flink.streaming.connectors.pubsub.emulator.PubsubHelper;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test of the PubSub SINK with the Google PubSub emulator.
+ */
+public class EmulatedPubSubSinkTest extends GCloudUnitTestBase {
+
+	private static final Logger LOG = LoggerFactory.getLogger(EmulatedPubSubSinkTest.class);
+
+	private static final String PROJECT_NAME = "FLProject";
+	private static final String TOPIC_NAME = "FLTopic";
+	private static final String SUBSCRIPTION_NAME = "FLSubscription";
+
+	private static PubsubHelper pubsubHelper;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		pubsubHelper = getPubsubHelper();
+		pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
+		pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME);
+		pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
+	}
+
+	@Test
+	public void testFlinkSink() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		List<String> input = Arrays.asList("One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eigth", "Nine", "Ten");
+
+		// Create test stream
+		DataStream<String> theData = env
+			.fromCollection(input)
+			.name("Test input")
+			.map((MapFunction<String, String>) StringUtils::reverse);
+
+		// Sink into pubsub
+		theData
+			.addSink(PubSubSink.<String>newBuilder()
+				.withProjectName(PROJECT_NAME)
+				.withTopicName(TOPIC_NAME)
+				.withSerializationSchema(new SimpleStringSchema())
+				// Specific for emulator
+				.withCredentialsProvider(getPubsubHelper().getCredentialsProvider())
+				.withHostAndPort(getPubSubHostPort())
+				.build())
+			.name("PubSub sink");
+
+		// Run
+		env.execute();
+
+		// Now get the result from PubSub and verify if everything is there
+		List<ReceivedMessage> receivedMessages = pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 100);
+
+		assertEquals("Wrong number of elements", input.size(), receivedMessages.size());
+
+		// Check output strings
+		List<String> output = new ArrayList<>();
+		receivedMessages.forEach(msg -> output.add(msg.getMessage().getData().toStringUtf8()));
+
+		for (String test : input) {
+			assertTrue("Missing " + test, output.contains(StringUtils.reverse(test)));
+		}
+	}
+
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSourceTest.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSourceTest.java
new file mode 100644
index 0000000..aaa4fc3
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSourceTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.pubsub.emulator.GCloudUnitTestBase;
+import org.apache.flink.streaming.connectors.pubsub.emulator.PubsubHelper;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test of the PubSub SOURCE with the Google PubSub emulator.
+ */
+public class EmulatedPubSubSourceTest extends GCloudUnitTestBase {
+
+	private static final Logger LOG = LoggerFactory.getLogger(EmulatedPubSubSourceTest.class);
+
+	private static final String PROJECT_NAME = "FLProject";
+	private static final String TOPIC_NAME = "FLTopic";
+	private static final String SUBSCRIPTION_NAME = "FLSubscription";
+
+	private static PubsubHelper pubsubHelper;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		pubsubHelper = getPubsubHelper();
+		pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
+		pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME);
+		pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
+	}
+
+	@Test
+	public void testFlinkSource() throws Exception {
+		// Create some messages and put them into pubsub
+		List<String> input = Arrays.asList("One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eigth", "Nine", "Ten");
+
+		// Publish the messages into PubSub
+		Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
+		input.forEach(s -> {
+			try {
+				publisher
+					.publish(PubsubMessage
+						.newBuilder()
+						.setData(ByteString.copyFromUtf8(s))
+						.build())
+					.get();
+			} catch (InterruptedException | ExecutionException e) {
+				e.printStackTrace();
+			}
+		});
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.enableCheckpointing(100);
+
+		DataStream<String> fromPubSub = env
+			.addSource(BoundedPubSubSource.<String>newBuilder()
+				.withDeserializationSchema(new SimpleStringSchema())
+				.withProjectSubscriptionName(PROJECT_NAME, SUBSCRIPTION_NAME)
+				// Specific for emulator
+				.withCredentialsProvider(getPubsubHelper().getCredentialsProvider())
+				.withHostAndPort(getPubSubHostPort())
+				// Make sure the test topology self terminates
+				.boundedByTimeSinceLastMessage(1000)
+				.build())
+			.name("PubSub source");
+
+		List<String> output = new ArrayList<>();
+		fromPubSub.writeUsingOutputFormat(new LocalCollectionOutputFormat<>(output));
+
+		env.execute();
+
+		assertEquals("Wrong number of elements", input.size(), output.size());
+		for (String test : input) {
+			assertTrue("Missing " + test, output.contains(test));
+		}
+	}
+
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudEmulatorManager.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudEmulatorManager.java
new file mode 100644
index 0000000..27a658a
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudEmulatorManager.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub.emulator;
+
+import com.spotify.docker.client.DefaultDockerClient;
+import com.spotify.docker.client.DockerClient;
+import com.spotify.docker.client.exceptions.ContainerNotFoundException;
+import com.spotify.docker.client.exceptions.DockerCertificateException;
+import com.spotify.docker.client.exceptions.DockerException;
+import com.spotify.docker.client.exceptions.ImageNotFoundException;
+import com.spotify.docker.client.messages.ContainerConfig;
+import com.spotify.docker.client.messages.ContainerCreation;
+import com.spotify.docker.client.messages.ContainerInfo;
+import com.spotify.docker.client.messages.HostConfig;
+import com.spotify.docker.client.messages.PortBinding;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * The class that handles the starting and stopping of the emulator docker image.
+ */
+public class GCloudEmulatorManager {
+
+	private static final Logger LOG = LoggerFactory.getLogger(GCloudEmulatorManager.class);
+
+	private static DockerClient docker;
+
+	private static String dockerIpAddress = "127.0.0.1";
+
+	public static final String INTERNAL_PUBSUB_PORT = "22222";
+	public static final String DOCKER_IMAGE_NAME = "google/cloud-sdk:latest";
+
+	private static String pubsubPort;
+
+	public static String getDockerIpAddress() {
+		if (dockerIpAddress == null) {
+			throw new IllegalStateException("The docker has not yet been started (yet) so you cannot get the IP address yet.");
+		}
+		return dockerIpAddress;
+	}
+
+	public static String getDockerPubSubPort() {
+		if (pubsubPort == null) {
+			throw new IllegalStateException("The docker has not yet been started (yet) so you cannot get the port information yet.");
+		}
+		return pubsubPort;
+	}
+
+	public static final String UNITTEST_PROJECT_ID = "running-from-junit-for-flink";
+	private static final String CONTAINER_NAME_JUNIT = (DOCKER_IMAGE_NAME + "_" + UNITTEST_PROJECT_ID).replaceAll("[^a-zA-Z0-9_]", "_");
+
+	public static void launchDocker() throws DockerException, InterruptedException, DockerCertificateException {
+		// Create a client based on DOCKER_HOST and DOCKER_CERT_PATH env vars
+		docker = DefaultDockerClient.fromEnv().build();
+
+		terminateAndDiscardAnyExistingContainers(true);
+
+		LOG.info("");
+		LOG.info("/===========================================");
+		LOG.info("| GCloud Emulator");
+
+		ContainerInfo containerInfo;
+		String id;
+
+		try {
+			docker.inspectImage(DOCKER_IMAGE_NAME);
+		} catch (ImageNotFoundException e) {
+			// No such image so we must download it first.
+			LOG.info("| - Getting docker image \"{}\"", DOCKER_IMAGE_NAME);
+			docker.pull(DOCKER_IMAGE_NAME, message -> {
+				if (message.id() != null && message.progress() != null) {
+					LOG.info("| - Downloading > {} : {}", message.id(), message.progress());
+				}
+			});
+		}
+
+		// Existing containers were handled by terminateAndDiscardAnyExistingContainers above; create a new one.
+		LOG.info("| - Creating new container");
+
+		// Bind container ports to host ports
+		final Map<String, List<PortBinding>> portBindings = new HashMap<>();
+		portBindings.put(INTERNAL_PUBSUB_PORT, Collections.singletonList(PortBinding.randomPort("0.0.0.0")));
+
+		final HostConfig hostConfig = HostConfig.builder().portBindings(portBindings).build();
+
+		// Create new container with exposed ports
+		final ContainerConfig containerConfig = ContainerConfig.builder()
+			.hostConfig(hostConfig)
+			.exposedPorts(INTERNAL_PUBSUB_PORT)
+			.image(DOCKER_IMAGE_NAME)
+			.cmd("sh", "-c", "mkdir -p /opt/data/pubsub ; gcloud beta emulators pubsub start --data-dir=/opt/data/pubsub  --host-port=0.0.0.0:" + INTERNAL_PUBSUB_PORT)
+			.build();
+
+		final ContainerCreation creation = docker.createContainer(containerConfig, CONTAINER_NAME_JUNIT);
+		id = creation.id();
+
+		containerInfo = docker.inspectContainer(id);
+
+		if (!containerInfo.state().running()) {
+			LOG.warn("| - Starting it up ....");
+			docker.startContainer(id);
+			Thread.sleep(1000);
+		}
+
+		containerInfo = docker.inspectContainer(id);
+
+		dockerIpAddress = "127.0.0.1";
+
+		Map<String, List<PortBinding>> ports = containerInfo.networkSettings().ports();
+
+		assertNotNull("Unable to retrieve the ports where to connect to the emulators", ports);
+		assertEquals("We expect 1 port to be mapped", 1, ports.size());
+
+		pubsubPort = getPort(ports, INTERNAL_PUBSUB_PORT, "PubSub");
+
+		LOG.info("| Waiting for the emulators to be running");
+
+		// PubSub exposes an "Ok" at the root url when running.
+		if (!waitForOkStatus("PubSub", pubsubPort)) {
+			// Oops, we did not get an "Ok" within 10 seconds
+			startHasFailedKillEverything();
+		}
+		LOG.info("\\===========================================");
+		LOG.info("");
+	}
+
+	private static void startHasFailedKillEverything() throws DockerException, InterruptedException {
+		LOG.error("|");
+		LOG.error("| ==================== ");
+		LOG.error("| YOUR TESTS WILL FAIL ");
+		LOG.error("| ==================== ");
+		LOG.error("|");
+
+		// Kill this container and wipe all connection information
+		dockerIpAddress = null;
+		pubsubPort = null;
+		terminateAndDiscardAnyExistingContainers(false);
+	}
+
+	private static final long MAX_RETRY_TIMEOUT = 10000; // Milliseconds
+
+	private static boolean waitForOkStatus(String label, String port) {
+		long start = System.currentTimeMillis();
+		while (true) {
+			try {
+				URL url = new URL("http://" + dockerIpAddress + ":" + port + "/");
+				HttpURLConnection con = (HttpURLConnection) url.openConnection();
+				con.setRequestMethod("GET");
+				con.setConnectTimeout(50);
+				con.setReadTimeout(50);
+				StringBuilder content = new StringBuilder();
+				// try-with-resources: the reader is closed even when reading the reply fails
+				try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
+					for (String line = in.readLine(); line != null; line = in.readLine()) {
+						content.append(line);
+					}
+				}
+				con.disconnect();
+				if (content.toString().contains("Ok")) {
+					LOG.info("| - {} Emulator is running at {}:{}", label, dockerIpAddress, port);
+					return true;
+				}
+			} catch (IOException e) {
+				// Not reachable (yet): fall through to the timeout check and retry.
+			}
+			// Reached after every failed attempt, so a non-"Ok" reply cannot spin forever.
+			if (System.currentTimeMillis() - start > MAX_RETRY_TIMEOUT) {
+				LOG.error("| - PubSub Emulator at {}:{} FAILED to return an Ok status within {} ms ", dockerIpAddress, port, MAX_RETRY_TIMEOUT);
+				return false;
+			}
+			try {
+				Thread.sleep(100); // Sleep a very short time
+			} catch (InterruptedException e1) {
+				Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+				return false;
+			}
+		}
+	}
+
+	private static String getPort(Map<String, List<PortBinding>> ports, String internalTCPPort, String label) {
+		// The bindings are looked up under the "<port>/tcp" key; null is returned when there are none.
+		List<PortBinding> portMappings = ports.get(internalTCPPort + "/tcp");
+		if (portMappings == null || portMappings.isEmpty()) {
+			LOG.info("| {} Emulator {} --> NOTHING CONNECTED", label, internalTCPPort + "/tcp");
+			return null;
+		}
+		return portMappings.get(0).hostPort();
+	}
+
+	private static void terminateAndDiscardAnyExistingContainers(boolean warnAboutExisting) throws DockerException, InterruptedException {
+		ContainerInfo containerInfo;
+		try {
+			containerInfo = docker.inspectContainer(CONTAINER_NAME_JUNIT);
+			// Already have this container running.
+
+			assertNotNull("We should either we get containerInfo or we get an exception", containerInfo);
+
+			LOG.info("");
+			LOG.info("/===========================================");
+			if (warnAboutExisting) {
+				LOG.warn("|    >>> FOUND OLD EMULATOR INSTANCE RUNNING <<< ");
+				LOG.warn("| Destroying that one to keep tests running smoothly.");
+			}
+			LOG.info("| Cleanup of GCloud Emulator");
+
+			// We REQUIRE 100% accurate side effect free unit tests
+			// So we completely discard this one.
+
+			String id = containerInfo.id();
+			// Kill container
+			if (containerInfo.state().running()) {
+				docker.killContainer(id);
+				LOG.info("| - Killed");
+			}
+
+			// Remove container
+			docker.removeContainer(id);
+
+			LOG.info("| - Removed");
+			LOG.info("\\===========================================");
+			LOG.info("");
+
+		} catch (ContainerNotFoundException cnfe) {
+			// No such container. Good !
+		}
+	}
+
+	public static void terminateDocker() throws DockerException, InterruptedException {
+		terminateAndDiscardAnyExistingContainers(false);
+
+		// Close the docker client
+		docker.close();
+	}
+
+	// ====================================================================================
+
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudUnitTestBase.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudUnitTestBase.java
new file mode 100644
index 0000000..b6a011a
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudUnitTestBase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub.emulator;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.spotify.docker.client.exceptions.DockerException;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.Serializable;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.streaming.connectors.pubsub.emulator.GCloudEmulatorManager.getDockerIpAddress;
+import static org.apache.flink.streaming.connectors.pubsub.emulator.GCloudEmulatorManager.getDockerPubSubPort;
+
+/**
+ * The base class from which unit tests should inherit if they need to use the Google cloud emulators.
+ */
+public class GCloudUnitTestBase implements Serializable {
+	@BeforeClass
+	public static void launchGCloudEmulator() throws Exception {
+		// Docker handling lives in GCloudEmulatorManager so that this test class can stay serializable.
+		GCloudEmulatorManager.launchDocker();
+	}
+
+	@AfterClass
+	public static void terminateGCloudEmulator() throws DockerException, InterruptedException {
+		GCloudEmulatorManager.terminateDocker();
+	}
+
+	// ====================================================================================
+	// Pubsub helpers
+
+	private static ManagedChannel channel = null;
+	private static TransportChannelProvider channelProvider = null;
+	private static CredentialsProvider credentialsProvider = null;
+
+	public static PubsubHelper getPubsubHelper() {
+		if (channel == null) {
+			//noinspection deprecation
+			channel = ManagedChannelBuilder
+				.forTarget(getPubSubHostPort())
+				.usePlaintext(true)
+				.build();
+			channelProvider = FixedTransportChannelProvider
+				.create(GrpcTransportChannel.create(channel));
+			credentialsProvider = NoCredentialsProvider.create();
+		}
+		return new PubsubHelper(channelProvider, credentialsProvider);
+	}
+
+	public static String getPubSubHostPort() {
+		return getDockerIpAddress() + ":" + getDockerPubSubPort();
+	}
+
+	@AfterClass
+	public static void cleanupPubsubChannel() throws InterruptedException {
+		if (channel != null) {
+			channel.shutdownNow().awaitTermination(1, SECONDS);
+			channel = null;
+		}
+	}
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/PubsubHelper.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/PubsubHelper.java
new file mode 100644
index 0000000..f08576b
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/PubsubHelper.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub.emulator;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.rpc.NotFoundException;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.PushConfig;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A helper class to make managing the testing topics a bit easier.
+ */
+public class PubsubHelper {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PubsubHelper.class);
+
+	private TransportChannelProvider channelProvider = null;
+	private CredentialsProvider credentialsProvider = null;
+
+	private TopicAdminClient topicClient;
+	private SubscriptionAdminClient subscriptionAdminClient;
+
+	public PubsubHelper() {
+		this(TopicAdminSettings.defaultTransportChannelProvider(),
+			TopicAdminSettings.defaultCredentialsProviderBuilder().build());
+	}
+
+	public PubsubHelper(TransportChannelProvider channelProvider, CredentialsProvider credentialsProvider) {
+		this.channelProvider = channelProvider;
+		this.credentialsProvider = credentialsProvider;
+	}
+
+	public TransportChannelProvider getChannelProvider() {
+		return channelProvider;
+	}
+
+	public CredentialsProvider getCredentialsProvider() {
+		return credentialsProvider;
+	}
+
+	public TopicAdminClient getTopicAdminClient() throws IOException {
+		if (topicClient == null) {
+			TopicAdminSettings topicAdminSettings = TopicAdminSettings.newBuilder()
+				.setTransportChannelProvider(channelProvider)
+				.setCredentialsProvider(credentialsProvider)
+				.build();
+			topicClient = TopicAdminClient.create(topicAdminSettings);
+		}
+		return topicClient;
+	}
+
+	public Topic createTopic(String project, String topic) throws IOException {
+		deleteTopic(project, topic);
+		ProjectTopicName topicName = ProjectTopicName.of(project, topic);
+		TopicAdminClient adminClient = getTopicAdminClient();
+		LOG.info("CreateTopic {}", topicName);
+		return adminClient.createTopic(topicName);
+	}
+
+	public void deleteTopic(String project, String topic) throws IOException {
+		deleteTopic(ProjectTopicName.of(project, topic));
+	}
+
+	public void deleteTopic(ProjectTopicName topicName) throws IOException {
+		// Deletes the topic and all of its subscriptions; does nothing when the topic is not found.
+		TopicAdminClient adminClient = getTopicAdminClient();
+		try {
+			adminClient.getTopic(topicName);
+			// If it exists we delete all subscriptions and the topic itself.
+			LOG.info("DeleteTopic {} first delete old subscriptions.", topicName);
+			adminClient
+				.listTopicSubscriptions(topicName)
+				.iterateAllAsProjectSubscriptionName()
+				// Use the lazy getter: the field is still null if no subscription call happened yet.
+				.forEach(getSubscriptionAdminClient()::deleteSubscription);
+			LOG.info("DeleteTopic {}", topicName);
+			adminClient
+				.deleteTopic(topicName);
+		} catch (NotFoundException e) {
+			// Doesn't exist. Good.
+		}
+	}
+
+	public SubscriptionAdminClient getSubscriptionAdminClient() throws IOException {
+		if (subscriptionAdminClient == null) {
+			SubscriptionAdminSettings subscriptionAdminSettings =
+				SubscriptionAdminSettings
+					.newBuilder()
+					.setTransportChannelProvider(channelProvider)
+					.setCredentialsProvider(credentialsProvider)
+					.build();
+			subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings);
+		}
+		return subscriptionAdminClient;
+	}
+
+	public void createSubscription(String subscriptionProject, String subscription, String topicProject, String topic) throws IOException {
+		ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.newBuilder()
+			.setProject(subscriptionProject)
+			.setSubscription(subscription)
+			.build();
+		// Start from a clean slate: drop any subscription left over under the same name.
+		deleteSubscription(subscriptionName);
+
+		SubscriptionAdminClient adminClient = getSubscriptionAdminClient();
+
+		ProjectTopicName topicName = ProjectTopicName.of(topicProject, topic);
+
+		PushConfig pushConfig = PushConfig.getDefaultInstance();
+
+		LOG.info("CreateSubscription {}", subscriptionName);
+		adminClient.createSubscription(subscriptionName, topicName, pushConfig, 1);
+	}
+
+	public void deleteSubscription(String subscriptionProject, String subscription) throws IOException {
+		deleteSubscription(ProjectSubscriptionName
+			.newBuilder()
+			.setProject(subscriptionProject)
+			.setSubscription(subscription)
+			.build());
+	}
+
+	public void deleteSubscription(ProjectSubscriptionName subscriptionName) throws IOException {
+		SubscriptionAdminClient adminClient = getSubscriptionAdminClient();
+		try {
+			adminClient.getSubscription(subscriptionName);
+			// If it already exists we must first delete it.
+			LOG.info("DeleteSubscription {}", subscriptionName);
+			adminClient.deleteSubscription(subscriptionName);
+		} catch (NotFoundException e) {
+			// Doesn't exist. Good.
+		}
+	}
+
+	// Mostly copied from the example on https://cloud.google.com/pubsub/docs/pull
+	public List<ReceivedMessage> pullMessages(String projectId, String subscriptionId, int maxNumberOfMessages) throws Exception {
+		SubscriberStubSettings subscriberStubSettings =
+			SubscriberStubSettings.newBuilder()
+				.setTransportChannelProvider(channelProvider)
+				.setCredentialsProvider(credentialsProvider)
+				.build();
+		try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
+			// Pulls at most maxNumberOfMessages from the subscription, acknowledges every
+			// message that was received and returns the received messages to the caller.
+			// The stub is created per call and closed by the try-with-resources.
+			String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
+			PullRequest pullRequest =
+				PullRequest.newBuilder()
+					.setMaxMessages(maxNumberOfMessages)
+					.setReturnImmediately(false) // false: do not return immediately when no messages are available
+					.setSubscription(subscriptionName)
+					.build();
+
+			// use pullCallable().futureCall to asynchronously perform this operation
+			PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
+			List<String> ackIds = new ArrayList<>();
+			for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
+				// remember the ack id so that the whole batch
+				// can be acknowledged in a single request
+				ackIds.add(message.getAckId());
+			}
+			// acknowledge received messages
+			AcknowledgeRequest acknowledgeRequest =
+				AcknowledgeRequest.newBuilder()
+					.setSubscription(subscriptionName)
+					.addAllAckIds(ackIds)
+					.build();
+			// use acknowledgeCallable().futureCall to asynchronously perform this operation
+			subscriber.acknowledgeCallable().call(acknowledgeRequest);
+			return pullResponse.getReceivedMessagesList();
+		}
+	}
+
+	public Subscriber subscribeToSubscription(String project, String subscription, MessageReceiver messageReceiver) {
+		ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(project, subscription);
+		Subscriber subscriber =
+			Subscriber
+				.newBuilder(subscriptionName, messageReceiver)
+				.setChannelProvider(channelProvider)
+				.setCredentialsProvider(credentialsProvider)
+				.build();
+		subscriber.startAsync();
+		return subscriber;
+	}
+
+	public Publisher createPublisher(String project, String topic) throws IOException {
+		return Publisher
+			.newBuilder(ProjectTopicName.of(project, topic))
+			.setChannelProvider(channelProvider)
+			.setCredentialsProvider(credentialsProvider)
+			.build();
+	}
+
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..b316a9a
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+log4j.rootLogger=INFO, testlogger
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.out
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 0950e2f..b17dc70 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -61,6 +61,7 @@ under the License.
 		<module>flink-metrics-availability-test</module>
 		<module>flink-metrics-reporter-prometheus-test</module>
 		<module>flink-heavy-deployment-stress-test</module>
+		<module>flink-connector-pubsub-emulator-tests</module>
 		<module>flink-streaming-kafka-test-base</module>
 		<module>flink-streaming-kafka-test</module>
 		<module>flink-streaming-kafka011-test</module>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index af21512..957b3c6 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -136,6 +136,8 @@ run_test "Elasticsearch (v6.3.1) sink end-to-end test" "$END_TO_END_DIR/test-scr
 run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
 run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
 
+run_test "Test PubSub connector with Docker based Google PubSub Emulator" "$END_TO_END_DIR/test-scripts/test_streaming_pubsub.sh"
+
 run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh"
 
 run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file"
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_pubsub.sh b/flink-end-to-end-tests/test-scripts/test_streaming_pubsub.sh
new file mode 100755
index 0000000..8e08385
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_pubsub.sh
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Abort instead of running maven in the wrong directory when the module is missing.
+cd "${END_TO_END_DIR}/flink-connector-pubsub-emulator-tests" || exit 1
+mvn test -DskipTests=false
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java
index 612da41..c5791bd 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.examples.pubsub;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.pubsub.PubSubSink;
-import org.apache.flink.streaming.connectors.pubsub.PubSubSourceBuilder;
+import org.apache.flink.streaming.connectors.pubsub.PubSubSource;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +45,7 @@ public class PubSubExample {
 			System.out.println("Missing parameters!\n" +
 								"Usage: flink run PubSub.jar --input-subscription <subscription> --input-topicName <topic> --output-topicName " +
 								"--google-project <google project name> ");
-			//return;
+			return;
 		}
 
 		String projectName = parameterTool.getRequired("google-project");
@@ -62,10 +62,9 @@ public class PubSubExample {
 	private static void runFlinkJob(String projectName, String subscriptionName, String outputTopicName) throws Exception {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		env.addSource(PubSubSourceBuilder.<Integer>builder()
+		env.addSource(PubSubSource.<Integer>newBuilder()
 										.withProjectSubscriptionName(projectName, subscriptionName)
 										.withDeserializationSchema(new IntegerSerializer())
-										.withMode(PubSubSourceBuilder.Mode.NONE)
 										.build())
 			.map(PubSubExample::printAndReturn).disableChaining()
 			.addSink(PubSubSink.<Integer>newBuilder()