You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2019/07/05 12:46:20 UTC

[flink] 05/05: [FLINK-9311] [pubsub] Improvements to builders + minor improvement to PubSubSink flush logic

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f2494cec219ad48f64993a04bec5d20464c11c94
Author: Jiangjie (Becket) Qin <be...@gmail.com>
AuthorDate: Fri Jul 5 01:05:41 2019 +0800

    [FLINK-9311] [pubsub] Improvements to builders + minor improvement to PubSubSink flush logic
---
 docs/dev/connectors/pubsub.md                      |  8 +--
 .../connectors/gcp/pubsub/PubSubSink.java          | 60 ++++++++++++++--------
 .../connectors/gcp/pubsub/PubSubSource.java        | 29 +++++------
 .../gcp/pubsub/EmulatedPubSubSinkTest.java         |  4 +-
 .../gcp/pubsub/EmulatedPubSubSourceTest.java       |  2 +-
 .../examples/gcp/pubsub/PubSubExample.java         |  4 +-
 6 files changed, 61 insertions(+), 46 deletions(-)

diff --git a/docs/dev/connectors/pubsub.md b/docs/dev/connectors/pubsub.md
index bfa6f4a..7c14cd5 100644
--- a/docs/dev/connectors/pubsub.md
+++ b/docs/dev/connectors/pubsub.md
@@ -62,7 +62,7 @@ Example:
 StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 
 DeserializationSchema<SomeObject> deserializer = (...);
-SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder(SomeObject.class)
+SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
                                                       .withDeserializationSchema(deserializer)
                                                       .withProjectName("project")
                                                       .withSubscriptionName("subscription")
@@ -89,7 +89,7 @@ Example:
 DataStream<SomeObject> dataStream = (...);
 
 SerializationSchema<SomeObject> serializationSchema = (...);
-SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder(SomeObject.class)
-                                                 .withDeserializationSchema(deserializer)
+SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
+                                                 .withSerializationSchema(serializationSchema)
                                                  .withProjectName("project")
-                                                 .withSubscriptionName("subscription")
+                                                 .withTopicName("topic")
@@ -117,13 +117,14 @@ The following example shows how you would create a source to read messages from
 <div data-lang="java" markdown="1">
 {% highlight java %}
 DeserializationSchema<SomeObject> deserializationSchema = (...);
-SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder(SomeObject.class)
+SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
                                                       .withDeserializationSchema(deserializationSchema)
                                                       .withProjectName("my-fake-project")
                                                       .withSubscriptionName("subscription")
                                                       .withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator("localhost:1234", "my-fake-project", "subscription", 10, Duration.ofSeconds(15), 100))
                                                       .build();
-SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder(SomeObject.class)
-                                                 .withDeserializationSchema(deserializationSchema)
+SerializationSchema<SomeObject> serializationSchema = (...);
+SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
+                                                 .withSerializationSchema(serializationSchema)
                                                  .withProjectName("my-fake-project")
-                                                 .withSubscriptionName("subscription")
+                                                 .withTopicName("topic")
diff --git a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
index d280dc9..f314d3e 100644
--- a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
+++ b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
@@ -46,8 +46,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
@@ -63,7 +63,7 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
 
 	private final AtomicReference<Exception> exceptionAtomicReference;
 	private final ApiFutureCallback<String> failureHandler;
-	private final ConcurrentLinkedQueue<ApiFuture<String>> outstandingFutures;
+	private final AtomicInteger numPendingFutures;
 	private final Credentials credentials;
 	private final SerializationSchema<IN> serializationSchema;
 	private final String projectName;
@@ -81,7 +81,7 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
 		String hostAndPortForEmulator) {
 		this.exceptionAtomicReference = new AtomicReference<>();
 		this.failureHandler = new FailureHandler();
-		this.outstandingFutures = new ConcurrentLinkedQueue<>();
+		this.numPendingFutures = new AtomicInteger(0);
 		this.credentials = credentials;
 		this.serializationSchema = serializationSchema;
 		this.projectName = projectName;
@@ -162,18 +162,17 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
 			.build();
 
 		ApiFuture<String> future = publisher.publish(pubsubMessage);
-		outstandingFutures.add(future);
+		numPendingFutures.incrementAndGet();
 		ApiFutures.addCallback(future, failureHandler, directExecutor());
 	}
 
 	/**
 	 * Create a builder for a new PubSubSink.
 	 *
-	 * @param <IN> The generic of the type that is to be written into the sink.
 	 * @return a new PubSubSinkBuilder instance
 	 */
-	public static <IN> SerializationSchemaBuilder<IN> newBuilder(Class<IN> clazz) {
-		return new PubSubSinkBuilder<>();
+	public static SerializationSchemaBuilder newBuilder() {
+		return new SerializationSchemaBuilder();
 	}
 
 	@Override
@@ -181,6 +180,8 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
 		//before checkpoints make sure all the batched / buffered pubsub messages have actually been sent
 		publisher.publishAllOutstanding();
 
+		// At this point, no new messages will be published because this thread has successfully acquired
+		// the checkpoint lock. So we just wait for all the pending futures to complete.
 		waitForFuturesToComplete();
 		if (exceptionAtomicReference.get() != null) {
 			throw exceptionAtomicReference.get();
@@ -188,8 +189,17 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
 	}
 
 	private void waitForFuturesToComplete() {
-		while (isRunning && !outstandingFutures.isEmpty()) {
-			outstandingFutures.removeIf(ApiFuture::isDone);
+		// We have to synchronize on numPendingFutures here to ensure the notification won't be missed.
+		synchronized (numPendingFutures) {
+			while (isRunning && numPendingFutures.get() > 0) {
+				try {
+					numPendingFutures.wait();
+				} catch (InterruptedException e) {
+					// Deliberately swallow the interrupt (this clears the thread's interrupt status). Presumably
+					// the thread exits the loop gracefully once it sees the isRunning flag cleared.
+					LOG.info("Interrupted when waiting for futures to complete");
+				}
+			}
 		}
 	}
 
@@ -202,7 +212,7 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
 	 *
 	 * @param <IN> Type of PubSubSink to create.
 	 */
-	public static class PubSubSinkBuilder<IN> implements SerializationSchemaBuilder<IN>, ProjectNameBuilder<IN>, TopicNameBuilder<IN> {
+	public static class PubSubSinkBuilder<IN> implements ProjectNameBuilder<IN>, TopicNameBuilder<IN> {
 		private SerializationSchema<IN> serializationSchema;
 		private String projectName;
 		private String topicName;
@@ -210,7 +220,9 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
 		private Credentials credentials;
 		private String hostAndPort;
 
-		private PubSubSinkBuilder() { }
+		private PubSubSinkBuilder(SerializationSchema<IN> serializationSchema) {
+			this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+		}
 
 		/**
 		 * Set the credentials.
@@ -225,13 +237,6 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
 		}
 
 		@Override
-		public ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> serializationSchema) {
-			Preconditions.checkNotNull(serializationSchema);
-			this.serializationSchema = serializationSchema;
-			return this;
-		}
-
-		@Override
 		public TopicNameBuilder<IN> withProjectName(String projectName) {
 			Preconditions.checkNotNull(projectName);
 			this.projectName = projectName;
@@ -275,11 +280,13 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
 	/**
 	 * Part of {@link PubSubSinkBuilder} to set required fields.
 	 */
-	public interface SerializationSchemaBuilder<IN> {
+	public static class SerializationSchemaBuilder {
 		/**
 		 * Set the SerializationSchema used to Serialize objects to be added as payloads of PubSubMessages.
 		 */
-		ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> deserializationSchema);
+		public <IN> ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> serializationSchema) {
+			return new PubSubSinkBuilder<>(serializationSchema);
+		}
 	}
 
 	/**
@@ -305,13 +312,26 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
 	private class FailureHandler implements ApiFutureCallback<String>, Serializable {
 		@Override
 		public void onFailure(Throwable t) {
+			// Record the failure BEFORE acking: the last ack wakes the checkpointing thread, which reads
+			// exceptionAtomicReference right after waitForFuturesToComplete() returns and must see it.
 			exceptionAtomicReference.set(new RuntimeException("Failed trying to publish message", t));
+			ackAndMaybeNotifyNoPendingFutures();
 		}
 
 		@Override
 		public void onSuccess(String result) {
-			//do nothing on success
+			ackAndMaybeNotifyNoPendingFutures();
 			LOG.debug("Successfully published message with id: {}", result);
 		}
+
+		private void ackAndMaybeNotifyNoPendingFutures() {
+			// When there are no pending futures anymore, notify the thread that is waiting for
+			// all the pending futures to be completed.
+			if (numPendingFutures.decrementAndGet() == 0) {
+				synchronized (numPendingFutures) {
+					numPendingFutures.notify();
+				}
+			}
+		}
 	}
 }
diff --git a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
index d093e0a..4ddd816 100644
--- a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
+++ b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
@@ -155,8 +155,8 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
 		return deserializationSchema.getProducedType();
 	}
 
-	public static <OUT> DeserializationSchemaBuilder<OUT> newBuilder(Class<OUT> clazz) {
-		return new PubSubSourceBuilder<>();
+	public static DeserializationSchemaBuilder newBuilder() {
+		return new DeserializationSchemaBuilder();
 	}
 
 	@Override
@@ -189,7 +189,7 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
 	 *
 	 * @param <OUT> The type of objects which will be read
 	 */
-	public static class PubSubSourceBuilder<OUT> implements DeserializationSchemaBuilder<OUT>, ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> {
+	public static class PubSubSourceBuilder<OUT> implements ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> {
 		private PubSubDeserializationSchema<OUT> deserializationSchema;
 		private String projectName;
 		private String subscriptionName;
@@ -198,25 +198,18 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
 		private Credentials credentials;
 		private int maxMessageToAcknowledge = 10000;
 
-		protected PubSubSourceBuilder() {
-		}
-
-		@Override
-		public ProjectNameBuilder withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
+		private PubSubSourceBuilder(DeserializationSchema<OUT> deserializationSchema) {
 			Preconditions.checkNotNull(deserializationSchema);
 			this.deserializationSchema = new DeserializationSchemaWrapper<>(deserializationSchema);
-			return this;
 		}
 
-		@Override
-		public ProjectNameBuilder withDeserializationSchema(PubSubDeserializationSchema deserializationSchema) {
+		private PubSubSourceBuilder(PubSubDeserializationSchema<OUT> deserializationSchema) {
 			Preconditions.checkNotNull(deserializationSchema);
 			this.deserializationSchema = deserializationSchema;
-			return this;
 		}
 
 		@Override
-		public SubscriptionNameBuilder withProjectName(String projectName) {
+		public SubscriptionNameBuilder<OUT> withProjectName(String projectName) {
 			Preconditions.checkNotNull(projectName);
 			this.projectName = projectName;
 			return this;
@@ -306,17 +299,21 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
 	/**
 	 * Part of {@link PubSubSourceBuilder} to set required fields.
 	 */
-	public interface DeserializationSchemaBuilder<OUT> {
+	public static class DeserializationSchemaBuilder {
 		/**
 		 * Set the DeserializationSchema used to deserialize incoming PubSubMessages.
 		 * If you want access to meta data of a PubSubMessage use the overloaded withDeserializationSchema({@link PubSubDeserializationSchema}) method instead.
 		 */
-		ProjectNameBuilder<OUT> withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema);
+		public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
+			return new PubSubSourceBuilder<>(deserializationSchema);
+		}
 
 		/**
 		 * Set the DeserializationSchema used to deserialize incoming PubSubMessages.
 		 */
-		ProjectNameBuilder<OUT> withDeserializationSchema(PubSubDeserializationSchema<OUT> deserializationSchema);
+		public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(PubSubDeserializationSchema<OUT> deserializationSchema) {
+			return new PubSubSourceBuilder<>(deserializationSchema);
+		}
 	}
 
 	/**
diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java
index 0566e01..5c0c3b1 100644
--- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java
+++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java
@@ -77,7 +77,7 @@ public class EmulatedPubSubSinkTest extends GCloudUnitTestBase {
 
 		// Sink into pubsub
 		theData
-			.addSink(PubSubSink.newBuilder(String.class)
+			.addSink(PubSubSink.newBuilder()
 								.withSerializationSchema(new SimpleStringSchema())
 								.withProjectName(PROJECT_NAME)
 								.withTopicName(TOPIC_NAME)
@@ -116,7 +116,7 @@ public class EmulatedPubSubSinkTest extends GCloudUnitTestBase {
 		env.addSource(new SingleInputSourceFunction())
 
 			.map((MapFunction<String, String>) StringUtils::reverse)
-			.addSink(PubSubSink.newBuilder(String.class)
+			.addSink(PubSubSink.newBuilder()
 								.withSerializationSchema(new SimpleStringSchema())
 								.withProjectName(PROJECT_NAME)
 								.withTopicName(TOPIC_NAME)
diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java
index f3f9b0a..b28569c 100644
--- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java
+++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java
@@ -96,7 +96,7 @@ public class EmulatedPubSubSourceTest extends GCloudUnitTestBase {
 		env.setRestartStrategy(RestartStrategies.noRestart());
 
 		DataStream<String> fromPubSub = env
-			.addSource(PubSubSource.newBuilder(String.class)
+			.addSource(PubSubSource.newBuilder()
 								.withDeserializationSchema(new BoundedStringDeserializer(10))
 								.withProjectName(PROJECT_NAME)
 								.withSubscriptionName(SUBSCRIPTION_NAME)
diff --git a/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java b/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
index 7b66577..a960176 100644
--- a/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
+++ b/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
@@ -63,13 +63,13 @@ public class PubSubExample {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.enableCheckpointing(1000L);
 
-		env.addSource(PubSubSource.newBuilder(Integer.class)
+		env.addSource(PubSubSource.newBuilder()
 								.withDeserializationSchema(new IntegerSerializer())
 								.withProjectName(projectName)
 								.withSubscriptionName(subscriptionName)
 								.build())
 			.map(PubSubExample::printAndReturn).disableChaining()
-			.addSink(PubSubSink.newBuilder(Integer.class)
+			.addSink(PubSubSink.newBuilder()
 								.withSerializationSchema(new IntegerSerializer())
 								.withProjectName(projectName)
 								.withTopicName(outputTopicName).build());