You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this conversion.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/08/19 22:57:00 UTC
[flink] 01/04: [FLINK-13231] [pubsub] Replace Max outstanding acknowledgement ids limit with a FlinkConnectorRateLimiter
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to annotated tag release-1.9.0-rc3
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c2d9aeacede65724912ed9a2ef87a23181869aa8
Author: Richard Deurwaarder <rd...@bol.com>
AuthorDate: Wed Aug 7 21:21:03 2019 +0200
[FLINK-13231] [pubsub] Replace Max outstanding acknowledgement ids limit with a FlinkConnectorRateLimiter
---
.../connectors/gcp/pubsub/PubSubSource.java | 46 +++++++++++-----------
.../connectors/gcp/pubsub/PubSubSourceTest.java | 17 ++++++--
.../examples/gcp/pubsub/PubSubExample.java | 1 +
3 files changed, 37 insertions(+), 27 deletions(-)
diff --git a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
index 4ddd816..1c7baaf 100644
--- a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
+++ b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.connectors.gcp.pubsub;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
+import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -40,8 +42,6 @@ import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -57,13 +57,12 @@ import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCreden
*/
public class PubSubSource<OUT> extends RichSourceFunction<OUT>
implements ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT>, CheckpointListener, ListCheckpointed<AcknowledgeIdsForCheckpoint<String>> {
- public static final int NO_MAX_MESSAGES_TO_ACKNOWLEDGE_LIMIT = -1;
- private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class);
protected final PubSubDeserializationSchema<OUT> deserializationSchema;
protected final PubSubSubscriberFactory pubSubSubscriberFactory;
protected final Credentials credentials;
- protected final int maxMessagesToAcknowledge;
protected final AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory;
+ protected final FlinkConnectorRateLimiter rateLimiter;
+ protected final int messagePerSecondRateLimit;
protected transient AcknowledgeOnCheckpoint<String> acknowledgeOnCheckpoint;
protected transient PubSubSubscriber subscriber;
@@ -73,13 +72,15 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
PubSubSource(PubSubDeserializationSchema<OUT> deserializationSchema,
PubSubSubscriberFactory pubSubSubscriberFactory,
Credentials credentials,
- int maxMessagesToAcknowledge,
- AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory) {
+ AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory,
+ FlinkConnectorRateLimiter rateLimiter,
+ int messagePerSecondRateLimit) {
this.deserializationSchema = deserializationSchema;
this.pubSubSubscriberFactory = pubSubSubscriberFactory;
this.credentials = credentials;
- this.maxMessagesToAcknowledge = maxMessagesToAcknowledge;
this.acknowledgeOnCheckpointFactory = acknowledgeOnCheckpointFactory;
+ this.rateLimiter = rateLimiter;
+ this.messagePerSecondRateLimit = messagePerSecondRateLimit;
}
@Override
@@ -92,6 +93,10 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
getRuntimeContext().getMetricGroup().gauge("PubSubMessagesProcessedNotAcked", this::getOutstandingMessagesToAck);
+ //convert per-subtask-limit to global rate limit, as FlinkConnectorRateLimiter::setRate expects a global rate limit.
+ rateLimiter.setRate(messagePerSecondRateLimit * getRuntimeContext().getNumberOfParallelSubtasks());
+ rateLimiter.open(getRuntimeContext());
+
createAndSetPubSubSubscriber();
this.isRunning = true;
}
@@ -104,11 +109,6 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
public void run(SourceContext<OUT> sourceContext) throws Exception {
while (isRunning) {
try {
- if (maxMessagesToAcknowledgeLimitReached()) {
- LOG.debug("Sleeping because there are {} messages waiting to be ack'ed but limit is {}", getOutstandingMessagesToAck(), maxMessagesToAcknowledge);
- Thread.sleep(100);
- continue;
- }
processMessage(sourceContext, subscriber.pull());
} catch (InterruptedException | CancellationException e) {
@@ -119,6 +119,8 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
}
void processMessage(SourceContext<OUT> sourceContext, List<ReceivedMessage> messages) throws Exception {
+ rateLimiter.acquire(messages.size());
+
synchronized (sourceContext.getCheckpointLock()) {
for (ReceivedMessage message : messages) {
acknowledgeOnCheckpoint.addAcknowledgeId(message.getAckId());
@@ -137,10 +139,6 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
}
}
- private boolean maxMessagesToAcknowledgeLimitReached() throws Exception {
- return maxMessagesToAcknowledge != NO_MAX_MESSAGES_TO_ACKNOWLEDGE_LIMIT && getOutstandingMessagesToAck() > maxMessagesToAcknowledge;
- }
-
private Integer getOutstandingMessagesToAck() {
return acknowledgeOnCheckpoint.numberOfOutstandingAcknowledgements();
}
@@ -197,6 +195,7 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
private PubSubSubscriberFactory pubSubSubscriberFactory;
private Credentials credentials;
private int maxMessageToAcknowledge = 10000;
+ private int messagePerSecondRateLimit = 100000;
private PubSubSourceBuilder(DeserializationSchema<OUT> deserializationSchema) {
Preconditions.checkNotNull(deserializationSchema);
@@ -264,12 +263,13 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
}
/**
- * Set a limit of the number of outstanding or to-be acknowledged messages.
- * default is 10000. Adjust this if you have high checkpoint intervals and / or run into memory issues
- * due to the amount of acknowledgement ids. Use {@link PubSubSource}.NO_MAX_MESSAGES_TO_ACKNOWLEDGE_LIMIT if you want to remove the limit.
+ * Set a limit on the rate of messages per second received. This limit is per parallel instance of the source function.
+ * Default is set to 100000 messages per second.
+ *
+ * @param messagePerSecondRateLimit the message per second rate limit.
*/
- public PubSubSourceBuilder<OUT> withMaxMessageToAcknowledge(int maxMessageToAcknowledge) {
- this.maxMessageToAcknowledge = maxMessageToAcknowledge;
+ public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit) {
+ this.messagePerSecondRateLimit = messagePerSecondRateLimit;
return this;
}
@@ -292,7 +292,7 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
100);
}
- return new PubSubSource<>(deserializationSchema, pubSubSubscriberFactory, credentials, maxMessageToAcknowledge, new AcknowledgeOnCheckpointFactory());
+ return new PubSubSource<>(deserializationSchema, pubSubSubscriberFactory, credentials, new AcknowledgeOnCheckpointFactory(), new GuavaFlinkConnectorRateLimiter(), messagePerSecondRateLimit);
}
}
diff --git a/flink-connectors/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSourceTest.java b/flink-connectors/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSourceTest.java
index 943f075..bb6f0c3 100644
--- a/flink-connectors/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSourceTest.java
+++ b/flink-connectors/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSourceTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.gcp.pubsub;
+import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -78,6 +79,8 @@ public class PubSubSourceTest {
private Credentials credentials;
@Mock
private PubSubSubscriber pubsubSubscriber;
+ @Mock
+ private FlinkConnectorRateLimiter rateLimiter;
private PubSubSource<String> pubSubSource;
@@ -91,8 +94,9 @@ public class PubSubSourceTest {
pubSubSource = new PubSubSource<>(deserializationSchema,
pubSubSubscriberFactory,
credentials,
- 100,
- acknowledgeOnCheckpointFactory);
+ acknowledgeOnCheckpointFactory,
+ rateLimiter,
+ 1024);
pubSubSource.setRuntimeContext(streamingRuntimeContext);
}
@@ -120,10 +124,15 @@ public class PubSubSourceTest {
when(sourceContext.getCheckpointLock()).thenReturn("some object to lock on");
pubSubSource.open(null);
- pubSubSource.processMessage(sourceContext, asList(receivedMessage("firstAckId", pubSubMessage(FIRST_MESSAGE)),
- receivedMessage("secondAckId", pubSubMessage(SECOND_MESSAGE))));
+ List<ReceivedMessage> receivedMessages = asList(
+ receivedMessage("firstAckId", pubSubMessage(FIRST_MESSAGE)),
+ receivedMessage("secondAckId", pubSubMessage(SECOND_MESSAGE))
+ );
+ pubSubSource.processMessage(sourceContext, receivedMessages);
//verify handling messages
+ verify(rateLimiter, times(1)).acquire(2);
+
verify(sourceContext, times(1)).getCheckpointLock();
verify(deserializationSchema, times(1)).isEndOfStream(FIRST_MESSAGE);
verify(deserializationSchema, times(1)).deserialize(pubSubMessage(FIRST_MESSAGE));
diff --git a/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java b/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
index a960176..b79c67e 100644
--- a/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
+++ b/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
@@ -67,6 +67,7 @@ public class PubSubExample {
.withDeserializationSchema(new IntegerSerializer())
.withProjectName(projectName)
.withSubscriptionName(subscriptionName)
+ .withMessageRateLimit(1)
.build())
.map(PubSubExample::printAndReturn).disableChaining()
.addSink(PubSubSink.newBuilder()