You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2019/07/05 12:46:16 UTC

[flink] 01/05: [FLINK-9311] [pubsub] Add PubSubSource and PubSubSink connectors

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0937601972674cc548446616796566ad2eab9b81
Author: Richard Deurwaarder <ri...@xeli.eu>
AuthorDate: Sat May 26 14:59:32 2018 +0200

    [FLINK-9311] [pubsub] Add PubSubSource and PubSubSink connectors
    
    This closes #6594
---
 flink-connectors/flink-connector-pubsub/pom.xml    | 119 ++++++++++
 .../flink/streaming/connectors/pubsub/Bound.java   |  99 ++++++++
 .../connectors/pubsub/BoundedPubSubSource.java     |  79 +++++++
 .../streaming/connectors/pubsub/PubSubSink.java    | 241 +++++++++++++++++++
 .../streaming/connectors/pubsub/PubSubSource.java  | 257 +++++++++++++++++++++
 .../connectors/pubsub/SubscriberWrapper.java       | 104 +++++++++
 .../common/SerializableCredentialsProvider.java    |  67 ++++++
 .../streaming/connectors/pubsub/BoundTest.java     | 112 +++++++++
 .../connectors/pubsub/BoundedPubSubSourceTest.java |  70 ++++++
 .../connectors/pubsub/PubSubSourceTest.java        | 148 ++++++++++++
 .../connectors/pubsub/SubscriberWrapperTest.java   |  57 +++++
 flink-connectors/pom.xml                           |   1 +
 flink-examples/flink-examples-streaming/pom.xml    |  12 +
 .../examples/pubsub/IntegerSerializer.java         |  48 ++++
 .../streaming/examples/pubsub/PubSubExample.java   |  84 +++++++
 .../streaming/examples/pubsub/PubSubPublisher.java |  64 +++++
 16 files changed, 1562 insertions(+)

diff --git a/flink-connectors/flink-connector-pubsub/pom.xml b/flink-connectors/flink-connector-pubsub/pom.xml
new file mode 100644
index 0000000..a6e8d72
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.7-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
+	<name>flink-connector-pubsub</name>
+
+	<packaging>jar</packaging>
+
+	<properties>
+		<pubsub.version>1.37.1</pubsub.version>
+	</properties>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.cloud</groupId>
+			<artifactId>google-cloud-pubsub</artifactId>
+			<version>${pubsub.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+    <build>
+        <plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadeTestJar>false</shadeTestJar>
+							<artifactSet>
+								<includes>
+									<include>*:*</include>
+								</includes>
+							</artifactSet>
+							<relocations>
+								<relocation>
+									<pattern>com.google.guava</pattern>
+									<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.guava</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>com.google.common.base</pattern>
+									<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.common.base</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>io.grpc.auth</pattern>
+									<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.auth</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>io.grpc.protobuf</pattern>
+									<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.protobuf</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
new file mode 100644
index 0000000..b37cc45
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
@@ -0,0 +1,99 @@
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Timer;
+import java.util.TimerTask;
+
+class Bound<OUT> implements Serializable {
+	private static final Logger LOG = LoggerFactory.getLogger(Bound.class);
+
+	private final Bound.Mode mode;
+	private final long maxMessagedReceived;
+	private final long maxTimeBetweenMessages;
+
+	private SourceFunction<OUT> sourceFunction;
+	private transient Timer timer;
+	private long messagesReceived;
+	private long lastReceivedMessage;
+	private boolean cancelled = false;
+
+	private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) {
+		this.mode = mode;
+		this.maxMessagedReceived = maxMessagedReceived;
+		this.maxTimeBetweenMessages = maxTimeBetweenMessages;
+		this.messagesReceived = 0L;
+	}
+
+	static <OUT> Bound<OUT> boundByAmountOfMessages(long maxMessagedReceived) {
+		return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L);
+	}
+
+	static <OUT> Bound<OUT> boundByTimeSinceLastMessage(long maxTimeBetweenMessages) {
+		return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages);
+	}
+
+	static <OUT> Bound<OUT> boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) {
+		return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages);
+	}
+
+	private TimerTask shutdownPubSubSource() {
+		return new TimerTask() {
+			@Override
+			public void run() {
+				if (maxTimeBetweenMessagesElapsed()) {
+					cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source");
+					timer.cancel();
+				}
+			}
+		};
+	}
+
+	private synchronized boolean maxTimeBetweenMessagesElapsed() {
+		return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages;
+	}
+
+	private synchronized void cancelPubSubSource(String logMessage) {
+		if (!cancelled) {
+			cancelled = true;
+			sourceFunction.cancel();
+			LOG.info(logMessage);
+		}
+	}
+
+	void start(SourceFunction<OUT> sourceFunction) {
+		if (this.sourceFunction != null) {
+			throw new IllegalStateException("start() already called");
+		}
+
+		this.sourceFunction = sourceFunction;
+		messagesReceived = 0;
+
+		if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) {
+			lastReceivedMessage = System.currentTimeMillis();
+			timer = new Timer();
+			timer.schedule(shutdownPubSubSource(), 0, 100);
+		}
+	}
+
+	synchronized void receivedMessage() {
+		if (sourceFunction == null) {
+			throw new IllegalStateException("start() not called");
+		}
+
+		lastReceivedMessage = System.currentTimeMillis();
+		messagesReceived++;
+
+		if ((mode == Mode.COUNTER || mode == Mode.COUNTER_OR_TIMER) && messagesReceived >= maxMessagedReceived) {
+			cancelPubSubSource("BoundedSourceFunction: Max received messages --> canceling source");
+		}
+	}
+
+	private enum Mode {
+		COUNTER, TIMER, COUNTER_OR_TIMER
+	}
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
new file mode 100644
index 0000000..5f829ae
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
@@ -0,0 +1,79 @@
+package org.apache.flink.streaming.connectors.pubsub;
+
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.io.IOException;
+
+class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
+	private Bound<OUT> bound;
+
+	private BoundedPubSubSource() {
+		super();
+	}
+
+	protected void setBound(Bound<OUT> bound) {
+		this.bound = bound;
+	}
+
+	@Override
+	public void run(SourceContext<OUT> sourceContext) {
+		bound.start(this);
+		super.run(sourceContext);
+	}
+
+	@Override
+	public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
+		super.receiveMessage(message, consumer);
+		bound.receivedMessage();
+	}
+
+	@SuppressWarnings("unchecked")
+	public static <OUT> BoundedPubSubSourceBuilder<OUT, ? extends PubSubSource, ? extends BoundedPubSubSourceBuilder> newBuilder() {
+		return new BoundedPubSubSourceBuilder<>(new BoundedPubSubSource<OUT>());
+	}
+
+	@SuppressWarnings("unchecked")
+	public static class BoundedPubSubSourceBuilder<OUT, PSS extends BoundedPubSubSource<OUT>, BUILDER extends BoundedPubSubSourceBuilder<OUT, PSS, BUILDER>> extends PubSubSourceBuilder<OUT, PSS, BUILDER> {
+		private Long boundedByAmountOfMessages;
+		private Long boundedByTimeSinceLastMessage;
+
+		BoundedPubSubSourceBuilder(PSS sourceUnderConstruction) {
+			super(sourceUnderConstruction);
+		}
+
+		public BUILDER boundedByAmountOfMessages(long maxAmountOfMessages) {
+			boundedByAmountOfMessages = maxAmountOfMessages;
+			return (BUILDER) this;
+		}
+
+		public BUILDER boundedByTimeSinceLastMessage(long timeSinceLastMessage) {
+			boundedByTimeSinceLastMessage = timeSinceLastMessage;
+			return (BUILDER) this;
+		}
+
+		private Bound <OUT> createBound() {
+			if (boundedByAmountOfMessages != null && boundedByTimeSinceLastMessage != null) {
+				return Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(boundedByAmountOfMessages, boundedByTimeSinceLastMessage);
+			}
+
+			if (boundedByAmountOfMessages != null) {
+				return Bound.boundByAmountOfMessages(boundedByAmountOfMessages);
+			}
+
+			if (boundedByTimeSinceLastMessage != null) {
+				return Bound.boundByTimeSinceLastMessage(boundedByTimeSinceLastMessage);
+			}
+
+			// This is functionally speaking no bound.
+			return Bound.boundByAmountOfMessages(Long.MAX_VALUE);
+		}
+
+		@Override
+		public PSS build() throws IOException {
+			sourceUnderConstruction.setBound(createBound());
+			return super.build();
+		}
+	}
+
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
new file mode 100644
index 0000000..92c9c6c
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+import java.io.IOException;
+
+/**
+ * A sink function that outputs to PubSub.
+ * @param <IN> type of PubSubSink messages to write
+ */
+public class PubSubSink<IN> extends RichSinkFunction<IN> {
+
+	private final SerializableCredentialsProvider serializableCredentialsProvider;
+	private final SerializationSchema<IN>         serializationSchema;
+	private final String                          projectName;
+	private final String                          topicName;
+	private       String                          hostAndPort = null;
+
+	private transient Publisher publisher;
+
+	public PubSubSink(SerializableCredentialsProvider serializableCredentialsProvider, SerializationSchema<IN> serializationSchema, String projectName, String topicName) {
+		this.serializableCredentialsProvider = serializableCredentialsProvider;
+		this.serializationSchema = serializationSchema;
+		this.projectName = projectName;
+		this.topicName = topicName;
+	}
+
+	/**
+	 * Set the custom hostname/port combination of PubSub.
+	 * The ONLY reason to use this is during tests with the emulator provided by Google.
+	 *
+	 * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
+	 * @return The current instance
+	 */
+	public PubSubSink<IN> withHostAndPort(String hostAndPort) {
+		this.hostAndPort = hostAndPort;
+		return this;
+	}
+
+	private transient ManagedChannel   managedChannel = null;
+	private transient TransportChannel channel        = null;
+
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		Publisher.Builder builder = Publisher
+			.newBuilder(ProjectTopicName.of(projectName, topicName))
+			.setCredentialsProvider(serializableCredentialsProvider);
+
+		if (hostAndPort != null) {
+			managedChannel = ManagedChannelBuilder
+				.forTarget(hostAndPort)
+				.usePlaintext(true) // This is 'Ok' because this is ONLY used for testing.
+				.build();
+			channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
+			builder.setChannelProvider(FixedTransportChannelProvider.create(channel));
+		}
+
+		publisher = builder.build();
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		publisher.shutdown();
+		if (channel != null) {
+			channel.close();
+			managedChannel.shutdownNow();
+		}
+	}
+
+	@Override
+	public void invoke(IN message, SinkFunction.Context context) {
+		PubsubMessage pubsubMessage = PubsubMessage
+			.newBuilder()
+			.setData(ByteString.copyFrom(serializationSchema.serialize(message)))
+			.build();
+		publisher.publish(pubsubMessage);
+		publisher.publishAllOutstanding();
+	}
+
+	/**
+	 * Create a builder for a new PubSubSink.
+	 * @param <IN> The generic of the type that is to be written into the sink.
+	 * @return a new PubSubSinkBuilder instance
+	 */
+	public static <IN> PubSubSinkBuilder<IN> newBuilder() {
+		return new PubSubSinkBuilder<>();
+	}
+
+	/**
+	 * PubSubSinkBuilder to create a PubSubSink.
+	 * @param <IN> Type of PubSubSink to create.
+	 */
+	public static class PubSubSinkBuilder<IN> {
+		private SerializableCredentialsProvider serializableCredentialsProvider = null;
+		private SerializationSchema<IN>         serializationSchema             = null;
+		private String                          projectName                     = null;
+		private String                          topicName                       = null;
+		private String                          hostAndPort                     = null;
+
+		private PubSubSinkBuilder() {
+		}
+
+		/**
+		 * Set the credentials.
+		 * If this is not used then the credentials are picked up from the environment variables.
+		 * @param credentials the Credentials needed to connect.
+		 * @return The current PubSubSinkBuilder instance
+		 */
+		public PubSubSinkBuilder<IN> withCredentials(Credentials credentials) {
+			this.serializableCredentialsProvider = new SerializableCredentialsProvider(credentials);
+			return this;
+		}
+
+		/**
+		 * Set the CredentialsProvider.
+		 * If this is not used then the credentials are picked up from the environment variables.
+		 * @param credentialsProvider the custom SerializableCredentialsProvider instance.
+		 * @return The current PubSubSinkBuilder instance
+		 */
+		public PubSubSinkBuilder<IN> withCredentialsProvider(CredentialsProvider credentialsProvider) throws IOException {
+			return withCredentials(credentialsProvider.getCredentials());
+		}
+
+		/**
+		 * Set the credentials to be absent.
+		 * This means that no credentials are to be used at all.
+		 * @return The current PubSubSinkBuilder instance
+		 */
+		public PubSubSinkBuilder<IN> withoutCredentials() {
+			this.serializableCredentialsProvider = SerializableCredentialsProvider.withoutCredentials();
+			return this;
+		}
+
+		/**
+		 * @param serializationSchema Instance of a SerializationSchema that converts the IN into a byte[]
+		 * @return The current PubSubSinkBuilder instance
+		 */
+		public PubSubSinkBuilder<IN> withSerializationSchema(SerializationSchema<IN> serializationSchema) {
+			this.serializationSchema = serializationSchema;
+			return this;
+		}
+
+		/**
+		 * @param projectName The name of the project in PubSub
+		 * @return The current PubSubSinkBuilder instance
+		 */
+		public PubSubSinkBuilder<IN> withProjectName (String projectName) {
+			this.projectName = projectName;
+			return this;
+		}
+
+		/**
+		 * @param topicName The name of the topic in PubSub
+		 * @return The current PubSubSinkBuilder instance
+		 */
+		public PubSubSinkBuilder<IN> withTopicName (String topicName) {
+			this.topicName = topicName;
+			return this;
+		}
+
+
+		/**
+		 * Set the custom hostname/port combination of PubSub.
+		 * The ONLY reason to use this is during tests with the emulator provided by Google.
+		 * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
+		 * @return The current PubSubSinkBuilder instance
+		 */
+		public PubSubSinkBuilder<IN> withHostAndPort(String hostAndPort) {
+			this.hostAndPort = hostAndPort;
+			return this;
+		}
+
+		/**
+		 * Actually builder the desired instance of the PubSubSink.
+		 * @return a brand new PubSubSink
+		 * @throws IOException incase of a problem getting the credentials
+		 * @throws IllegalArgumentException incase required fields were not specified.
+		 */
+		public PubSubSink<IN> build() throws IOException {
+			if (serializableCredentialsProvider == null) {
+				serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables();
+			}
+			if (serializationSchema == null) {
+				throw new IllegalArgumentException("The serializationSchema has not been specified.");
+			}
+			if (projectName == null) {
+				throw new IllegalArgumentException("The projectName has not been specified.");
+			}
+			if (topicName == null) {
+				throw new IllegalArgumentException("The topicName has not been specified.");
+			}
+
+			PubSubSink<IN> pubSubSink = new PubSubSink<>(
+				serializableCredentialsProvider,
+				serializationSchema,
+				projectName, topicName);
+
+			if (hostAndPort != null) {
+				pubSubSink.withHostAndPort(hostAndPort);
+			}
+
+			return pubSubSink;
+		}
+	}
+
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
new file mode 100644
index 0000000..8f8c689
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them as soon as they have been received.
+ */
+public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, AckReplyConsumer> implements MessageReceiver, ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT> {
+	private DeserializationSchema<OUT> deserializationSchema;
+	private SubscriberWrapper          subscriberWrapper;
+
+	protected transient SourceContext<OUT> sourceContext = null;
+
+	protected PubSubSource() {
+		super(String.class);
+	}
+
+	protected void setDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
+		this.deserializationSchema = deserializationSchema;
+	}
+
+	protected void setSubscriberWrapper(SubscriberWrapper subscriberWrapper) {
+		this.subscriberWrapper = subscriberWrapper;
+	}
+
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		super.open(configuration);
+		subscriberWrapper.initialize(this);
+		if (hasNoCheckpointingEnabled(getRuntimeContext())) {
+			throw new IllegalArgumentException("Checkpointing needs to be enabled to support: PubSub ATLEAST_ONCE");
+		}
+	}
+
+	private boolean hasNoCheckpointingEnabled(RuntimeContext runtimeContext) {
+		return !(runtimeContext instanceof StreamingRuntimeContext && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled());
+	}
+
+	@Override
+	protected void acknowledgeSessionIDs(List<AckReplyConsumer> ackReplyConsumers) {
+		ackReplyConsumers.forEach(AckReplyConsumer::ack);
+	}
+
+	@Override
+	public void run(SourceContext<OUT> sourceContext) {
+		this.sourceContext = sourceContext;
+		subscriberWrapper.startBlocking();
+	}
+
+	@Override
+	public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
+		if (sourceContext == null) {
+			consumer.nack();
+			return;
+		}
+
+		processMessage(message, consumer);
+	}
+
+	private void processMessage(PubsubMessage message, AckReplyConsumer ackReplyConsumer) {
+		synchronized (sourceContext.getCheckpointLock()) {
+			boolean alreadyProcessed = !addId(message.getMessageId());
+			if (alreadyProcessed) {
+				return;
+			}
+
+			sessionIds.add(ackReplyConsumer);
+			sourceContext.collect(deserializeMessage(message));
+		}
+	}
+
+	@Override
+	public void cancel() {
+		subscriberWrapper.stop();
+	}
+
+	private OUT deserializeMessage(PubsubMessage message) {
+		try {
+			return deserializationSchema.deserialize(message.getData().toByteArray());
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public TypeInformation<OUT> getProducedType() {
+		return deserializationSchema.getProducedType();
+	}
+
+	@SuppressWarnings("unchecked")
+	public static <OUT> PubSubSourceBuilder<OUT, ? extends PubSubSource, ? extends PubSubSourceBuilder> newBuilder() {
+		return new PubSubSourceBuilder<>(new PubSubSource<OUT>());
+	}
+
+	/**
+	 * Builder to create PubSubSource.
+	 * @param <OUT> The type of objects which will be read
+	 * @param <PSS> The type of PubSubSource
+	 * @param <BUILDER> The type of Builder to create the PubSubSource
+	 */
+	public static class PubSubSourceBuilder<OUT, PSS extends PubSubSource<OUT>, BUILDER extends PubSubSourceBuilder<OUT, PSS, BUILDER>> {
+		protected PSS 							sourceUnderConstruction;
+
+		private SubscriberWrapper               subscriberWrapper = null;
+		private SerializableCredentialsProvider serializableCredentialsProvider;
+		private DeserializationSchema<OUT>      deserializationSchema;
+		private String                          projectName;
+		private String                          subscriptionName;
+		private String                          hostAndPort;
+
+		protected PubSubSourceBuilder(PSS sourceUnderConstruction) {
+			this.sourceUnderConstruction = sourceUnderConstruction;
+		}
+
+		/**
+		 * Set the credentials.
+		 * If this is not used then the credentials are picked up from the environment variables.
+		 * @param credentials the Credentials needed to connect.
+		 * @return The current PubSubSourceBuilder instance
+		 */
+		public BUILDER withCredentials(Credentials credentials) {
+			this.serializableCredentialsProvider = new SerializableCredentialsProvider(credentials);
+			return (BUILDER) this;
+		}
+
+		/**
+		 * Set the CredentialsProvider.
+		 * If this is not used then the credentials are picked up from the environment variables.
+		 * @param credentialsProvider the custom SerializableCredentialsProvider instance.
+		 * @return The current PubSubSourceBuilder instance
+		 */
+		public BUILDER withCredentialsProvider(CredentialsProvider credentialsProvider) throws IOException {
+			return withCredentials(credentialsProvider.getCredentials());
+		}
+
+		/**
+		 * Set the credentials to be absent.
+		 * This means that no credentials are to be used at all.
+		 * @return The current PubSubSourceBuilder instance
+		 */
+		public BUILDER withoutCredentials() {
+			this.serializableCredentialsProvider = SerializableCredentialsProvider.withoutCredentials();
+			return (BUILDER) this;
+		}
+
+		/**
+		 * @param deserializationSchema Instance of a DeserializationSchema that converts the OUT into a byte[]
+		 * @return The current PubSubSourceBuilder instance
+		 */
+		public BUILDER withDeserializationSchema(DeserializationSchema <OUT> deserializationSchema) {
+			this.deserializationSchema = deserializationSchema;
+			return (BUILDER) this;
+		}
+
+		/**
+		 * @param projectName The name of the project in GoogleCloudPlatform
+		 * @param subscriptionName The name of the subscription in PubSub
+		 * @return The current PubSubSourceBuilder instance
+		 */
+		public BUILDER withProjectSubscriptionName(String projectName, String subscriptionName) {
+			this.projectName = projectName;
+			this.subscriptionName = subscriptionName;
+			return (BUILDER) this;
+		}
+
+		/**
+		 * Set the custom hostname/port combination of PubSub.
+		 * The ONLY reason to use this is during tests with the emulator provided by Google.
+		 * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
+		 * @return The current PubSubSourceBuilder instance
+		 */
+		public BUILDER withHostAndPort(String hostAndPort) {
+			this.hostAndPort = hostAndPort;
+			return (BUILDER) this;
+		}
+
+		/**
+		 * Set a complete SubscriberWrapper.
+		 * The ONLY reason to use this is during tests.
+		 * @param subscriberWrapper The fully instantiated SubscriberWrapper
+		 * @return The current PubSubSourceBuilder instance
+		 */
+		public BUILDER withSubscriberWrapper(SubscriberWrapper subscriberWrapper) {
+			this.subscriberWrapper = subscriberWrapper;
+			return (BUILDER) this;
+		}
+
+		/**
+		 * Actually build the desired instance of the PubSubSourceBuilder.
+		 * @return a brand new SourceFunction
+		 * @throws IOException incase of a problem getting the credentials
+		 * @throws IllegalArgumentException incase required fields were not specified.
+		 */
+		public PSS build() throws IOException {
+			if (serializableCredentialsProvider == null) {
+				serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables();
+			}
+			if (deserializationSchema == null) {
+				throw new IllegalArgumentException("The deserializationSchema has not been specified.");
+			}
+
+			if (subscriberWrapper == null) {
+				if (projectName == null || subscriptionName == null) {
+					throw new IllegalArgumentException("The ProjectName And SubscriptionName have not been specified.");
+				}
+
+				subscriberWrapper =
+					new SubscriberWrapper(serializableCredentialsProvider, ProjectSubscriptionName.of(projectName, subscriptionName));
+
+				if (hostAndPort != null) {
+					subscriberWrapper.withHostAndPort(hostAndPort);
+				}
+			}
+
+			sourceUnderConstruction.setSubscriberWrapper(subscriberWrapper);
+			sourceUnderConstruction.setDeserializationSchema(deserializationSchema);
+
+			return sourceUnderConstruction;
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
new file mode 100644
index 0000000..fb75f43
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.core.ApiService;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+import java.io.Serializable;
+
+class SubscriberWrapper implements Serializable {
+	private final SerializableCredentialsProvider serializableCredentialsProvider;
+	private final String                          projectId;
+	private final String                          subscriptionId;
+	private       String                          hostAndPort = null;
+
+	private transient Subscriber       subscriber;
+	private transient ManagedChannel   managedChannel = null;
+	private transient TransportChannel channel        = null;
+
+	SubscriberWrapper(SerializableCredentialsProvider serializableCredentialsProvider, ProjectSubscriptionName projectSubscriptionName) {
+		this.serializableCredentialsProvider = serializableCredentialsProvider;
+		this.projectId = projectSubscriptionName.getProject();
+		this.subscriptionId = projectSubscriptionName.getSubscription();
+	}
+
+	void initialize(MessageReceiver messageReceiver) {
+		Subscriber.Builder builder = Subscriber
+				.newBuilder(ProjectSubscriptionName.of(projectId, subscriptionId), messageReceiver)
+				.setCredentialsProvider(serializableCredentialsProvider);
+
+		if (hostAndPort != null) {
+			managedChannel = ManagedChannelBuilder
+					.forTarget(hostAndPort)
+					.usePlaintext(true) // This is 'Ok' because this is ONLY used for testing.
+					.build();
+			channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
+			builder.setChannelProvider(FixedTransportChannelProvider.create(channel));
+		}
+
+		this.subscriber = builder.build();
+	}
+
+	/**
+	 * Set the custom hostname/port combination of PubSub.
+	 * The ONLY reason to use this is during tests with the emulator provided by Google.
+	 *
+	 * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
+	 * @return The current instance
+	 */
+	public SubscriberWrapper withHostAndPort(String hostAndPort) {
+		this.hostAndPort = hostAndPort;
+		return this;
+	}
+
+	void startBlocking() {
+		ApiService apiService = subscriber.startAsync();
+		apiService.awaitRunning();
+
+		if (apiService.state() != ApiService.State.RUNNING) {
+			throw new IllegalStateException("Could not start PubSubSubscriber, ApiService.State: " + apiService.state());
+		}
+		apiService.awaitTerminated();
+	}
+
+	void stop() {
+		subscriber.stopAsync().awaitTerminated();
+		if (channel != null) {
+			try {
+				channel.close();
+				managedChannel.shutdownNow();
+			} catch (Exception e) {
+				// Ignore
+			}
+		}
+	}
+
+	Subscriber getSubscriber() {
+		return subscriber;
+	}
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
new file mode 100644
index 0000000..bd04058
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub.common;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.auth.Credentials;
+import com.google.cloud.NoCredentials;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * Wrapper class for CredentialsProvider to make it Serializable. This can be used to pass on Credentials to SourceFunctions
+ */
+public class SerializableCredentialsProvider implements CredentialsProvider, Serializable {
+	private final Credentials credentials;
+
+	/**
+	 * @param credentials The google {@link Credentials} needed to connect to PubSub
+	 */
+	public SerializableCredentialsProvider(Credentials credentials) {
+		this.credentials = credentials;
+	}
+
+	/**
+	 * Creates a SerializableCredentialsProvider for a PubSubSubscription based on environment variables. {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings}
+	 * @return serializableCredentialsProvider
+	 * @throws IOException thrown by {@link Credentials}
+	 */
+	public static SerializableCredentialsProvider credentialsProviderFromEnvironmentVariables() throws IOException {
+		Credentials credentials = defaultCredentialsProviderBuilder().build().getCredentials();
+		return new SerializableCredentialsProvider(credentials);
+	}
+
+	/**
+	 * Creates a SerializableCredentialsProvider for a PubSubSubscription without any credentials. {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings}
+	 * This is ONLY useful when running tests locally against Mockito or the Google PubSub emulator
+	 * @see <a href="https://cloud.google.com/pubsub/docs/emulator" target="_top">https://cloud.google.com/pubsub/docs/emulator</a>.
+	 * @return serializableCredentialsProvider
+	 * @throws IOException thrown by {@link Credentials}
+	 */
+	public static SerializableCredentialsProvider withoutCredentials() {
+		return new SerializableCredentialsProvider(NoCredentials.getInstance());
+	}
+
+	@Override
+	public Credentials getCredentials() {
+		return credentials;
+	}
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
new file mode 100644
index 0000000..f98731b
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
@@ -0,0 +1,112 @@
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+/**
+ * Test for {@link Bound}.
+ */
+public class BoundTest {
+	private SourceFunction<Object> sourceFunction = mock(SourceFunction.class);
+
+	@Test
+	public void testNoShutdownBeforeCounterLimit() {
+		Bound<Object> bound = Bound.boundByAmountOfMessages(10);
+		bound.start(sourceFunction);
+		sleep(150L);
+
+		bound.receivedMessage();
+		verifyZeroInteractions(sourceFunction);
+	}
+
+	@Test
+	public void testShutdownOnCounterLimit() {
+		Bound<Object> bound = Bound.boundByAmountOfMessages(3);
+		bound.start(sourceFunction);
+
+		bound.receivedMessage();
+		bound.receivedMessage();
+		bound.receivedMessage();
+
+		verify(sourceFunction, times(1)).cancel();
+	}
+
+	@Test
+	public void testNoShutdownBeforeTimerLimit() {
+		Bound<Object> bound = Bound.boundByTimeSinceLastMessage(1000L);
+		bound.start(sourceFunction);
+		for (int i = 0; i < 10; i++) {
+			bound.receivedMessage();
+		}
+
+		verifyZeroInteractions(sourceFunction);
+	}
+
+	@Test
+	public void testShutdownAfterTimerLimitNoMessageReceived() {
+		Bound<Object> bound = Bound.boundByTimeSinceLastMessage(100L);
+		bound.start(sourceFunction);
+		sleep(250L);
+		verify(sourceFunction, times(1)).cancel();
+	}
+
+	@Test
+	public void testShutdownAfterTimerLimitAfterMessageReceived() {
+		Bound<Object> bound = Bound.boundByTimeSinceLastMessage(100L);
+		bound.start(sourceFunction);
+		sleep(50L);
+
+		bound.receivedMessage();
+		sleep(50L);
+		verifyZeroInteractions(sourceFunction);
+
+		sleep(200L);
+		verify(sourceFunction, times(1)).cancel();
+	}
+
+	@Test
+	public void testCounterOrTimerMaxMessages() {
+		Bound<Object> bound = Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(3, 1000L);
+		bound.start(sourceFunction);
+
+		bound.receivedMessage();
+		bound.receivedMessage();
+		bound.receivedMessage();
+
+		verify(sourceFunction, times(1)).cancel();
+	}
+
+	@Test
+	public void testCounterOrTimerTimerElapsed() {
+		Bound<Object> bound = Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(1L, 100L);
+		bound.start(sourceFunction);
+		sleep(200L);
+		verify(sourceFunction, times(1)).cancel();
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testExceptionThrownIfStartNotCalled() {
+		Bound<Object> bound = Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(1L, 100L);
+		bound.receivedMessage();
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testExceptionThrownIfStartCalledTwice() {
+		Bound<Object> bound = Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(1L, 100L);
+		bound.start(sourceFunction);
+		bound.start(sourceFunction);
+	}
+
+	private void sleep(long sleepTime) {
+		try {
+			Thread.sleep(sleepTime);
+		} catch (InterruptedException e) {
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
new file mode 100644
index 0000000..805f823
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
@@ -0,0 +1,70 @@
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link BoundedPubSubSource}.
+ */
+public class BoundedPubSubSourceTest {
+	private final Bound<Object> bound = mock(Bound.class);
+	private final SubscriberWrapper subscriberWrapper = mock(SubscriberWrapper.class);
+	private final SourceFunction.SourceContext<Object> sourceContext = mock(SourceFunction.SourceContext.class);
+	private final AckReplyConsumer ackReplyConsumer = mock(AckReplyConsumer.class);
+	private final DeserializationSchema<Object> deserializationSchema = mock(DeserializationSchema.class);
+
+	private FunctionInitializationContext functionInitializationContext = mock(FunctionInitializationContext.class);
+	private OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+	private StreamingRuntimeContext streamingRuntimeContext = mock(StreamingRuntimeContext.class);
+
+	@Test
+	public void testBoundIsUsed() throws Exception {
+		BoundedPubSubSource<Object> boundedPubSubSource = createAndInitializeBoundedPubSubSource();
+		boundedPubSubSource.setBound(bound);
+
+		boundedPubSubSource.run(sourceContext);
+		verify(bound, times(1)).start(boundedPubSubSource);
+
+		boundedPubSubSource.receiveMessage(pubSubMessage(), ackReplyConsumer);
+		verify(bound, times(1)).receivedMessage();
+	}
+
+	private BoundedPubSubSource<Object> createAndInitializeBoundedPubSubSource() throws Exception {
+		when(sourceContext.getCheckpointLock()).thenReturn(new Object());
+		when(functionInitializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+		when(operatorStateStore.getSerializableListState(any(String.class))).thenReturn(null);
+		when(streamingRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
+
+		BoundedPubSubSource<Object> boundedPubSubSource = BoundedPubSubSource.newBuilder()
+			.withoutCredentials()
+			.withSubscriberWrapper(subscriberWrapper)
+			.withDeserializationSchema(deserializationSchema)
+			.build();
+		boundedPubSubSource.initializeState(functionInitializationContext);
+		boundedPubSubSource.setRuntimeContext(streamingRuntimeContext);
+		boundedPubSubSource.open(null);
+
+		return boundedPubSubSource;
+	}
+
+	private PubsubMessage pubSubMessage() {
+		return PubsubMessage.newBuilder()
+				.setMessageId("message-id")
+				.setData(ByteString.copyFrom("some-message".getBytes()))
+				.build();
+	}
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java
new file mode 100644
index 0000000..73ca53b
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+
+
+/**
+ * Test for {@link SourceFunction}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class PubSubSourceTest {
+	private static final String MESSAGE = "Message";
+	private static final byte[] SERIALIZED_MESSAGE = MESSAGE.getBytes();
+	@Mock
+	private SubscriberWrapper subscriberWrapper;
+	@Mock
+	private org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<String> sourceContext;
+	@Mock
+	private DeserializationSchema<String> deserializationSchema;
+	@Mock
+	private AckReplyConsumer ackReplyConsumer;
+	@Mock
+	private StreamingRuntimeContext streamingRuntimeContext;
+	@Mock
+	private RuntimeContext runtimeContext;
+	@Mock
+	private OperatorStateStore operatorStateStore;
+	@Mock
+	private FunctionInitializationContext functionInitializationContext;
+
+	@Test
+	public void testOpenWithCheckpointing() throws Exception {
+		when(streamingRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
+
+		PubSubSource<String> pubSubSource = createTestSource();
+		pubSubSource.setRuntimeContext(streamingRuntimeContext);
+		pubSubSource.open(null);
+
+		verify(subscriberWrapper, times(1)).initialize(pubSubSource);
+	}
+
+	@Test
+	public void testRun() throws IOException {
+		PubSubSource<String> pubSubSource = createTestSource();
+		pubSubSource.run(sourceContext);
+
+		verify(subscriberWrapper, times(1)).startBlocking();
+	}
+
+	@Test
+	public void testWithCheckpoints() throws Exception {
+		when(deserializationSchema.deserialize(SERIALIZED_MESSAGE)).thenReturn(MESSAGE);
+		when(streamingRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
+		when(sourceContext.getCheckpointLock()).thenReturn("some object to lock on");
+		when(functionInitializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+		when(operatorStateStore.getSerializableListState(any(String.class))).thenReturn(null);
+
+		PubSubSource<String> pubSubSource = createTestSource();
+		pubSubSource.initializeState(functionInitializationContext);
+		pubSubSource.setRuntimeContext(streamingRuntimeContext);
+		pubSubSource.open(null);
+		verify(subscriberWrapper, times(1)).initialize(pubSubSource);
+
+		pubSubSource.run(sourceContext);
+
+		pubSubSource.receiveMessage(pubSubMessage(), ackReplyConsumer);
+
+		verify(sourceContext, times(1)).getCheckpointLock();
+		verify(sourceContext, times(1)).collect(MESSAGE);
+		verifyZeroInteractions(ackReplyConsumer);
+	}
+
+	@Test
+	public void testMessagesAcknowledged() throws Exception {
+		when(streamingRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
+
+		PubSubSource<String> pubSubSource = createTestSource();
+		pubSubSource.setRuntimeContext(streamingRuntimeContext);
+		pubSubSource.open(null);
+
+		List<AckReplyConsumer> input = Collections.singletonList(ackReplyConsumer);
+
+		pubSubSource.acknowledgeSessionIDs(input);
+
+		verify(ackReplyConsumer, times(1)).ack();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testOnceWithoutCheckpointing() throws Exception {
+		PubSubSource<String> pubSubSource = createTestSource();
+		pubSubSource.setRuntimeContext(runtimeContext);
+
+		pubSubSource.open(null);
+	}
+
+	private PubSubSource<String> createTestSource() throws IOException {
+		return PubSubSource.<String>newBuilder()
+			.withoutCredentials()
+			.withSubscriberWrapper(subscriberWrapper)
+			.withDeserializationSchema(deserializationSchema)
+			.build();
+	}
+
+	private PubsubMessage pubSubMessage() {
+		return PubsubMessage.newBuilder()
+							.setData(ByteString.copyFrom(SERIALIZED_MESSAGE))
+							.build();
+	}
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapperTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapperTest.java
new file mode 100644
index 0000000..f5deb07
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapperTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.apache.flink.api.java.ClosureCleaner.ensureSerializable;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Tests for {@link SubscriberWrapper}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SubscriberWrapperTest {
+	@Mock
+	private SerializableCredentialsProvider credentialsProvider;
+
+	@Mock
+	private MessageReceiver messageReceiver;
+
+	@Test
+	public void testSerializedSubscriberBuilder() throws Exception {
+		SubscriberWrapper factory = new SubscriberWrapper(SerializableCredentialsProvider.withoutCredentials(), ProjectSubscriptionName.of("projectId", "subscriptionId"));
+		ensureSerializable(factory);
+	}
+
+	@Test
+	public void testInitialisation() {
+		SubscriberWrapper factory = new SubscriberWrapper(credentialsProvider, ProjectSubscriptionName.of("projectId", "subscriptionId"));
+		factory.initialize(messageReceiver);
+
+		assertThat(factory.getSubscriber().getSubscriptionNameString(), is(ProjectSubscriptionName.format("projectId", "subscriptionId")));
+	}
+}
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index e6d601d..1c47830 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -56,6 +56,7 @@ under the License.
 		<module>flink-connector-cassandra</module>
 		<module>flink-connector-filesystem</module>
 		<module>flink-connector-kafka</module>
+		<module>flink-connector-pubsub</module>
 	</modules>
 
 	<!-- override these root dependencies as 'provided', so they don't end up
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index c1b52a4..888399e 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -64,6 +64,17 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.google.cloud</groupId>
+			<artifactId>google-cloud-pubsub</artifactId>
+			<version>1.31.0</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-shaded-jackson</artifactId>
 		</dependency>
 
@@ -364,6 +375,7 @@ under the License.
 							</includes>
 						</configuration>
 					</execution>
+
 				</executions>
 			</plugin>
 
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/IntegerSerializer.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/IntegerSerializer.java
new file mode 100644
index 0000000..6e9d1d5
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/IntegerSerializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.pubsub;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+class IntegerSerializer implements DeserializationSchema<Integer>, SerializationSchema<Integer> {
+
+	@Override
+	public Integer deserialize(byte[] bytes) throws IOException {
+		return new BigInteger(bytes).intValue();
+	}
+
+	@Override
+	public boolean isEndOfStream(Integer integer) {
+		return false;
+	}
+
+	@Override
+	public TypeInformation<Integer> getProducedType() {
+		return TypeInformation.of(Integer.class);
+	}
+
+	@Override
+	public byte[] serialize(Integer integer) {
+		return BigInteger.valueOf(integer).toByteArray();
+	}
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java
new file mode 100644
index 0000000..612da41
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.pubsub;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.pubsub.PubSubSink;
+import org.apache.flink.streaming.connectors.pubsub.PubSubSourceBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple PubSub example.
+ *
+ * <p>Before starting a flink job it will publish 10 messages on the input topic.
+ *
+ * Then a flink job is started to read these 10 messages from the input-subscription,
+ * it will print them to stdout
+ * and then write them to a the output-topic.</p>
+ */
+public class PubSubExample {
+	private static final Logger LOG = LoggerFactory.getLogger(PubSubExample.class);
+
+	public static void main(String[] args) throws Exception {
+		// parse input arguments
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		if (parameterTool.getNumberOfParameters() < 3) {
+			System.out.println("Missing parameters!\n" +
+								"Usage: flink run PubSub.jar --input-subscription <subscription> --input-topicName <topic> --output-topicName " +
+								"--google-project <google project name> ");
+			//return;
+		}
+
+		String projectName = parameterTool.getRequired("google-project");
+		String inputTopicName = parameterTool.getRequired("input-topicName");
+		String subscriptionName = parameterTool.getRequired("input-subscription");
+		String outputTopicName = parameterTool.getRequired("output-topicName");
+
+		PubSubPublisher pubSubPublisher = new PubSubPublisher(projectName, inputTopicName);
+
+		pubSubPublisher.publish();
+		runFlinkJob(projectName, subscriptionName, outputTopicName);
+	}
+
+	private static void runFlinkJob(String projectName, String subscriptionName, String outputTopicName) throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.addSource(PubSubSourceBuilder.<Integer>builder()
+										.withProjectSubscriptionName(projectName, subscriptionName)
+										.withDeserializationSchema(new IntegerSerializer())
+										.withMode(PubSubSourceBuilder.Mode.NONE)
+										.build())
+			.map(PubSubExample::printAndReturn).disableChaining()
+			.addSink(PubSubSink.<Integer>newBuilder()
+							.withProjectName(projectName)
+							.withTopicName(outputTopicName)
+							.withSerializationSchema(new IntegerSerializer())
+							.build());
+
+		env.execute("Flink Streaming PubSubReader");
+	}
+
+	private static Integer printAndReturn(Integer i) {
+		LOG.info("Processed message with payload: " + i);
+		return i;
+	}
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubPublisher.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubPublisher.java
new file mode 100644
index 0000000..7507945
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubPublisher.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.math.BigInteger;
+
+class PubSubPublisher {
+	private final String projectName;
+	private final String topicName;
+
+	PubSubPublisher(String projectName, String topicName) {
+		this.projectName = projectName;
+		this.topicName = topicName;
+	}
+
+	void publish() {
+		Publisher publisher = null;
+		try {
+			publisher = Publisher.newBuilder(ProjectTopicName.of(projectName, topicName)).build();
+			long counter = 0;
+			while (counter < 10) {
+				ByteString messageData = ByteString.copyFrom(BigInteger.valueOf(counter).toByteArray());
+				PubsubMessage message = PubsubMessage.newBuilder().setData(messageData).build();
+
+				ApiFuture<String> future = publisher.publish(message);
+				future.get();
+				System.out.println("Published message: " + counter);
+				Thread.sleep(100L);
+
+				counter++;
+			}
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		} finally {
+			try {
+				if (publisher != null) {
+					publisher.shutdown();
+				}
+			} catch (Exception e) {
+			}
+		}
+	}
+}