You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2019/07/05 12:46:20 UTC
[flink] 05/05: [FLINK-9311] [pubsub] Improvements to builders +
minor improvement to PubSubSink flush logic
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f2494cec219ad48f64993a04bec5d20464c11c94
Author: Jiangjie (Becket) Qin <be...@gmail.com>
AuthorDate: Fri Jul 5 01:05:41 2019 +0800
[FLINK-9311] [pubsub] Improvements to builders + minor improvement to PubSubSink flush logic
---
docs/dev/connectors/pubsub.md | 8 +--
.../connectors/gcp/pubsub/PubSubSink.java | 60 ++++++++++++++--------
.../connectors/gcp/pubsub/PubSubSource.java | 29 +++++------
.../gcp/pubsub/EmulatedPubSubSinkTest.java | 4 +-
.../gcp/pubsub/EmulatedPubSubSourceTest.java | 2 +-
.../examples/gcp/pubsub/PubSubExample.java | 4 +-
6 files changed, 61 insertions(+), 46 deletions(-)
diff --git a/docs/dev/connectors/pubsub.md b/docs/dev/connectors/pubsub.md
index bfa6f4a..7c14cd5 100644
--- a/docs/dev/connectors/pubsub.md
+++ b/docs/dev/connectors/pubsub.md
@@ -62,7 +62,7 @@ Example:
StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DeserializationSchema<SomeObject> deserializer = (...);
-SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder(SomeObject.class)
+SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
.withDeserializationSchema(deserializer)
.withProjectName("project")
.withSubscriptionName("subscription")
@@ -89,7 +89,7 @@ Example:
DataStream<SomeObject> dataStream = (...);
SerializationSchema<SomeObject> serializationSchema = (...);
-SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder(SomeObject.class)
+SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
.withDeserializationSchema(deserializer)
.withProjectName("project")
.withSubscriptionName("subscription")
@@ -117,13 +117,13 @@ The following example shows how you would create a source to read messages from
<div data-lang="java" markdown="1">
{% highlight java %}
DeserializationSchema<SomeObject> deserializationSchema = (...);
-SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder(SomeObject.class)
+SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
.withDeserializationSchema(deserializationSchema)
.withProjectName("my-fake-project")
.withSubscriptionName("subscription")
.withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator("localhost:1234", "my-fake-project", "subscription", 10, Duration.ofSeconds(15), 100))
.build();
-SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder(SomeObject.class)
+SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
.withDeserializationSchema(deserializationSchema)
.withProjectName("my-fake-project")
.withSubscriptionName("subscription")
diff --git a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
index d280dc9..f314d3e 100644
--- a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
+++ b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
@@ -46,8 +46,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
@@ -63,7 +63,7 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
private final AtomicReference<Exception> exceptionAtomicReference;
private final ApiFutureCallback<String> failureHandler;
- private final ConcurrentLinkedQueue<ApiFuture<String>> outstandingFutures;
+ private final AtomicInteger numPendingFutures;
private final Credentials credentials;
private final SerializationSchema<IN> serializationSchema;
private final String projectName;
@@ -81,7 +81,7 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
String hostAndPortForEmulator) {
this.exceptionAtomicReference = new AtomicReference<>();
this.failureHandler = new FailureHandler();
- this.outstandingFutures = new ConcurrentLinkedQueue<>();
+ this.numPendingFutures = new AtomicInteger(0);
this.credentials = credentials;
this.serializationSchema = serializationSchema;
this.projectName = projectName;
@@ -162,18 +162,17 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
.build();
ApiFuture<String> future = publisher.publish(pubsubMessage);
- outstandingFutures.add(future);
+ numPendingFutures.incrementAndGet();
ApiFutures.addCallback(future, failureHandler, directExecutor());
}
/**
* Create a builder for a new PubSubSink.
*
- * @param <IN> The generic of the type that is to be written into the sink.
* @return a new PubSubSinkBuilder instance
*/
- public static <IN> SerializationSchemaBuilder<IN> newBuilder(Class<IN> clazz) {
- return new PubSubSinkBuilder<>();
+ public static SerializationSchemaBuilder newBuilder() {
+ return new SerializationSchemaBuilder();
}
@Override
@@ -181,6 +180,8 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
//before checkpoints make sure all the batched / buffered pubsub messages have actually been sent
publisher.publishAllOutstanding();
+ // At this point, no new messages will be published because this thread has successfully acquired
+ // the checkpoint lock. So we just wait for all the pending futures to complete.
waitForFuturesToComplete();
if (exceptionAtomicReference.get() != null) {
throw exceptionAtomicReference.get();
@@ -188,8 +189,17 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
}
private void waitForFuturesToComplete() {
- while (isRunning && !outstandingFutures.isEmpty()) {
- outstandingFutures.removeIf(ApiFuture::isDone);
+ // We have to synchronize on numPendingFutures here to ensure the notification won't be missed.
+ synchronized (numPendingFutures) {
+ while (isRunning && numPendingFutures.get() > 0) {
+ try {
+ numPendingFutures.wait();
+ } catch (InterruptedException e) {
+ // Simply catch the interrupted exception. Supposedly the thread will exit the loop
+ // gracefully once it observes that the isRunning flag has been cleared.
+ LOG.info("Interrupted when waiting for futures to complete");
+ }
+ }
}
}
@@ -202,7 +212,7 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
*
* @param <IN> Type of PubSubSink to create.
*/
- public static class PubSubSinkBuilder<IN> implements SerializationSchemaBuilder<IN>, ProjectNameBuilder<IN>, TopicNameBuilder<IN> {
+ public static class PubSubSinkBuilder<IN> implements ProjectNameBuilder<IN>, TopicNameBuilder<IN> {
private SerializationSchema<IN> serializationSchema;
private String projectName;
private String topicName;
@@ -210,7 +220,9 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
private Credentials credentials;
private String hostAndPort;
- private PubSubSinkBuilder() { }
+ private PubSubSinkBuilder(SerializationSchema<IN> serializationSchema) {
+ this.serializationSchema = serializationSchema;
+ }
/**
* Set the credentials.
@@ -225,13 +237,6 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
}
@Override
- public ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> serializationSchema) {
- Preconditions.checkNotNull(serializationSchema);
- this.serializationSchema = serializationSchema;
- return this;
- }
-
- @Override
public TopicNameBuilder<IN> withProjectName(String projectName) {
Preconditions.checkNotNull(projectName);
this.projectName = projectName;
@@ -275,11 +280,13 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
/**
* Part of {@link PubSubSinkBuilder} to set required fields.
*/
- public interface SerializationSchemaBuilder<IN> {
+ public static class SerializationSchemaBuilder {
/**
* Set the SerializationSchema used to Serialize objects to be added as payloads of PubSubMessages.
*/
- ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> deserializationSchema);
+ public <IN> ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> deserializationSchema) {
+ return new PubSubSinkBuilder<>(deserializationSchema);
+ }
}
/**
@@ -305,13 +312,24 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> implements Checkpointed
private class FailureHandler implements ApiFutureCallback<String>, Serializable {
@Override
public void onFailure(Throwable t) {
+ ackAndMaybeNotifyNoPendingFutures();
exceptionAtomicReference.set(new RuntimeException("Failed trying to publish message", t));
}
@Override
public void onSuccess(String result) {
- //do nothing on success
+ ackAndMaybeNotifyNoPendingFutures();
LOG.debug("Successfully published message with id: {}", result);
}
+
+ private void ackAndMaybeNotifyNoPendingFutures() {
+ // When there are no pending futures anymore, notify the thread that is waiting for
+ // all the pending futures to be completed.
+ if (numPendingFutures.decrementAndGet() == 0) {
+ synchronized (numPendingFutures) {
+ numPendingFutures.notify();
+ }
+ }
+ }
}
}
diff --git a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
index d093e0a..4ddd816 100644
--- a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
+++ b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
@@ -155,8 +155,8 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
return deserializationSchema.getProducedType();
}
- public static <OUT> DeserializationSchemaBuilder<OUT> newBuilder(Class<OUT> clazz) {
- return new PubSubSourceBuilder<>();
+ public static DeserializationSchemaBuilder newBuilder() {
+ return new DeserializationSchemaBuilder();
}
@Override
@@ -189,7 +189,7 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
*
* @param <OUT> The type of objects which will be read
*/
- public static class PubSubSourceBuilder<OUT> implements DeserializationSchemaBuilder<OUT>, ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> {
+ public static class PubSubSourceBuilder<OUT> implements ProjectNameBuilder<OUT>, SubscriptionNameBuilder<OUT> {
private PubSubDeserializationSchema<OUT> deserializationSchema;
private String projectName;
private String subscriptionName;
@@ -198,25 +198,18 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
private Credentials credentials;
private int maxMessageToAcknowledge = 10000;
- protected PubSubSourceBuilder() {
- }
-
- @Override
- public ProjectNameBuilder withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
+ private PubSubSourceBuilder(DeserializationSchema<OUT> deserializationSchema) {
Preconditions.checkNotNull(deserializationSchema);
this.deserializationSchema = new DeserializationSchemaWrapper<>(deserializationSchema);
- return this;
}
- @Override
- public ProjectNameBuilder withDeserializationSchema(PubSubDeserializationSchema deserializationSchema) {
+ private PubSubSourceBuilder(PubSubDeserializationSchema<OUT> deserializationSchema) {
Preconditions.checkNotNull(deserializationSchema);
this.deserializationSchema = deserializationSchema;
- return this;
}
@Override
- public SubscriptionNameBuilder withProjectName(String projectName) {
+ public SubscriptionNameBuilder<OUT> withProjectName(String projectName) {
Preconditions.checkNotNull(projectName);
this.projectName = projectName;
return this;
@@ -306,17 +299,21 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
/**
* Part of {@link PubSubSourceBuilder} to set required fields.
*/
- public interface DeserializationSchemaBuilder<OUT> {
+ public static class DeserializationSchemaBuilder {
/**
* Set the DeserializationSchema used to deserialize incoming PubSubMessages.
* If you want access to meta data of a PubSubMessage use the overloaded withDeserializationSchema({@link PubSubDeserializationSchema}) method instead.
*/
- ProjectNameBuilder<OUT> withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema);
+ public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
+ return new PubSubSourceBuilder<>(deserializationSchema);
+ }
/**
* Set the DeserializationSchema used to deserialize incoming PubSubMessages.
*/
- ProjectNameBuilder<OUT> withDeserializationSchema(PubSubDeserializationSchema<OUT> deserializationSchema);
+ public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(PubSubDeserializationSchema<OUT> deserializationSchema) {
+ return new PubSubSourceBuilder<>(deserializationSchema);
+ }
}
/**
diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java
index 0566e01..5c0c3b1 100644
--- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java
+++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java
@@ -77,7 +77,7 @@ public class EmulatedPubSubSinkTest extends GCloudUnitTestBase {
// Sink into pubsub
theData
- .addSink(PubSubSink.newBuilder(String.class)
+ .addSink(PubSubSink.newBuilder()
.withSerializationSchema(new SimpleStringSchema())
.withProjectName(PROJECT_NAME)
.withTopicName(TOPIC_NAME)
@@ -116,7 +116,7 @@ public class EmulatedPubSubSinkTest extends GCloudUnitTestBase {
env.addSource(new SingleInputSourceFunction())
.map((MapFunction<String, String>) StringUtils::reverse)
- .addSink(PubSubSink.newBuilder(String.class)
+ .addSink(PubSubSink.newBuilder()
.withSerializationSchema(new SimpleStringSchema())
.withProjectName(PROJECT_NAME)
.withTopicName(TOPIC_NAME)
diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java
index f3f9b0a..b28569c 100644
--- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java
+++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java
@@ -96,7 +96,7 @@ public class EmulatedPubSubSourceTest extends GCloudUnitTestBase {
env.setRestartStrategy(RestartStrategies.noRestart());
DataStream<String> fromPubSub = env
- .addSource(PubSubSource.newBuilder(String.class)
+ .addSource(PubSubSource.newBuilder()
.withDeserializationSchema(new BoundedStringDeserializer(10))
.withProjectName(PROJECT_NAME)
.withSubscriptionName(SUBSCRIPTION_NAME)
diff --git a/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java b/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
index 7b66577..a960176 100644
--- a/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
+++ b/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
@@ -63,13 +63,13 @@ public class PubSubExample {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000L);
- env.addSource(PubSubSource.newBuilder(Integer.class)
+ env.addSource(PubSubSource.newBuilder()
.withDeserializationSchema(new IntegerSerializer())
.withProjectName(projectName)
.withSubscriptionName(subscriptionName)
.build())
.map(PubSubExample::printAndReturn).disableChaining()
- .addSink(PubSubSink.newBuilder(Integer.class)
+ .addSink(PubSubSink.newBuilder()
.withSerializationSchema(new IntegerSerializer())
.withProjectName(projectName)
.withTopicName(outputTopicName).build());