Posted to github@beam.apache.org by "bvolpato (via GitHub)" <gi...@apache.org> on 2023/07/27 01:25:23 UTC

[GitHub] [beam] bvolpato opened a new pull request, #27702: Reduce number of connections used by KafkaIO restriction trackers

bvolpato opened a new pull request, #27702:
URL: https://github.com/apache/beam/pull/27702

   It was recently identified that `ReadFromKafkaDoFn`'s `restrictionTracker` leaks connections that are no longer being polled.
   
   This can cause considerable performance degradation for Kafka clusters, which rely on `connections.max.idle.ms` to clean these connections up.
   
   
   It was also hard to track where new connections were coming from, so I've added a few log statements indicating when a new connection is needed on the critical path.
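To illustrate the kind of logging described above, here is a minimal sketch that wraps a consumer factory and logs every creation. `LoggingConsumerFactory` is hypothetical, it uses `java.util.logging` rather than whatever logger Beam actually uses, and the `Map<String, Object>` config type is an assumption modeled on Kafka consumer configs.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.logging.Logger;

/**
 * Hypothetical wrapper around a consumer factory function, loosely modeled on the
 * consumerFactoryFn used by KafkaIO. It logs and counts every consumer it creates, so
 * each new consumer (and its broker connections) on the critical path shows up in logs.
 */
final class LoggingConsumerFactory<C> implements Function<Map<String, Object>, C> {
  private static final Logger LOG = Logger.getLogger(LoggingConsumerFactory.class.getName());

  private final Function<Map<String, Object>, C> delegate;
  private final AtomicInteger created = new AtomicInteger();

  LoggingConsumerFactory(Function<Map<String, Object>, C> delegate) {
    this.delegate = delegate;
  }

  @Override
  public C apply(Map<String, Object> config) {
    int count = created.incrementAndGet();
    // Log before creating, so a slow or failing connection attempt is still traceable.
    LOG.info("Creating consumer #" + count + " (group.id=" + config.get("group.id") + ")");
    return delegate.apply(config);
  }

  /** Number of consumers created through this factory so far. */
  int createdCount() {
    return created.get();
  }
}
```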
   
   
   `KafkaLatestOffsetEstimator` doesn't hold any state, so by reusing it across backlog calculations for the same DoFn instance, we can save a large number of connections.
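A minimal sketch of the reuse idea, assuming topic partitions are represented as plain strings and that each DoFn instance is accessed from a single thread (a `ConcurrentHashMap` would be needed otherwise). `SketchEstimator` and `PerPartitionEstimators` are hypothetical stand-ins, not Beam classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for KafkaLatestOffsetEstimator; assume each instance opens one connection. */
final class SketchEstimator {
  final String topicPartition;

  SketchEstimator(String topicPartition) {
    this.topicPartition = topicPartition;
  }
}

/**
 * Keeps at most one estimator per topic partition for the lifetime of a DoFn instance,
 * instead of building a new one on every restrictionTracker() call.
 * Assumes single-threaded access; a ConcurrentHashMap would be needed otherwise.
 */
final class PerPartitionEstimators {
  private final Map<String, SketchEstimator> cache = new HashMap<>();
  private final Function<String, SketchEstimator> factory;

  PerPartitionEstimators(Function<String, SketchEstimator> factory) {
    this.factory = factory;
  }

  /** Previous behavior: a fresh estimator (and connection) per tracker. */
  SketchEstimator uncached(String topicPartition) {
    return factory.apply(topicPartition);
  }

  /** Proposed behavior: reuse the estimator already created for this partition. */
  SketchEstimator cached(String topicPartition) {
    return cache.computeIfAbsent(topicPartition, factory);
  }
}
```

With 100 tracker-style calls for one partition, `uncached` builds 100 estimators while `cached` builds one.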
   
   
   ![image](https://github.com/apache/beam/assets/3207647/2d394410-f4dd-4daa-87c8-2b0aacab2cb0)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] bvolpato commented on a diff in pull request #27702: Reduce number of connections used by KafkaIO restriction trackers

Posted by "bvolpato (via GitHub)" <gi...@apache.org>.
bvolpato commented on code in PR #27702:
URL: https://github.com/apache/beam/pull/27702#discussion_r1276660690


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -311,17 +324,26 @@ public OffsetRangeTracker restrictionTracker(
     if (restriction.getTo() < Long.MAX_VALUE) {
       return new OffsetRangeTracker(restriction);
     }
-    Map<String, Object> updatedConsumerConfig =
-        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-    KafkaLatestOffsetEstimator offsetPoller =
-        new KafkaLatestOffsetEstimator(
-            consumerFactoryFn.apply(

Review Comment:
   I may not be following why we need a pool. This proposal reuses the same consumer/estimator for the same topic partition over and over, so there is never more than one connection.
   
   The intent was already to use memoization, so it's clearly just an estimate of the backlog.




[GitHub] [beam] kennknowles commented on a diff in pull request #27702: Reduce number of connections used by KafkaIO restriction trackers

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #27702:
URL: https://github.com/apache/beam/pull/27702#discussion_r1276703515


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -311,17 +324,26 @@ public OffsetRangeTracker restrictionTracker(
     if (restriction.getTo() < Long.MAX_VALUE) {
       return new OffsetRangeTracker(restriction);
     }
-    Map<String, Object> updatedConsumerConfig =
-        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-    KafkaLatestOffsetEstimator offsetPoller =
-        new KafkaLatestOffsetEstimator(
-            consumerFactoryFn.apply(
-                KafkaIOUtils.getOffsetConsumerConfig(
-                    "tracker-" + kafkaSourceDescriptor.getTopicPartition(),
-                    offsetConsumerConfig,
-                    updatedConsumerConfig)),
-            kafkaSourceDescriptor.getTopicPartition());
-    return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetPoller);
+    final Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCacheInstance =
+        Preconditions.checkStateNotNull(this.offsetEstimatorCache);
+
+    TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
+    KafkaLatestOffsetEstimator offsetEstimator = offsetEstimatorCacheInstance.get(topicPartition);

Review Comment:
   I think it is good for the code's health to have the connections/consumers managed at an outer level and passed in, not created deep in the call stack. I also note that we are seeing more than 1 connection when we expect 1.




[GitHub] [beam] kennknowles commented on a diff in pull request #27702: Reduce number of connections used by KafkaIO restriction trackers

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #27702:
URL: https://github.com/apache/beam/pull/27702#discussion_r1276649342


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -311,17 +324,26 @@ public OffsetRangeTracker restrictionTracker(
     if (restriction.getTo() < Long.MAX_VALUE) {
       return new OffsetRangeTracker(restriction);
     }
-    Map<String, Object> updatedConsumerConfig =
-        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-    KafkaLatestOffsetEstimator offsetPoller =
-        new KafkaLatestOffsetEstimator(
-            consumerFactoryFn.apply(
-                KafkaIOUtils.getOffsetConsumerConfig(
-                    "tracker-" + kafkaSourceDescriptor.getTopicPartition(),
-                    offsetConsumerConfig,
-                    updatedConsumerConfig)),
-            kafkaSourceDescriptor.getTopicPartition());
-    return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetPoller);
+    final Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCacheInstance =
+        Preconditions.checkStateNotNull(this.offsetEstimatorCache);
+
+    TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
+    KafkaLatestOffsetEstimator offsetEstimator = offsetEstimatorCacheInstance.get(topicPartition);

Review Comment:
   The estimator should be cheap and use a consumer that is created outside of it, right?
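One way to read this suggestion, sketched under assumptions: the estimator receives an already-created source of end offsets instead of building its own consumer. `EndOffsetSource` and `InjectedEstimator` are hypothetical; note that a real Kafka `Consumer` is not thread-safe, so sharing one like this assumes single-threaded use.

```java
/** Hypothetical minimal view of a consumer: just enough to look up a partition's end offset. */
interface EndOffsetSource extends AutoCloseable {
  long endOffset(String topicPartition);

  /** Narrowed to throw no checked exception, so owners can close it without try/catch. */
  @Override
  void close();
}

/**
 * Cheap estimator that never opens a connection itself. Whoever constructs it (for example
 * a DoFn, once per instance) creates the source, passes it in, and is responsible for closing it.
 */
final class InjectedEstimator {
  private final EndOffsetSource source;
  private final String topicPartition;

  InjectedEstimator(EndOffsetSource source, String topicPartition) {
    this.source = source;
    this.topicPartition = topicPartition;
  }

  /** Returns the current end offset of the partition as reported by the shared source. */
  long estimate() {
    return source.endOffset(topicPartition);
  }
}
```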




[GitHub] [beam] kennknowles commented on pull request #27702: Reduce number of connections used by KafkaIO restriction trackers

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #27702:
URL: https://github.com/apache/beam/pull/27702#issuecomment-1654251820

   It's also totally possible that I am reading it wrong or misunderstanding what I am hearing from everyone about testing this.



[GitHub] [beam] Abacn commented on a diff in pull request #27702: Reduce number of connections used by KafkaIO restriction trackers

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #27702:
URL: https://github.com/apache/beam/pull/27702#discussion_r1276622056


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -311,17 +324,26 @@ public OffsetRangeTracker restrictionTracker(
     if (restriction.getTo() < Long.MAX_VALUE) {
       return new OffsetRangeTracker(restriction);
     }
-    Map<String, Object> updatedConsumerConfig =
-        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-    KafkaLatestOffsetEstimator offsetPoller =
-        new KafkaLatestOffsetEstimator(
-            consumerFactoryFn.apply(
-                KafkaIOUtils.getOffsetConsumerConfig(
-                    "tracker-" + kafkaSourceDescriptor.getTopicPartition(),
-                    offsetConsumerConfig,
-                    updatedConsumerConfig)),
-            kafkaSourceDescriptor.getTopicPartition());
-    return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetPoller);
+    final Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCacheInstance =
+        Preconditions.checkStateNotNull(this.offsetEstimatorCache);
+
+    TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
+    KafkaLatestOffsetEstimator offsetEstimator = offsetEstimatorCacheInstance.get(topicPartition);
+    if (offsetEstimator == null || offsetEstimator.isClosed()) {

Review Comment:
   Consider adding some comments about why the caching approach is needed?
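A sketch of what such a comment might look like, attached to a simplified version of the `null`/`isClosed()` check from the diff. `ClosableEstimator` and `EstimatorLookup` are hypothetical, and topic partitions are modeled as strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical closable estimator standing in for KafkaLatestOffsetEstimator. */
final class ClosableEstimator implements AutoCloseable {
  private boolean closed;

  boolean isClosed() {
    return closed;
  }

  @Override
  public void close() {
    closed = true;
  }
}

final class EstimatorLookup {
  // Why cache: restrictionTracker() may be invoked many times for the same topic partition
  // (for example for backlog estimates), and building a new estimator each time opened a
  // new consumer connection that was never polled again. One estimator per partition bounds that.
  private final Map<String, ClosableEstimator> cache = new HashMap<>();
  private final Function<String, ClosableEstimator> factory;

  EstimatorLookup(Function<String, ClosableEstimator> factory) {
    this.factory = factory;
  }

  ClosableEstimator forPartition(String topicPartition) {
    ClosableEstimator estimator = cache.get(topicPartition);
    // Mirrors the diff's check: rebuild only when the estimator is absent or already closed.
    if (estimator == null || estimator.isClosed()) {
      estimator = factory.apply(topicPartition);
      cache.put(topicPartition, estimator);
    }
    return estimator;
  }
}
```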




[GitHub] [beam] bvolpato commented on a diff in pull request #27702: Reduce number of connections used by KafkaIO restriction trackers

Posted by "bvolpato (via GitHub)" <gi...@apache.org>.
bvolpato commented on code in PR #27702:
URL: https://github.com/apache/beam/pull/27702#discussion_r1276717799


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -311,17 +324,26 @@ public OffsetRangeTracker restrictionTracker(
     if (restriction.getTo() < Long.MAX_VALUE) {
       return new OffsetRangeTracker(restriction);
     }
-    Map<String, Object> updatedConsumerConfig =
-        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-    KafkaLatestOffsetEstimator offsetPoller =
-        new KafkaLatestOffsetEstimator(
-            consumerFactoryFn.apply(
-                KafkaIOUtils.getOffsetConsumerConfig(
-                    "tracker-" + kafkaSourceDescriptor.getTopicPartition(),
-                    offsetConsumerConfig,
-                    updatedConsumerConfig)),
-            kafkaSourceDescriptor.getTopicPartition());
-    return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetPoller);
+    final Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCacheInstance =
+        Preconditions.checkStateNotNull(this.offsetEstimatorCache);
+
+    TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
+    KafkaLatestOffsetEstimator offsetEstimator = offsetEstimatorCacheInstance.get(topicPartition);

Review Comment:
   I agree, but `TopicPartition` arrives as an element of a PCollection, so changing the scope is more complicated and requires deeper thinking and testing.
   
   More fundamental changes are needed to make this happen, but given the trouble this can cause for a few clusters, this feels like a good patch to get in while we think about how to make SDF cause less overhead. (I still see many short-lived consumers/connections for process continuation, which didn't happen with the legacy implementation.)
   




[GitHub] [beam] kennknowles commented on a diff in pull request #27702: Reduce number of connections used by KafkaIO restriction trackers

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #27702:
URL: https://github.com/apache/beam/pull/27702#discussion_r1276652844


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -311,17 +324,26 @@ public OffsetRangeTracker restrictionTracker(
     if (restriction.getTo() < Long.MAX_VALUE) {
       return new OffsetRangeTracker(restriction);
     }
-    Map<String, Object> updatedConsumerConfig =
-        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-    KafkaLatestOffsetEstimator offsetPoller =
-        new KafkaLatestOffsetEstimator(
-            consumerFactoryFn.apply(

Review Comment:
   Specifically, a per-topic-partition consumer pool? I'm no expert, but this seems like the right granularity for reusing objects/connections.
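A rough sketch of a per-topic-partition pool, for comparison. `PartitionConsumerPool` is hypothetical and not part of Beam or the Kafka client; partitions are modeled as strings.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical per-topic-partition pool: callers borrow a consumer for a partition and
 * release it when done, so idle consumers are reused instead of recreated.
 */
final class PartitionConsumerPool<C> {
  private final Map<String, Deque<C>> idle = new HashMap<>();
  private final Function<String, C> factory;
  private int created;

  PartitionConsumerPool(Function<String, C> factory) {
    this.factory = factory;
  }

  /** Returns an idle consumer for the partition if one exists, otherwise creates a new one. */
  synchronized C borrow(String topicPartition) {
    Deque<C> available = idle.get(topicPartition);
    if (available != null && !available.isEmpty()) {
      return available.pop();
    }
    created++;
    return factory.apply(topicPartition);
  }

  /** Hands a consumer back so the next borrow for the same partition can reuse it. */
  synchronized void release(String topicPartition, C consumer) {
    idle.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>()).push(consumer);
  }

  synchronized int createdCount() {
    return created;
  }
}
```

If only one caller borrows a given partition at a time, this pool never holds more than one consumer for it, in which case it behaves like a plain per-partition cache.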




[GitHub] [beam] kennknowles commented on a diff in pull request #27702: Reduce number of connections used by KafkaIO restriction trackers

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #27702:
URL: https://github.com/apache/beam/pull/27702#discussion_r1277730535


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -311,17 +324,26 @@ public OffsetRangeTracker restrictionTracker(
     if (restriction.getTo() < Long.MAX_VALUE) {
       return new OffsetRangeTracker(restriction);
     }
-    Map<String, Object> updatedConsumerConfig =
-        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-    KafkaLatestOffsetEstimator offsetPoller =
-        new KafkaLatestOffsetEstimator(
-            consumerFactoryFn.apply(

Review Comment:
   We saw ~5 rather than ~1, didn't we? I'm in too many chats on this :-). At heart, I just don't like the approach of counting on a memoized result value as the way we avoid creating a connection; I'd prefer the automatic creation to take place higher in the call stack, ideally at the DoFn level. I see that SDF and UnboundedSource differ in that the topic partition is statically determined for UnboundedSource but arrives as incoming elements for SDF. TBH I wouldn't expect that to be a major architectural difference. To be continued!
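A minimal sketch of moving ownership up to the DoFn level, assuming resources are created lazily per topic partition (since partitions arrive as elements) and all closed together at teardown. `DoFnScopedResources` is hypothetical plain Java; in a real DoFn, `setup()` and `teardown()` would be the methods annotated with Beam's `@Setup` and `@Teardown`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical holder whose lifecycle mirrors a DoFn instance's setup/teardown. */
final class DoFnScopedResources<R extends AutoCloseable> {
  private final Function<String, R> factory;
  private Map<String, R> perPartition;

  DoFnScopedResources(Function<String, R> factory) {
    this.factory = factory;
  }

  /** Would carry @Setup in a real DoFn: runs once per instance, before any element. */
  void setup() {
    perPartition = new HashMap<>();
  }

  /** Lazily creates the resource for a partition, since partitions arrive as elements. */
  R get(String topicPartition) {
    if (perPartition == null) {
      throw new IllegalStateException("setup() has not been called");
    }
    return perPartition.computeIfAbsent(topicPartition, factory);
  }

  /** Would carry @Teardown: closes everything so no connection is left to idle out. */
  void teardown() {
    if (perPartition == null) {
      return;
    }
    RuntimeException failure = null;
    for (R resource : perPartition.values()) {
      try {
        resource.close();
      } catch (Exception e) {
        // Keep closing the remaining resources and report all failures at the end.
        if (failure == null) {
          failure = new IllegalStateException("Failed to close a per-partition resource", e);
        } else {
          failure.addSuppressed(e);
        }
      }
    }
    perPartition = null;
    if (failure != null) {
      throw failure;
    }
  }
}
```

Beam documents `@Teardown` as best-effort, so a design like this reduces leaked connections but cannot guarantee every one is closed.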



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] bvolpato commented on a diff in pull request #27702: Reduce number of connections used by KafkaIO restriction trackers

Posted by "bvolpato (via GitHub)" <gi...@apache.org>.
bvolpato commented on code in PR #27702:
URL: https://github.com/apache/beam/pull/27702#discussion_r1276656854


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -311,17 +324,26 @@ public OffsetRangeTracker restrictionTracker(
     if (restriction.getTo() < Long.MAX_VALUE) {
       return new OffsetRangeTracker(restriction);
     }
-    Map<String, Object> updatedConsumerConfig =
-        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-    KafkaLatestOffsetEstimator offsetPoller =
-        new KafkaLatestOffsetEstimator(
-            consumerFactoryFn.apply(
-                KafkaIOUtils.getOffsetConsumerConfig(
-                    "tracker-" + kafkaSourceDescriptor.getTopicPartition(),
-                    offsetConsumerConfig,
-                    updatedConsumerConfig)),
-            kafkaSourceDescriptor.getTopicPartition());
-    return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetPoller);
+    final Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCacheInstance =
+        Preconditions.checkStateNotNull(this.offsetEstimatorCache);
+
+    TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
+    KafkaLatestOffsetEstimator offsetEstimator = offsetEstimatorCacheInstance.get(topicPartition);

Review Comment:
   It is cheap, but the reason I didn't cache just the `Consumer` is that `KafkaLatestOffsetEstimator` memoizes the offsets, which wouldn't help much unless the estimator itself is reused.
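The thread does not show how `KafkaLatestOffsetEstimator` memoizes offsets; the sketch below assumes a time-based memo, similar in spirit to Guava's `Suppliers.memoizeWithExpiration`. `ExpiringMemo` is hypothetical, and the clock is injected so the expiry can be tested.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical time-based memo for an expensive lookup such as a partition's end offset. */
final class ExpiringMemo<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  private final long ttlNanos;
  private final LongSupplier nanoClock;
  private T value;
  private long expiresAt;
  private boolean hasValue;

  ExpiringMemo(Supplier<T> delegate, long ttlNanos, LongSupplier nanoClock) {
    this.delegate = delegate;
    this.ttlNanos = ttlNanos;
    this.nanoClock = nanoClock;
  }

  @Override
  public synchronized T get() {
    long now = nanoClock.getAsLong();
    // Compare via subtraction so the check stays correct if a nanoTime-style clock overflows.
    if (!hasValue || now - expiresAt >= 0) {
      value = delegate.get(); // the expensive call, e.g. asking the broker for the end offset
      expiresAt = now + ttlNanos;
      hasValue = true;
    }
    return value;
  }
}
```

Because the memoized value lives inside the estimator instance, a fresh estimator per tracker starts with an empty memo and always hits the broker, which is why caching only the consumer would not help much.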
   



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -311,17 +324,26 @@ public OffsetRangeTracker restrictionTracker(
     if (restriction.getTo() < Long.MAX_VALUE) {
       return new OffsetRangeTracker(restriction);
     }
-    Map<String, Object> updatedConsumerConfig =
-        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-    KafkaLatestOffsetEstimator offsetPoller =
-        new KafkaLatestOffsetEstimator(
-            consumerFactoryFn.apply(
-                KafkaIOUtils.getOffsetConsumerConfig(
-                    "tracker-" + kafkaSourceDescriptor.getTopicPartition(),
-                    offsetConsumerConfig,
-                    updatedConsumerConfig)),
-            kafkaSourceDescriptor.getTopicPartition());
-    return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetPoller);
+    final Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCacheInstance =
+        Preconditions.checkStateNotNull(this.offsetEstimatorCache);
+
+    TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
+    KafkaLatestOffsetEstimator offsetEstimator = offsetEstimatorCacheInstance.get(topicPartition);
+    if (offsetEstimator == null || offsetEstimator.isClosed()) {

Review Comment:
   Will add, good point.




[GitHub] [beam] kennknowles commented on a diff in pull request #27702: Reduce number of connections used by KafkaIO restriction trackers

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #27702:
URL: https://github.com/apache/beam/pull/27702#discussion_r1276649852


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -311,17 +324,26 @@ public OffsetRangeTracker restrictionTracker(
     if (restriction.getTo() < Long.MAX_VALUE) {
       return new OffsetRangeTracker(restriction);
     }
-    Map<String, Object> updatedConsumerConfig =
-        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-    KafkaLatestOffsetEstimator offsetPoller =
-        new KafkaLatestOffsetEstimator(
-            consumerFactoryFn.apply(

Review Comment:
   factory bad / pool good?




[GitHub] [beam] github-actions[bot] commented on pull request #27702: Reduce number of connections used by KafkaIO restriction trackers

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #27702:
URL: https://github.com/apache/beam/pull/27702#issuecomment-1652813439

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   R: @ahmedabu98 for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).



[GitHub] [beam] kennknowles merged pull request #27702: Reduce number of connections used by KafkaIO restriction trackers

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles merged PR #27702:
URL: https://github.com/apache/beam/pull/27702

