You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2019/07/05 12:46:16 UTC
[flink] 01/05: [FLINK-9311] [pubsub] Add PubSubSource and
PubSubSink connectors
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0937601972674cc548446616796566ad2eab9b81
Author: Richard Deurwaarder <ri...@xeli.eu>
AuthorDate: Sat May 26 14:59:32 2018 +0200
[FLINK-9311] [pubsub] Add PubSubSource and PubSubSink connectors
This closes #6594
---
flink-connectors/flink-connector-pubsub/pom.xml | 119 ++++++++++
.../flink/streaming/connectors/pubsub/Bound.java | 99 ++++++++
.../connectors/pubsub/BoundedPubSubSource.java | 79 +++++++
.../streaming/connectors/pubsub/PubSubSink.java | 241 +++++++++++++++++++
.../streaming/connectors/pubsub/PubSubSource.java | 257 +++++++++++++++++++++
.../connectors/pubsub/SubscriberWrapper.java | 104 +++++++++
.../common/SerializableCredentialsProvider.java | 67 ++++++
.../streaming/connectors/pubsub/BoundTest.java | 112 +++++++++
.../connectors/pubsub/BoundedPubSubSourceTest.java | 70 ++++++
.../connectors/pubsub/PubSubSourceTest.java | 148 ++++++++++++
.../connectors/pubsub/SubscriberWrapperTest.java | 57 +++++
flink-connectors/pom.xml | 1 +
flink-examples/flink-examples-streaming/pom.xml | 12 +
.../examples/pubsub/IntegerSerializer.java | 48 ++++
.../streaming/examples/pubsub/PubSubExample.java | 84 +++++++
.../streaming/examples/pubsub/PubSubPublisher.java | 64 +++++
16 files changed, 1562 insertions(+)
diff --git a/flink-connectors/flink-connector-pubsub/pom.xml b/flink-connectors/flink-connector-pubsub/pom.xml
new file mode 100644
index 0000000..a6e8d72
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.7-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
+ <name>flink-connector-pubsub</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <!-- Version of the com.google.cloud:google-cloud-pubsub client library used below. -->
+ <pubsub.version>1.37.1</pubsub.version>
+ </properties>
+
+ <dependencies>
+
+ <!-- Flink streaming API; provided by the Flink runtime, so not bundled into this jar. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Google Cloud PubSub client (Publisher / Subscriber). -->
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-pubsub</artifactId>
+ <version>${pubsub.version}</version>
+ </dependency>
+
+ <!-- Test utilities from the flink-streaming-java and flink-runtime test-jars. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadeTestJar>false</shadeTestJar>
+ <!-- Bundle every artifact the shade plugin considers (i.e. non-provided dependencies) into the connector jar. -->
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <!-- Relocate conflicting third-party packages under a connector-private prefix. -->
+ <relocations>
+ <!-- NOTE(review): relocation patterns are Java package prefixes. Guava classes live under
+ com.google.common; com.google.guava is Guava's Maven groupId, so this pattern presumably
+ matches no classes. Confirm the intended package. -->
+ <relocation>
+ <pattern>com.google.guava</pattern>
+ <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.guava</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.common.base</pattern>
+ <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.common.base</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.grpc.auth</pattern>
+ <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.auth</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.grpc.protobuf</pattern>
+ <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.protobuf</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
new file mode 100644
index 0000000..b37cc45
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
@@ -0,0 +1,99 @@
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Decides when a bounded PubSub source should stop consuming.
+ *
+ * <p>A bound is one of the following:
+ * <ul>
+ *   <li>a maximum number of received messages;</li>
+ *   <li>a maximum idle time (in milliseconds) between two messages, checked periodically by a
+ *       {@link Timer};</li>
+ *   <li>both, in which case whichever limit is reached first wins.</li>
+ * </ul>
+ * When a limit is reached, {@code cancel()} is called at most once on the source passed to
+ * {@link #start(SourceFunction)}, and the idle timer (if any) is stopped.
+ *
+ * @param <OUT> type of the records emitted by the bounded source
+ */
+class Bound<OUT> implements Serializable {
+	private static final Logger LOG = LoggerFactory.getLogger(Bound.class);
+
+	/** Interval, in milliseconds, at which the idle timer checks whether the idle limit was exceeded. */
+	private static final long IDLE_CHECK_INTERVAL_MS = 100L;
+
+	private final Bound.Mode mode;
+	/** Maximum number of messages; only used in COUNTER and COUNTER_OR_TIMER mode. */
+	private final long maxMessagesReceived;
+	/** Maximum idle time in milliseconds; only used in TIMER and COUNTER_OR_TIMER mode. */
+	private final long maxTimeBetweenMessages;
+
+	private SourceFunction<OUT> sourceFunction;
+	// java.util.Timer is not serializable; it is created in start().
+	private transient Timer timer;
+	private long messagesReceived;
+	// System.currentTimeMillis() of the last received message (or of start() if none yet).
+	private long lastReceivedMessage;
+	private boolean cancelled = false;
+
+	private Bound(Bound.Mode mode, long maxMessagesReceived, long maxTimeBetweenMessages) {
+		this.mode = mode;
+		this.maxMessagesReceived = maxMessagesReceived;
+		this.maxTimeBetweenMessages = maxTimeBetweenMessages;
+		this.messagesReceived = 0L;
+	}
+
+	/** Creates a bound that cancels the source once {@code maxMessagesReceived} messages were received. */
+	static <OUT> Bound<OUT> boundByAmountOfMessages(long maxMessagesReceived) {
+		return new Bound<>(Mode.COUNTER, maxMessagesReceived, 0L);
+	}
+
+	/** Creates a bound that cancels the source after {@code maxTimeBetweenMessages} ms without a message. */
+	static <OUT> Bound<OUT> boundByTimeSinceLastMessage(long maxTimeBetweenMessages) {
+		return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages);
+	}
+
+	/** Creates a bound that cancels the source on whichever of the two limits is reached first. */
+	static <OUT> Bound<OUT> boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagesReceived, long maxTimeBetweenMessages) {
+		return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagesReceived, maxTimeBetweenMessages);
+	}
+
+	/** Periodic task that cancels the source once the idle limit has been exceeded. */
+	private TimerTask shutdownPubSubSource() {
+		return new TimerTask() {
+			@Override
+			public void run() {
+				if (maxTimeBetweenMessagesElapsed()) {
+					// Also stops the timer, so this task does not run again.
+					cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source");
+				}
+			}
+		};
+	}
+
+	private synchronized boolean maxTimeBetweenMessagesElapsed() {
+		return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages;
+	}
+
+	/**
+	 * Cancels the source (at most once) and stops the idle timer.
+	 *
+	 * <p>The timer is stopped here, not only from the timer task. Otherwise a bound reached by
+	 * message count would leave the idle timer thread running until the idle limit also expired.
+	 */
+	private synchronized void cancelPubSubSource(String logMessage) {
+		try {
+			if (!cancelled) {
+				cancelled = true;
+				sourceFunction.cancel();
+				LOG.info(logMessage);
+			}
+		} finally {
+			stopTimer();
+		}
+	}
+
+	/** Stops the idle timer if one was started; Timer.cancel() may safely be called repeatedly. */
+	private synchronized void stopTimer() {
+		if (timer != null) {
+			timer.cancel();
+		}
+	}
+
+	/**
+	 * Attaches this bound to {@code sourceFunction} and, for time based bounds, starts the idle timer.
+	 *
+	 * @param sourceFunction the source to cancel once the bound is reached
+	 * @throws IllegalStateException if this method was already called
+	 */
+	synchronized void start(SourceFunction<OUT> sourceFunction) {
+		if (this.sourceFunction != null) {
+			throw new IllegalStateException("start() already called");
+		}
+
+		this.sourceFunction = sourceFunction;
+		messagesReceived = 0;
+
+		if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) {
+			lastReceivedMessage = System.currentTimeMillis();
+			timer = new Timer();
+			timer.schedule(shutdownPubSubSource(), 0, IDLE_CHECK_INTERVAL_MS);
+		}
+	}
+
+	/**
+	 * Records a received message and cancels the source when the message count limit is reached.
+	 *
+	 * @throws IllegalStateException if {@link #start(SourceFunction)} has not been called
+	 */
+	synchronized void receivedMessage() {
+		if (sourceFunction == null) {
+			throw new IllegalStateException("start() not called");
+		}
+
+		lastReceivedMessage = System.currentTimeMillis();
+		messagesReceived++;
+
+		if ((mode == Mode.COUNTER || mode == Mode.COUNTER_OR_TIMER) && messagesReceived >= maxMessagesReceived) {
+			cancelPubSubSource("BoundedSourceFunction: Max received messages --> canceling source");
+		}
+	}
+
+	/** Which limit(s) a bound enforces. */
+	private enum Mode {
+		COUNTER, TIMER, COUNTER_OR_TIMER
+	}
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
new file mode 100644
index 0000000..5f829ae
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
@@ -0,0 +1,79 @@
+package org.apache.flink.streaming.connectors.pubsub;
+
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.io.IOException;
+
+class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
+ private Bound<OUT> bound;
+
+ private BoundedPubSubSource() {
+ super();
+ }
+
+ protected void setBound(Bound<OUT> bound) {
+ this.bound = bound;
+ }
+
+ @Override
+ public void run(SourceContext<OUT> sourceContext) {
+ bound.start(this);
+ super.run(sourceContext);
+ }
+
+ @Override
+ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
+ super.receiveMessage(message, consumer);
+ bound.receivedMessage();
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <OUT> BoundedPubSubSourceBuilder<OUT, ? extends PubSubSource, ? extends BoundedPubSubSourceBuilder> newBuilder() {
+ return new BoundedPubSubSourceBuilder<>(new BoundedPubSubSource<OUT>());
+ }
+
+ @SuppressWarnings("unchecked")
+ public static class BoundedPubSubSourceBuilder<OUT, PSS extends BoundedPubSubSource<OUT>, BUILDER extends BoundedPubSubSourceBuilder<OUT, PSS, BUILDER>> extends PubSubSourceBuilder<OUT, PSS, BUILDER> {
+ private Long boundedByAmountOfMessages;
+ private Long boundedByTimeSinceLastMessage;
+
+ BoundedPubSubSourceBuilder(PSS sourceUnderConstruction) {
+ super(sourceUnderConstruction);
+ }
+
+ public BUILDER boundedByAmountOfMessages(long maxAmountOfMessages) {
+ boundedByAmountOfMessages = maxAmountOfMessages;
+ return (BUILDER) this;
+ }
+
+ public BUILDER boundedByTimeSinceLastMessage(long timeSinceLastMessage) {
+ boundedByTimeSinceLastMessage = timeSinceLastMessage;
+ return (BUILDER) this;
+ }
+
+ private Bound <OUT> createBound() {
+ if (boundedByAmountOfMessages != null && boundedByTimeSinceLastMessage != null) {
+ return Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(boundedByAmountOfMessages, boundedByTimeSinceLastMessage);
+ }
+
+ if (boundedByAmountOfMessages != null) {
+ return Bound.boundByAmountOfMessages(boundedByAmountOfMessages);
+ }
+
+ if (boundedByTimeSinceLastMessage != null) {
+ return Bound.boundByTimeSinceLastMessage(boundedByTimeSinceLastMessage);
+ }
+
+ // This is functionally speaking no bound.
+ return Bound.boundByAmountOfMessages(Long.MAX_VALUE);
+ }
+
+ @Override
+ public PSS build() throws IOException {
+ sourceUnderConstruction.setBound(createBound());
+ return super.build();
+ }
+ }
+
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
new file mode 100644
index 0000000..92c9c6c
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+import java.io.IOException;
+
+/**
+ * A sink function that publishes every record to a Google Cloud PubSub topic.
+ *
+ * <p>Each record is serialized with the configured {@link SerializationSchema} and published
+ * right away: outstanding messages are flushed after every record.
+ *
+ * @param <IN> type of PubSubSink messages to write
+ */
+public class PubSubSink<IN> extends RichSinkFunction<IN> {
+
+	private final SerializableCredentialsProvider serializableCredentialsProvider;
+	private final SerializationSchema<IN> serializationSchema;
+	private final String projectName;
+	private final String topicName;
+	private String hostAndPort = null;
+
+	private transient Publisher publisher;
+	// Only created in open() when a custom hostAndPort (e.g. the PubSub emulator) is configured.
+	private transient ManagedChannel managedChannel = null;
+	private transient TransportChannel channel = null;
+
+	/**
+	 * Creates a sink; prefer {@link #newBuilder()}.
+	 *
+	 * @param serializableCredentialsProvider credentials used to connect to PubSub
+	 * @param serializationSchema converts each record into the message payload
+	 * @param projectName the name of the project in PubSub
+	 * @param topicName the name of the topic in PubSub
+	 */
+	public PubSubSink(SerializableCredentialsProvider serializableCredentialsProvider, SerializationSchema<IN> serializationSchema, String projectName, String topicName) {
+		this.serializableCredentialsProvider = serializableCredentialsProvider;
+		this.serializationSchema = serializationSchema;
+		this.projectName = projectName;
+		this.topicName = topicName;
+	}
+
+	/**
+	 * Set the custom hostname/port combination of PubSub.
+	 * The ONLY reason to use this is during tests with the emulator provided by Google.
+	 *
+	 * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
+	 * @return The current instance
+	 */
+	public PubSubSink<IN> withHostAndPort(String hostAndPort) {
+		this.hostAndPort = hostAndPort;
+		return this;
+	}
+
+	/** Creates the Publisher, using a plaintext channel to {@code hostAndPort} when one is set. */
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		Publisher.Builder builder = Publisher
+			.newBuilder(ProjectTopicName.of(projectName, topicName))
+			.setCredentialsProvider(serializableCredentialsProvider);
+
+		if (hostAndPort != null) {
+			managedChannel = ManagedChannelBuilder
+				.forTarget(hostAndPort)
+				.usePlaintext(true) // This is 'Ok' because this is ONLY used for testing.
+				.build();
+			channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
+			builder.setChannelProvider(FixedTransportChannelProvider.create(channel));
+		}
+
+		publisher = builder.build();
+	}
+
+	/**
+	 * Shuts down the Publisher and releases the custom channel, if any.
+	 *
+	 * <p>Safe to call when {@link #open(Configuration)} failed or never ran. The channels are
+	 * released even if shutting down the publisher throws.
+	 */
+	@Override
+	public void close() throws Exception {
+		super.close();
+		try {
+			if (publisher != null) {
+				publisher.shutdown();
+			}
+		} finally {
+			try {
+				if (channel != null) {
+					channel.close();
+				}
+			} finally {
+				if (managedChannel != null) {
+					managedChannel.shutdownNow();
+				}
+			}
+		}
+	}
+
+	/** Serializes {@code message}, publishes it and flushes all outstanding messages. */
+	@Override
+	public void invoke(IN message, SinkFunction.Context context) {
+		PubsubMessage pubsubMessage = PubsubMessage
+			.newBuilder()
+			.setData(ByteString.copyFrom(serializationSchema.serialize(message)))
+			.build();
+		// NOTE(review): the result of publish() is discarded, so a failed publish is not reported
+		// to Flink. Flushing after every record presumably also limits batching to one message.
+		publisher.publish(pubsubMessage);
+		publisher.publishAllOutstanding();
+	}
+
+	/**
+	 * Create a builder for a new PubSubSink.
+	 * @param <IN> The generic of the type that is to be written into the sink.
+	 * @return a new PubSubSinkBuilder instance
+	 */
+	public static <IN> PubSubSinkBuilder<IN> newBuilder() {
+		return new PubSubSinkBuilder<>();
+	}
+
+	/**
+	 * PubSubSinkBuilder to create a PubSubSink.
+	 * @param <IN> Type of PubSubSink to create.
+	 */
+	public static class PubSubSinkBuilder<IN> {
+		private SerializableCredentialsProvider serializableCredentialsProvider = null;
+		private SerializationSchema<IN> serializationSchema = null;
+		private String projectName = null;
+		private String topicName = null;
+		private String hostAndPort = null;
+
+		private PubSubSinkBuilder() {
+		}
+
+		/**
+		 * Set the credentials.
+		 * If this is not used then the credentials are picked up from the environment variables.
+		 * @param credentials the Credentials needed to connect.
+		 * @return The current PubSubSinkBuilder instance
+		 */
+		public PubSubSinkBuilder<IN> withCredentials(Credentials credentials) {
+			this.serializableCredentialsProvider = new SerializableCredentialsProvider(credentials);
+			return this;
+		}
+
+		/**
+		 * Set the credentials from a CredentialsProvider.
+		 * The credentials are retrieved from the provider immediately.
+		 * If this is not used then the credentials are picked up from the environment variables.
+		 * @param credentialsProvider the CredentialsProvider to take the credentials from.
+		 * @return The current PubSubSinkBuilder instance
+		 * @throws IOException in case the provider fails to supply the credentials
+		 */
+		public PubSubSinkBuilder<IN> withCredentialsProvider(CredentialsProvider credentialsProvider) throws IOException {
+			return withCredentials(credentialsProvider.getCredentials());
+		}
+
+		/**
+		 * Set the credentials to be absent.
+		 * This means that no credentials are to be used at all.
+		 * @return The current PubSubSinkBuilder instance
+		 */
+		public PubSubSinkBuilder<IN> withoutCredentials() {
+			this.serializableCredentialsProvider = SerializableCredentialsProvider.withoutCredentials();
+			return this;
+		}
+
+		/**
+		 * Set the schema used to serialize records.
+		 * @param serializationSchema Instance of a SerializationSchema that converts the IN into a byte[]
+		 * @return The current PubSubSinkBuilder instance
+		 */
+		public PubSubSinkBuilder<IN> withSerializationSchema(SerializationSchema<IN> serializationSchema) {
+			this.serializationSchema = serializationSchema;
+			return this;
+		}
+
+		/**
+		 * Set the project to publish to.
+		 * @param projectName The name of the project in PubSub
+		 * @return The current PubSubSinkBuilder instance
+		 */
+		public PubSubSinkBuilder<IN> withProjectName(String projectName) {
+			this.projectName = projectName;
+			return this;
+		}
+
+		/**
+		 * Set the topic to publish to.
+		 * @param topicName The name of the topic in PubSub
+		 * @return The current PubSubSinkBuilder instance
+		 */
+		public PubSubSinkBuilder<IN> withTopicName(String topicName) {
+			this.topicName = topicName;
+			return this;
+		}
+
+		/**
+		 * Set the custom hostname/port combination of PubSub.
+		 * The ONLY reason to use this is during tests with the emulator provided by Google.
+		 * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
+		 * @return The current PubSubSinkBuilder instance
+		 */
+		public PubSubSinkBuilder<IN> withHostAndPort(String hostAndPort) {
+			this.hostAndPort = hostAndPort;
+			return this;
+		}
+
+		/**
+		 * Build the configured PubSubSink.
+		 * @return a brand new PubSubSink
+		 * @throws IOException in case of a problem getting the credentials from the environment
+		 * @throws IllegalArgumentException in case required fields were not specified.
+		 */
+		public PubSubSink<IN> build() throws IOException {
+			if (serializableCredentialsProvider == null) {
+				serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables();
+			}
+			if (serializationSchema == null) {
+				throw new IllegalArgumentException("The serializationSchema has not been specified.");
+			}
+			if (projectName == null) {
+				throw new IllegalArgumentException("The projectName has not been specified.");
+			}
+			if (topicName == null) {
+				throw new IllegalArgumentException("The topicName has not been specified.");
+			}
+
+			PubSubSink<IN> pubSubSink = new PubSubSink<>(
+				serializableCredentialsProvider,
+				serializationSchema,
+				projectName, topicName);
+
+			if (hostAndPort != null) {
+				pubSubSink.withHostAndPort(hostAndPort);
+			}
+
+			return pubSubSink;
+		}
+	}
+
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
new file mode 100644
index 0000000..8f8c689
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them as soon as they have been received.
+ */
+public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, AckReplyConsumer> implements MessageReceiver, ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT> {
+ private DeserializationSchema<OUT> deserializationSchema;
+ private SubscriberWrapper subscriberWrapper;
+
+ protected transient SourceContext<OUT> sourceContext = null;
+
+ protected PubSubSource() {
+ super(String.class);
+ }
+
+ protected void setDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ protected void setSubscriberWrapper(SubscriberWrapper subscriberWrapper) {
+ this.subscriberWrapper = subscriberWrapper;
+ }
+
+ @Override
+ public void open(Configuration configuration) throws Exception {
+ super.open(configuration);
+ subscriberWrapper.initialize(this);
+ if (hasNoCheckpointingEnabled(getRuntimeContext())) {
+ throw new IllegalArgumentException("Checkpointing needs to be enabled to support: PubSub ATLEAST_ONCE");
+ }
+ }
+
+ private boolean hasNoCheckpointingEnabled(RuntimeContext runtimeContext) {
+ return !(runtimeContext instanceof StreamingRuntimeContext && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled());
+ }
+
+ @Override
+ protected void acknowledgeSessionIDs(List<AckReplyConsumer> ackReplyConsumers) {
+ ackReplyConsumers.forEach(AckReplyConsumer::ack);
+ }
+
+ @Override
+ public void run(SourceContext<OUT> sourceContext) {
+ this.sourceContext = sourceContext;
+ subscriberWrapper.startBlocking();
+ }
+
+ @Override
+ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
+ if (sourceContext == null) {
+ consumer.nack();
+ return;
+ }
+
+ processMessage(message, consumer);
+ }
+
+ private void processMessage(PubsubMessage message, AckReplyConsumer ackReplyConsumer) {
+ synchronized (sourceContext.getCheckpointLock()) {
+ boolean alreadyProcessed = !addId(message.getMessageId());
+ if (alreadyProcessed) {
+ return;
+ }
+
+ sessionIds.add(ackReplyConsumer);
+ sourceContext.collect(deserializeMessage(message));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ subscriberWrapper.stop();
+ }
+
+ private OUT deserializeMessage(PubsubMessage message) {
+ try {
+ return deserializationSchema.deserialize(message.getData().toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <OUT> PubSubSourceBuilder<OUT, ? extends PubSubSource, ? extends PubSubSourceBuilder> newBuilder() {
+ return new PubSubSourceBuilder<>(new PubSubSource<OUT>());
+ }
+
+ /**
+ * Builder to create PubSubSource.
+ * @param <OUT> The type of objects which will be read
+ * @param <PSS> The type of PubSubSource
+ * @param <BUILDER> The type of Builder to create the PubSubSource
+ */
+ public static class PubSubSourceBuilder<OUT, PSS extends PubSubSource<OUT>, BUILDER extends PubSubSourceBuilder<OUT, PSS, BUILDER>> {
+ protected PSS sourceUnderConstruction;
+
+ private SubscriberWrapper subscriberWrapper = null;
+ private SerializableCredentialsProvider serializableCredentialsProvider;
+ private DeserializationSchema<OUT> deserializationSchema;
+ private String projectName;
+ private String subscriptionName;
+ private String hostAndPort;
+
+ protected PubSubSourceBuilder(PSS sourceUnderConstruction) {
+ this.sourceUnderConstruction = sourceUnderConstruction;
+ }
+
+ /**
+ * Set the credentials.
+ * If this is not used then the credentials are picked up from the environment variables.
+ * @param credentials the Credentials needed to connect.
+ * @return The current PubSubSourceBuilder instance
+ */
+ public BUILDER withCredentials(Credentials credentials) {
+ this.serializableCredentialsProvider = new SerializableCredentialsProvider(credentials);
+ return (BUILDER) this;
+ }
+
+ /**
+ * Set the CredentialsProvider.
+ * If this is not used then the credentials are picked up from the environment variables.
+ * @param credentialsProvider the custom SerializableCredentialsProvider instance.
+ * @return The current PubSubSourceBuilder instance
+ */
+ public BUILDER withCredentialsProvider(CredentialsProvider credentialsProvider) throws IOException {
+ return withCredentials(credentialsProvider.getCredentials());
+ }
+
+ /**
+ * Set the credentials to be absent.
+ * This means that no credentials are to be used at all.
+ * @return The current PubSubSourceBuilder instance
+ */
+ public BUILDER withoutCredentials() {
+ this.serializableCredentialsProvider = SerializableCredentialsProvider.withoutCredentials();
+ return (BUILDER) this;
+ }
+
+ /**
+ * @param deserializationSchema Instance of a DeserializationSchema that converts the OUT into a byte[]
+ * @return The current PubSubSourceBuilder instance
+ */
+ public BUILDER withDeserializationSchema(DeserializationSchema <OUT> deserializationSchema) {
+ this.deserializationSchema = deserializationSchema;
+ return (BUILDER) this;
+ }
+
+ /**
+ * @param projectName The name of the project in GoogleCloudPlatform
+ * @param subscriptionName The name of the subscription in PubSub
+ * @return The current PubSubSourceBuilder instance
+ */
+ public BUILDER withProjectSubscriptionName(String projectName, String subscriptionName) {
+ this.projectName = projectName;
+ this.subscriptionName = subscriptionName;
+ return (BUILDER) this;
+ }
+
+ /**
+ * Set the custom hostname/port combination of PubSub.
+ * The ONLY reason to use this is during tests with the emulator provided by Google.
+ * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
+ * @return The current PubSubSourceBuilder instance
+ */
+ public BUILDER withHostAndPort(String hostAndPort) {
+ this.hostAndPort = hostAndPort;
+ return (BUILDER) this;
+ }
+
+ /**
+ * Set a complete SubscriberWrapper.
+ * The ONLY reason to use this is during tests.
+ * @param subscriberWrapper The fully instantiated SubscriberWrapper
+ * @return The current PubSubSourceBuilder instance
+ */
+ public BUILDER withSubscriberWrapper(SubscriberWrapper subscriberWrapper) {
+ this.subscriberWrapper = subscriberWrapper;
+ return (BUILDER) this;
+ }
+
+ /**
+ * Actually build the desired instance of the PubSubSourceBuilder.
+ * @return a brand new SourceFunction
+ * @throws IOException incase of a problem getting the credentials
+ * @throws IllegalArgumentException incase required fields were not specified.
+ */
+ public PSS build() throws IOException {
+ if (serializableCredentialsProvider == null) {
+ serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables();
+ }
+ if (deserializationSchema == null) {
+ throw new IllegalArgumentException("The deserializationSchema has not been specified.");
+ }
+
+ if (subscriberWrapper == null) {
+ if (projectName == null || subscriptionName == null) {
+ throw new IllegalArgumentException("The ProjectName And SubscriptionName have not been specified.");
+ }
+
+ subscriberWrapper =
+ new SubscriberWrapper(serializableCredentialsProvider, ProjectSubscriptionName.of(projectName, subscriptionName));
+
+ if (hostAndPort != null) {
+ subscriberWrapper.withHostAndPort(hostAndPort);
+ }
+ }
+
+ sourceUnderConstruction.setSubscriberWrapper(subscriberWrapper);
+ sourceUnderConstruction.setDeserializationSchema(deserializationSchema);
+
+ return sourceUnderConstruction;
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
new file mode 100644
index 0000000..fb75f43
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.core.ApiService;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+import java.io.Serializable;
+
+/**
+ * Serializable wrapper around a Google PubSub {@link Subscriber}.
+ *
+ * <p>Only the configuration (credentials provider, project/subscription ids and an optional emulator
+ * host/port) is serialized. The {@link Subscriber} and the gRPC channels are transient and are created by
+ * {@link #initialize(MessageReceiver)}.
+ */
+class SubscriberWrapper implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final SerializableCredentialsProvider serializableCredentialsProvider;
+ private final String projectId;
+ private final String subscriptionId;
+ private String hostAndPort = null;
+
+ private transient Subscriber subscriber;
+ private transient ManagedChannel managedChannel = null;
+ private transient TransportChannel channel = null;
+
+ SubscriberWrapper(SerializableCredentialsProvider serializableCredentialsProvider, ProjectSubscriptionName projectSubscriptionName) {
+ this.serializableCredentialsProvider = serializableCredentialsProvider;
+ // Keep only the ids; the ProjectSubscriptionName is recreated in initialize().
+ this.projectId = projectSubscriptionName.getProject();
+ this.subscriptionId = projectSubscriptionName.getSubscription();
+ }
+
+ /**
+ * Creates the {@link Subscriber} that delivers incoming messages to {@code messageReceiver}.
+ * If a host/port was configured via {@link #withHostAndPort(String)}, a plaintext gRPC channel to that
+ * address is installed as the subscriber's channel provider.
+ *
+ * @param messageReceiver receiver handed to the subscriber for incoming messages
+ */
+ void initialize(MessageReceiver messageReceiver) {
+ Subscriber.Builder builder = Subscriber
+ .newBuilder(ProjectSubscriptionName.of(projectId, subscriptionId), messageReceiver)
+ .setCredentialsProvider(serializableCredentialsProvider);
+
+ if (hostAndPort != null) {
+ managedChannel = ManagedChannelBuilder
+ .forTarget(hostAndPort)
+ .usePlaintext(true) // This is 'Ok' because this is ONLY used for testing.
+ .build();
+ channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
+ builder.setChannelProvider(FixedTransportChannelProvider.create(channel));
+ }
+
+ this.subscriber = builder.build();
+ }
+
+ /**
+ * Set the custom hostname/port combination of PubSub.
+ * The ONLY reason to use this is during tests with the emulator provided by Google.
+ *
+ * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
+ * @return The current instance
+ */
+ public SubscriberWrapper withHostAndPort(String hostAndPort) {
+ this.hostAndPort = hostAndPort;
+ return this;
+ }
+
+ /**
+ * Starts the subscriber and blocks until it has terminated.
+ *
+ * @throws IllegalStateException if the subscriber is not RUNNING once startup has completed
+ */
+ void startBlocking() {
+ ApiService apiService = subscriber.startAsync();
+ apiService.awaitRunning();
+
+ if (apiService.state() != ApiService.State.RUNNING) {
+ throw new IllegalStateException("Could not start PubSubSubscriber, ApiService.State: " + apiService.state());
+ }
+ apiService.awaitTerminated();
+ }
+
+ /**
+ * Stops the subscriber (if {@link #initialize(MessageReceiver)} created one) and afterwards releases the
+ * emulator channels. The channels are released even if stopping the subscriber throws.
+ */
+ void stop() {
+ try {
+ if (subscriber != null) {
+ subscriber.stopAsync().awaitTerminated();
+ }
+ } finally {
+ closeChannels();
+ }
+ }
+
+ /** Best-effort release of the plaintext channels created by {@link #initialize(MessageReceiver)}. */
+ private void closeChannels() {
+ if (channel != null) {
+ try {
+ channel.close();
+ } catch (Exception ignored) {
+ // Best effort: a failing close must not prevent shutting down the underlying gRPC channel below.
+ }
+ }
+ if (managedChannel != null) {
+ managedChannel.shutdownNow();
+ }
+ }
+
+ Subscriber getSubscriber() {
+ return subscriber;
+ }
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
new file mode 100644
index 0000000..bd04058
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub.common;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.auth.Credentials;
+import com.google.cloud.NoCredentials;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * Wrapper class for CredentialsProvider to make it Serializable. This can be used to pass on Credentials to SourceFunctions.
+ */
+public class SerializableCredentialsProvider implements CredentialsProvider, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final Credentials credentials;
+
+ /**
+ * Creates a provider that always returns the given credentials.
+ *
+ * @param credentials The google {@link Credentials} needed to connect to PubSub
+ */
+ public SerializableCredentialsProvider(Credentials credentials) {
+ this.credentials = credentials;
+ }
+
+ /**
+ * Creates a SerializableCredentialsProvider for a PubSubSubscription based on environment variables.
+ * The credentials are looked up once, when this method is called, through the default credentials
+ * provider of {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings}.
+ *
+ * @return serializableCredentialsProvider
+ * @throws IOException if the default credentials cannot be obtained
+ */
+ public static SerializableCredentialsProvider credentialsProviderFromEnvironmentVariables() throws IOException {
+ Credentials credentials = defaultCredentialsProviderBuilder().build().getCredentials();
+ return new SerializableCredentialsProvider(credentials);
+ }
+
+ /**
+ * Creates a SerializableCredentialsProvider for a PubSubSubscription without any credentials.
+ * This is ONLY useful when running tests locally against mocks or the Google PubSub emulator.
+ *
+ * @see <a href="https://cloud.google.com/pubsub/docs/emulator" target="_top">https://cloud.google.com/pubsub/docs/emulator</a>.
+ * @return serializableCredentialsProvider wrapping {@link NoCredentials}
+ */
+ public static SerializableCredentialsProvider withoutCredentials() {
+ return new SerializableCredentialsProvider(NoCredentials.getInstance());
+ }
+
+ /**
+ * @return the credentials passed to the constructor
+ */
+ @Override
+ public Credentials getCredentials() {
+ return credentials;
+ }
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
new file mode 100644
index 0000000..f98731b
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
@@ -0,0 +1,112 @@
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+/**
+ * Test for {@link Bound}.
+ *
+ * <p>The timer based bounds are exercised with real sleeps, so the sleep durations are chosen relative to
+ * the timeouts passed to the factory methods.
+ */
+public class BoundTest {
+ @SuppressWarnings("unchecked")
+ private SourceFunction<Object> sourceFunction = mock(SourceFunction.class);
+
+ @Test
+ public void testNoShutdownBeforeCounterLimit() {
+ Bound<Object> bound = Bound.boundByAmountOfMessages(10);
+ bound.start(sourceFunction);
+ sleep(150L);
+
+ bound.receivedMessage();
+ verifyZeroInteractions(sourceFunction);
+ }
+
+ @Test
+ public void testShutdownOnCounterLimit() {
+ Bound<Object> bound = Bound.boundByAmountOfMessages(3);
+ bound.start(sourceFunction);
+
+ bound.receivedMessage();
+ bound.receivedMessage();
+ bound.receivedMessage();
+
+ verify(sourceFunction, times(1)).cancel();
+ }
+
+ @Test
+ public void testNoShutdownBeforeTimerLimit() {
+ Bound<Object> bound = Bound.boundByTimeSinceLastMessage(1000L);
+ bound.start(sourceFunction);
+ for (int i = 0; i < 10; i++) {
+ bound.receivedMessage();
+ }
+
+ verifyZeroInteractions(sourceFunction);
+ }
+
+ @Test
+ public void testShutdownAfterTimerLimitNoMessageReceived() {
+ Bound<Object> bound = Bound.boundByTimeSinceLastMessage(100L);
+ bound.start(sourceFunction);
+ sleep(250L);
+ verify(sourceFunction, times(1)).cancel();
+ }
+
+ @Test
+ public void testShutdownAfterTimerLimitAfterMessageReceived() {
+ Bound<Object> bound = Bound.boundByTimeSinceLastMessage(100L);
+ bound.start(sourceFunction);
+ sleep(50L);
+
+ bound.receivedMessage();
+ sleep(50L);
+ verifyZeroInteractions(sourceFunction);
+
+ sleep(200L);
+ verify(sourceFunction, times(1)).cancel();
+ }
+
+ @Test
+ public void testCounterOrTimerMaxMessages() {
+ Bound<Object> bound = Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(3, 1000L);
+ bound.start(sourceFunction);
+
+ bound.receivedMessage();
+ bound.receivedMessage();
+ bound.receivedMessage();
+
+ verify(sourceFunction, times(1)).cancel();
+ }
+
+ @Test
+ public void testCounterOrTimerTimerElapsed() {
+ Bound<Object> bound = Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(1L, 100L);
+ bound.start(sourceFunction);
+ sleep(200L);
+ verify(sourceFunction, times(1)).cancel();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testExceptionThrownIfStartNotCalled() {
+ Bound<Object> bound = Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(1L, 100L);
+ bound.receivedMessage();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testExceptionThrownIfStartCalledTwice() {
+ Bound<Object> bound = Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(1L, 100L);
+ bound.start(sourceFunction);
+ bound.start(sourceFunction);
+ }
+
+ /**
+ * Sleeps for the given number of milliseconds.
+ * An interrupt fails the test instead of silently shortening the wait, which would make the
+ * timing based assertions meaningless.
+ */
+ private void sleep(long sleepTime) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while waiting for the Bound timer", e);
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
new file mode 100644
index 0000000..805f823
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
@@ -0,0 +1,70 @@
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link BoundedPubSubSource}.
+ */
+public class BoundedPubSubSourceTest {
+ private final Bound<Object> bound = mock(Bound.class);
+ private final SubscriberWrapper subscriberWrapper = mock(SubscriberWrapper.class);
+ private final SourceFunction.SourceContext<Object> sourceContext = mock(SourceFunction.SourceContext.class);
+ private final AckReplyConsumer ackReplyConsumer = mock(AckReplyConsumer.class);
+ private final DeserializationSchema<Object> deserializationSchema = mock(DeserializationSchema.class);
+
+ private FunctionInitializationContext functionInitializationContext = mock(FunctionInitializationContext.class);
+ private OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+ private StreamingRuntimeContext streamingRuntimeContext = mock(StreamingRuntimeContext.class);
+
+ @Test
+ public void testBoundIsUsed() throws Exception {
+ BoundedPubSubSource<Object> boundedPubSubSource = createAndInitializeBoundedPubSubSource();
+ boundedPubSubSource.setBound(bound);
+
+ boundedPubSubSource.run(sourceContext);
+ verify(bound, times(1)).start(boundedPubSubSource);
+
+ boundedPubSubSource.receiveMessage(pubSubMessage(), ackReplyConsumer);
+ verify(bound, times(1)).receivedMessage();
+ }
+
+ private BoundedPubSubSource<Object> createAndInitializeBoundedPubSubSource() throws Exception {
+ when(sourceContext.getCheckpointLock()).thenReturn(new Object());
+ when(functionInitializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+ when(operatorStateStore.getSerializableListState(any(String.class))).thenReturn(null);
+ when(streamingRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
+
+ BoundedPubSubSource<Object> boundedPubSubSource = BoundedPubSubSource.newBuilder()
+ .withoutCredentials()
+ .withSubscriberWrapper(subscriberWrapper)
+ .withDeserializationSchema(deserializationSchema)
+ .build();
+ boundedPubSubSource.initializeState(functionInitializationContext);
+ boundedPubSubSource.setRuntimeContext(streamingRuntimeContext);
+ boundedPubSubSource.open(null);
+
+ return boundedPubSubSource;
+ }
+
+ private PubsubMessage pubSubMessage() {
+ return PubsubMessage.newBuilder()
+ .setMessageId("message-id")
+ .setData(ByteString.copyFrom("some-message".getBytes()))
+ .build();
+ }
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java
new file mode 100644
index 0000000..73ca53b
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+
+
+/**
+ * Tests for {@link PubSubSource}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class PubSubSourceTest {
+ private static final String MESSAGE = "Message";
+ private static final byte[] SERIALIZED_MESSAGE = MESSAGE.getBytes();
+ @Mock
+ private SubscriberWrapper subscriberWrapper;
+ @Mock
+ private SourceFunction.SourceContext<String> sourceContext;
+ @Mock
+ private DeserializationSchema<String> deserializationSchema;
+ @Mock
+ private AckReplyConsumer ackReplyConsumer;
+ @Mock
+ private StreamingRuntimeContext streamingRuntimeContext;
+ @Mock
+ private RuntimeContext runtimeContext;
+ @Mock
+ private OperatorStateStore operatorStateStore;
+ @Mock
+ private FunctionInitializationContext functionInitializationContext;
+
+ @Test
+ public void testOpenWithCheckpointing() throws Exception {
+ when(streamingRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
+
+ PubSubSource<String> pubSubSource = createTestSource();
+ pubSubSource.setRuntimeContext(streamingRuntimeContext);
+ pubSubSource.open(null);
+
+ verify(subscriberWrapper, times(1)).initialize(pubSubSource);
+ }
+
+ @Test
+ public void testRun() throws IOException {
+ PubSubSource<String> pubSubSource = createTestSource();
+ pubSubSource.run(sourceContext);
+
+ verify(subscriberWrapper, times(1)).startBlocking();
+ }
+
+ @Test
+ public void testWithCheckpoints() throws Exception {
+ when(deserializationSchema.deserialize(SERIALIZED_MESSAGE)).thenReturn(MESSAGE);
+ when(streamingRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
+ when(sourceContext.getCheckpointLock()).thenReturn("some object to lock on");
+ when(functionInitializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+ when(operatorStateStore.getSerializableListState(any(String.class))).thenReturn(null);
+
+ PubSubSource<String> pubSubSource = createTestSource();
+ pubSubSource.initializeState(functionInitializationContext);
+ pubSubSource.setRuntimeContext(streamingRuntimeContext);
+ pubSubSource.open(null);
+ verify(subscriberWrapper, times(1)).initialize(pubSubSource);
+
+ pubSubSource.run(sourceContext);
+
+ pubSubSource.receiveMessage(pubSubMessage(), ackReplyConsumer);
+
+ // The record is emitted under the checkpoint lock, but the message is not acknowledged on receipt.
+ verify(sourceContext, times(1)).getCheckpointLock();
+ verify(sourceContext, times(1)).collect(MESSAGE);
+ verifyZeroInteractions(ackReplyConsumer);
+ }
+
+ @Test
+ public void testMessagesAcknowledged() throws Exception {
+ when(streamingRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
+
+ PubSubSource<String> pubSubSource = createTestSource();
+ pubSubSource.setRuntimeContext(streamingRuntimeContext);
+ pubSubSource.open(null);
+
+ List<AckReplyConsumer> input = Collections.singletonList(ackReplyConsumer);
+
+ pubSubSource.acknowledgeSessionIDs(input);
+
+ verify(ackReplyConsumer, times(1)).ack();
+ }
+
+ // open() is expected to reject a RuntimeContext that is not a StreamingRuntimeContext.
+ @Test(expected = IllegalArgumentException.class)
+ public void testOnceWithoutCheckpointing() throws Exception {
+ PubSubSource<String> pubSubSource = createTestSource();
+ pubSubSource.setRuntimeContext(runtimeContext);
+
+ pubSubSource.open(null);
+ }
+
+ /** Builds a source wired to the mocked SubscriberWrapper and DeserializationSchema, without credentials. */
+ private PubSubSource<String> createTestSource() throws IOException {
+ return PubSubSource.<String>newBuilder()
+ .withoutCredentials()
+ .withSubscriberWrapper(subscriberWrapper)
+ .withDeserializationSchema(deserializationSchema)
+ .build();
+ }
+
+ private PubsubMessage pubSubMessage() {
+ return PubsubMessage.newBuilder()
+ .setData(ByteString.copyFrom(SERIALIZED_MESSAGE))
+ .build();
+ }
+}
diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapperTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapperTest.java
new file mode 100644
index 0000000..f5deb07
--- /dev/null
+++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapperTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.apache.flink.api.java.ClosureCleaner.ensureSerializable;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Tests for {@link SubscriberWrapper}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SubscriberWrapperTest {
+ private static final String PROJECT_ID = "projectId";
+ private static final String SUBSCRIPTION_ID = "subscriptionId";
+
+ @Mock
+ private SerializableCredentialsProvider credentialsProvider;
+
+ @Mock
+ private MessageReceiver messageReceiver;
+
+ @Test
+ public void testSerializedSubscriberBuilder() throws Exception {
+ ensureSerializable(new SubscriberWrapper(SerializableCredentialsProvider.withoutCredentials(), subscriptionName()));
+ }
+
+ @Test
+ public void testInitialisation() {
+ SubscriberWrapper wrapper = new SubscriberWrapper(credentialsProvider, subscriptionName());
+ wrapper.initialize(messageReceiver);
+
+ String expectedName = ProjectSubscriptionName.format(PROJECT_ID, SUBSCRIPTION_ID);
+ assertThat(wrapper.getSubscriber().getSubscriptionNameString(), is(expectedName));
+ }
+
+ /** The subscription every test in this class works against. */
+ private static ProjectSubscriptionName subscriptionName() {
+ return ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_ID);
+ }
+}
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index e6d601d..1c47830 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -56,6 +56,7 @@ under the License.
<module>flink-connector-cassandra</module>
<module>flink-connector-filesystem</module>
<module>flink-connector-kafka</module>
+ <module>flink-connector-pubsub</module>
</modules>
<!-- override these root dependencies as 'provided', so they don't end up
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index c1b52a4..888399e 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -64,6 +64,17 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-pubsub</artifactId>
+ <version>1.31.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
</dependency>
@@ -364,6 +375,7 @@ under the License.
</includes>
</configuration>
</execution>
+
</executions>
</plugin>
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/IntegerSerializer.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/IntegerSerializer.java
new file mode 100644
index 0000000..6e9d1d5
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/IntegerSerializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.pubsub;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+/**
+ * (De)serializes {@link Integer}s using the byte representation of {@link BigInteger#toByteArray()}.
+ */
+class IntegerSerializer implements DeserializationSchema<Integer>, SerializationSchema<Integer> {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Decodes a payload written by {@link #serialize(Integer)}.
+ * Note: values wider than 32 bits are truncated by {@link BigInteger#intValue()}.
+ *
+ * @throws IOException if the payload is null or empty
+ */
+ @Override
+ public Integer deserialize(byte[] bytes) throws IOException {
+ if (bytes == null || bytes.length == 0) {
+ // new BigInteger(byte[]) would fail with an unchecked NumberFormatException / NPE here.
+ throw new IOException("Cannot deserialize an Integer from an empty payload.");
+ }
+ return new BigInteger(bytes).intValue();
+ }
+
+ @Override
+ public boolean isEndOfStream(Integer integer) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Integer> getProducedType() {
+ return TypeInformation.of(Integer.class);
+ }
+
+ @Override
+ public byte[] serialize(Integer integer) {
+ return BigInteger.valueOf(integer).toByteArray();
+ }
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java
new file mode 100644
index 0000000..612da41
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.pubsub;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.pubsub.PubSubSink;
+import org.apache.flink.streaming.connectors.pubsub.PubSubSourceBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple PubSub example.
+ *
+ * <p>Before starting a flink job it will publish 10 messages on the input topic.
+ *
+ * <p>Then a flink job is started to read these 10 messages from the input-subscription,
+ * it will log them
+ * and then write them to the output-topic.
+ */
+public class PubSubExample {
+ private static final Logger LOG = LoggerFactory.getLogger(PubSubExample.class);
+
+ public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ // All four parameters are required: print the usage and stop instead of failing inside getRequired().
+ if (!parameterTool.has("google-project") || !parameterTool.has("input-topicName")
+ || !parameterTool.has("input-subscription") || !parameterTool.has("output-topicName")) {
+ System.out.println("Missing parameters!\n" +
+ "Usage: flink run PubSub.jar --input-subscription <subscription> --input-topicName <topic> --output-topicName <output topic> " +
+ "--google-project <google project name> ");
+ return;
+ }
+
+ String projectName = parameterTool.getRequired("google-project");
+ String inputTopicName = parameterTool.getRequired("input-topicName");
+ String subscriptionName = parameterTool.getRequired("input-subscription");
+ String outputTopicName = parameterTool.getRequired("output-topicName");
+
+ PubSubPublisher pubSubPublisher = new PubSubPublisher(projectName, inputTopicName);
+
+ pubSubPublisher.publish();
+ runFlinkJob(projectName, subscriptionName, outputTopicName);
+ }
+
+ private static void runFlinkJob(String projectName, String subscriptionName, String outputTopicName) throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // NOTE(review): this commit adds no top-level PubSubSourceBuilder file; the builder in PubSubSource
+ // appears to be nested. Confirm that this import, builder() and withMode(...) actually resolve.
+ env.addSource(PubSubSourceBuilder.<Integer>builder()
+ .withProjectSubscriptionName(projectName, subscriptionName)
+ .withDeserializationSchema(new IntegerSerializer())
+ .withMode(PubSubSourceBuilder.Mode.NONE)
+ .build())
+ .map(PubSubExample::printAndReturn).disableChaining()
+ .addSink(PubSubSink.<Integer>newBuilder()
+ .withProjectName(projectName)
+ .withTopicName(outputTopicName)
+ .withSerializationSchema(new IntegerSerializer())
+ .build());
+
+ env.execute("Flink Streaming PubSubReader");
+ }
+
+ /** Logs the payload and passes it through unchanged. */
+ private static Integer printAndReturn(Integer i) {
+ LOG.info("Processed message with payload: {}", i);
+ return i;
+ }
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubPublisher.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubPublisher.java
new file mode 100644
index 0000000..7507945
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubPublisher.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.math.BigInteger;
+
+/**
+ * Publishes a small sequence of test messages to a Google Cloud Pub/Sub topic.
+ *
+ * <p>Each message payload is {@link BigInteger#toByteArray()} of a counter value, starting at 0.
+ */
+class PubSubPublisher {
+	/** Number of messages sent by {@link #publish()}. */
+	private static final int DEFAULT_NUMBER_OF_MESSAGES = 10;
+	/** Pause after each published message, in milliseconds. */
+	private static final long DELAY_BETWEEN_MESSAGES_MILLIS = 100L;
+
+	private final String projectName;
+	private final String topicName;
+
+	/**
+	 * @param projectName Google Cloud project that owns the topic
+	 * @param topicName   name of the topic to publish to
+	 */
+	PubSubPublisher(String projectName, String topicName) {
+		this.projectName = projectName;
+		this.topicName = topicName;
+	}
+
+	/** Publishes {@value #DEFAULT_NUMBER_OF_MESSAGES} messages carrying the values 0, 1, 2, and so on. */
+	void publish() {
+		publish(DEFAULT_NUMBER_OF_MESSAGES);
+	}
+
+	/**
+	 * Publishes {@code numberOfMessages} messages carrying the values 0 .. numberOfMessages - 1,
+	 * waiting for each publish future to complete before sending the next message.
+	 *
+	 * @param numberOfMessages how many messages to send; zero or a negative value sends nothing
+	 * @throws RuntimeException wrapping any failure to create the publisher or to publish a message;
+	 *     if the thread is interrupted, its interrupt flag is restored before throwing
+	 */
+	void publish(int numberOfMessages) {
+		Publisher publisher = null;
+		try {
+			publisher = Publisher.newBuilder(ProjectTopicName.of(projectName, topicName)).build();
+			for (long counter = 0; counter < numberOfMessages; counter++) {
+				ByteString messageData = ByteString.copyFrom(BigInteger.valueOf(counter).toByteArray());
+				PubsubMessage message = PubsubMessage.newBuilder().setData(messageData).build();
+
+				// Block on the future so each message is confirmed before the next is sent.
+				ApiFuture<String> future = publisher.publish(message);
+				future.get();
+				System.out.println("Published message: " + counter);
+				Thread.sleep(DELAY_BETWEEN_MESSAGES_MILLIS);
+			}
+		} catch (InterruptedException e) {
+			// Restore the interrupt flag so callers can still observe the interruption.
+			Thread.currentThread().interrupt();
+			throw new RuntimeException(e);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		} finally {
+			if (publisher != null) {
+				try {
+					publisher.shutdown();
+				} catch (Exception e) {
+					// Best-effort cleanup: report the failure without masking the primary outcome.
+					System.err.println("Failed to shut down Pub/Sub publisher: " + e);
+				}
+			}
+		}
+	}
+}