You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2019/08/19 15:00:55 UTC

[flink] branch release-1.9 updated: [FLINK-13231] [pubsub] Replace Max outstanding acknowledgement ids limit with a FlinkConnectorRateLimiter

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new b3ce2c7  [FLINK-13231] [pubsub] Replace Max outstanding acknowledgement ids limit with a FlinkConnectorRateLimiter
b3ce2c7 is described below

commit b3ce2c76332da287ccf05d291fedb27517dd8c32
Author: Richard Deurwaarder <rd...@bol.com>
AuthorDate: Wed Aug 7 21:21:03 2019 +0200

    [FLINK-13231] [pubsub] Replace Max outstanding acknowledgement ids limit with a FlinkConnectorRateLimiter
---
 .../connectors/gcp/pubsub/PubSubSource.java        | 46 +++++++++++-----------
 .../connectors/gcp/pubsub/PubSubSourceTest.java    | 17 ++++++--
 .../examples/gcp/pubsub/PubSubExample.java         |  1 +
 3 files changed, 37 insertions(+), 27 deletions(-)

diff --git a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
index 4ddd816..1c7baaf 100644
--- a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
+++ b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.connectors.gcp.pubsub;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
+import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -40,8 +42,6 @@ import com.google.cloud.pubsub.v1.Subscriber;
 import com.google.pubsub.v1.ProjectSubscriptionName;
 import com.google.pubsub.v1.PubsubMessage;
 import com.google.pubsub.v1.ReceivedMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -57,13 +57,12 @@ import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCreden
  */
 public class PubSubSource<OUT> extends RichSourceFunction<OUT>
 	implements ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT>, CheckpointListener, ListCheckpointed<AcknowledgeIdsForCheckpoint<String>> {
-	public static final int NO_MAX_MESSAGES_TO_ACKNOWLEDGE_LIMIT = -1;
-	private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class);
 	protected final PubSubDeserializationSchema<OUT> deserializationSchema;
 	protected final PubSubSubscriberFactory pubSubSubscriberFactory;
 	protected final Credentials credentials;
-	protected final int maxMessagesToAcknowledge;
 	protected final AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory;
+	protected final FlinkConnectorRateLimiter rateLimiter;
+	protected final int messagePerSecondRateLimit;
 
 	protected transient AcknowledgeOnCheckpoint<String> acknowledgeOnCheckpoint;
 	protected transient PubSubSubscriber subscriber;
@@ -73,13 +72,15 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
 	PubSubSource(PubSubDeserializationSchema<OUT> deserializationSchema,
 				PubSubSubscriberFactory pubSubSubscriberFactory,
 				Credentials credentials,
-				int maxMessagesToAcknowledge,
-				AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory) {
+				AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory,
+				FlinkConnectorRateLimiter rateLimiter,
+				int messagePerSecondRateLimit) {
 		this.deserializationSchema = deserializationSchema;
 		this.pubSubSubscriberFactory = pubSubSubscriberFactory;
 		this.credentials = credentials;
-		this.maxMessagesToAcknowledge = maxMessagesToAcknowledge;
 		this.acknowledgeOnCheckpointFactory = acknowledgeOnCheckpointFactory;
+		this.rateLimiter = rateLimiter;
+		this.messagePerSecondRateLimit = messagePerSecondRateLimit;
 	}
 
 	@Override
@@ -92,6 +93,10 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
 
 		getRuntimeContext().getMetricGroup().gauge("PubSubMessagesProcessedNotAcked", this::getOutstandingMessagesToAck);
 
+		// Convert the per-subtask limit to a global rate limit, because FlinkConnectorRateLimiter::setRate expects a global rate.
+		rateLimiter.setRate(messagePerSecondRateLimit * getRuntimeContext().getNumberOfParallelSubtasks());
+		rateLimiter.open(getRuntimeContext());
+
 		createAndSetPubSubSubscriber();
 		this.isRunning = true;
 	}
@@ -104,11 +109,6 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
 	public void run(SourceContext<OUT> sourceContext) throws Exception {
 		while (isRunning) {
 			try {
-				if (maxMessagesToAcknowledgeLimitReached()) {
-					LOG.debug("Sleeping because there are {} messages waiting to be ack'ed but limit is {}", getOutstandingMessagesToAck(), maxMessagesToAcknowledge);
-					Thread.sleep(100);
-					continue;
-				}
 
 				processMessage(sourceContext, subscriber.pull());
 			} catch (InterruptedException | CancellationException e) {
@@ -119,6 +119,8 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
 	}
 
 	void processMessage(SourceContext<OUT> sourceContext, List<ReceivedMessage> messages) throws Exception {
+		rateLimiter.acquire(messages.size());
+
 		synchronized (sourceContext.getCheckpointLock()) {
 			for (ReceivedMessage message : messages) {
 				acknowledgeOnCheckpoint.addAcknowledgeId(message.getAckId());
@@ -137,10 +139,6 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
 		}
 	}
 
-	private boolean maxMessagesToAcknowledgeLimitReached() throws Exception {
-		return maxMessagesToAcknowledge != NO_MAX_MESSAGES_TO_ACKNOWLEDGE_LIMIT && getOutstandingMessagesToAck() > maxMessagesToAcknowledge;
-	}
-
 	private Integer getOutstandingMessagesToAck() {
 		return acknowledgeOnCheckpoint.numberOfOutstandingAcknowledgements();
 	}
@@ -197,6 +195,7 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
 		private PubSubSubscriberFactory pubSubSubscriberFactory;
 		private Credentials credentials;
 		private int maxMessageToAcknowledge = 10000;
+		private int messagePerSecondRateLimit = 100000;
 
 		private PubSubSourceBuilder(DeserializationSchema<OUT> deserializationSchema) {
 			Preconditions.checkNotNull(deserializationSchema);
@@ -264,12 +263,13 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
 		}
 
 		/**
-		 * Set a limit of the number of outstanding or to-be acknowledged messages.
-		 * default is 10000. Adjust this if you have high checkpoint intervals and / or run into memory issues
-		 * due to the amount of acknowledgement ids. Use {@link PubSubSource}.NO_MAX_MESSAGES_TO_ACKNOWLEDGE_LIMIT if you want to remove the limit.
+		 * Set a limit on the number of messages received per second. This limit applies to each parallel instance of the source function.
+		 * The default is 100000 messages per second.
+		 *
+		 * @param messagePerSecondRateLimit the message per second rate limit.
 		 */
-		public PubSubSourceBuilder<OUT> withMaxMessageToAcknowledge(int maxMessageToAcknowledge) {
-			this.maxMessageToAcknowledge = maxMessageToAcknowledge;
+		public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit) {
+			this.messagePerSecondRateLimit = messagePerSecondRateLimit;
 			return this;
 		}
 
@@ -292,7 +292,7 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
 																			100);
 			}
 
-			return new PubSubSource<>(deserializationSchema, pubSubSubscriberFactory, credentials, maxMessageToAcknowledge, new AcknowledgeOnCheckpointFactory());
+			return new PubSubSource<>(deserializationSchema, pubSubSubscriberFactory, credentials, new AcknowledgeOnCheckpointFactory(), new GuavaFlinkConnectorRateLimiter(), messagePerSecondRateLimit);
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSourceTest.java b/flink-connectors/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSourceTest.java
index 943f075..bb6f0c3 100644
--- a/flink-connectors/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSourceTest.java
+++ b/flink-connectors/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSourceTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.gcp.pubsub;
 
+import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -78,6 +79,8 @@ public class PubSubSourceTest {
 	private Credentials credentials;
 	@Mock
 	private PubSubSubscriber pubsubSubscriber;
+	@Mock
+	private FlinkConnectorRateLimiter rateLimiter;
 
 	private PubSubSource<String> pubSubSource;
 
@@ -91,8 +94,9 @@ public class PubSubSourceTest {
 		pubSubSource = new PubSubSource<>(deserializationSchema,
 			pubSubSubscriberFactory,
 			credentials,
-			100,
-			acknowledgeOnCheckpointFactory);
+			acknowledgeOnCheckpointFactory,
+			rateLimiter,
+			1024);
 		pubSubSource.setRuntimeContext(streamingRuntimeContext);
 	}
 
@@ -120,10 +124,15 @@ public class PubSubSourceTest {
 		when(sourceContext.getCheckpointLock()).thenReturn("some object to lock on");
 
 		pubSubSource.open(null);
-		pubSubSource.processMessage(sourceContext, asList(receivedMessage("firstAckId", pubSubMessage(FIRST_MESSAGE)),
-															receivedMessage("secondAckId", pubSubMessage(SECOND_MESSAGE))));
+		List<ReceivedMessage> receivedMessages = asList(
+			receivedMessage("firstAckId", pubSubMessage(FIRST_MESSAGE)),
+			receivedMessage("secondAckId", pubSubMessage(SECOND_MESSAGE))
+		);
+		pubSubSource.processMessage(sourceContext, receivedMessages);
 
 		//verify handling messages
+		verify(rateLimiter, times(1)).acquire(2);
+
 		verify(sourceContext, times(1)).getCheckpointLock();
 		verify(deserializationSchema, times(1)).isEndOfStream(FIRST_MESSAGE);
 		verify(deserializationSchema, times(1)).deserialize(pubSubMessage(FIRST_MESSAGE));
diff --git a/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java b/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
index a960176..b79c67e 100644
--- a/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
+++ b/flink-examples/flink-examples-build-helper/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
@@ -67,6 +67,7 @@ public class PubSubExample {
 								.withDeserializationSchema(new IntegerSerializer())
 								.withProjectName(projectName)
 								.withSubscriptionName(subscriptionName)
+								.withMessageRateLimit(1)
 								.build())
 			.map(PubSubExample::printAndReturn).disableChaining()
 			.addSink(PubSubSink.newBuilder()