Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/03 00:00:29 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

ableegoldman opened a new pull request #8787:
URL: https://github.com/apache/kafka/pull/8787


   Split out the optimized source changelogs and fetch their committed offsets, rather than their end offsets, for the task lag computation.
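
   As a rough illustration of the description above, here is a minimal sketch of why the choice of offset matters for lag. It is not the PR's code. The class `SourceChangelogLagSketch`, the helpers `restorationLimits` and `lag`, the use of plain strings for partitions, and the clamping of lag at zero are all assumptions made for the example. The one detail taken from the diff in this thread is that a partition with no committed offset defaults to 0.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not the Kafka Streams implementation.
class SourceChangelogLagSketch {

    // For each changelog partition, pick the offset that lag is measured against:
    // the committed offset for optimized source changelogs, the log end offset otherwise.
    static Map<String, Long> restorationLimits(final Map<String, Long> logEndOffsets,
                                               final Map<String, Long> committedOffsets,
                                               final Set<String> optimizedSourceChangelogs) {
        final Map<String, Long> limits = new HashMap<>();
        for (final Map.Entry<String, Long> entry : logEndOffsets.entrySet()) {
            final String partition = entry.getKey();
            if (optimizedSourceChangelogs.contains(partition)) {
                // Partitions without a committed offset default to 0, as in the diff.
                limits.put(partition, committedOffsets.getOrDefault(partition, 0L));
            } else {
                limits.put(partition, entry.getValue());
            }
        }
        return limits;
    }

    // Assumption for this sketch: lag is clamped so it never goes negative.
    static long lag(final long limit, final long restoredOffset) {
        return Math.max(0L, limit - restoredOffset);
    }

    public static void main(final String[] args) {
        final Map<String, Long> logEndOffsets = new HashMap<>();
        logEndOffsets.put("source-topic-0", 100L);
        logEndOffsets.put("store-changelog-0", 50L);

        final Map<String, Long> committedOffsets = new HashMap<>();
        committedOffsets.put("source-topic-0", 40L);

        final Set<String> optimizedSourceChangelogs = new HashSet<>();
        optimizedSourceChangelogs.add("source-topic-0");

        final Map<String, Long> limits =
            restorationLimits(logEndOffsets, committedOffsets, optimizedSourceChangelogs);

        // With 30 records restored, the lag against the committed offset is 10;
        // measuring against the log end offset would have reported 70.
        System.out.println(lag(limits.get("source-topic-0"), 30L));
        System.out.println(lag(logEndOffsets.get("source-topic-0"), 30L));
    }
}
```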


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r436208312



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
     }
 
     private Map<TopicPartition, Long> committedOffsetForChangelogs(final Set<TopicPartition> partitions) {
-        if (partitions.isEmpty())
-            return Collections.emptyMap();
-

Review comment:
       Ah, now I see it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##########
@@ -95,19 +99,65 @@ public static String getTaskProducerClientId(final String threadClientId, final
         return result;
     }
 
-    public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions,
-                                                                             final Admin adminClient) {
-        final Map<TopicPartition, ListOffsetsResultInfo> endOffsets;
+    /**
+     * @throws StreamsException if the consumer throws an exception
+     * @throws org.apache.kafka.common.errors.TimeoutException if the request times out
+     */
+    public static Map<TopicPartition, Long> fetchCommittedOffsets(final Set<TopicPartition> partitions,
+                                                                  final Consumer<byte[], byte[]> consumer) {
+        if (partitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        final Map<TopicPartition, Long> committedOffsets;
         try {
-            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> future =  adminClient.listOffsets(
-                partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
-                                                                                        .all();
-            endOffsets = future.get();
+            // those which do not have a committed offset would default to 0
+            committedOffsets = consumer.committed(partitions).entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
+        } catch (final TimeoutException e) {
+            LOG.warn("The committed offsets request timed out, try increasing the consumer client's default.api.timeout.ms", e);
+            throw e;
+        } catch (final KafkaException e) {
+            LOG.warn("The committed offsets request failed.", e);
+            throw new StreamsException(String.format("Failed to retrieve end offsets for %s", partitions), e);
+        }
+
+        return committedOffsets;
+    }
 
+    public static KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> fetchEndOffsetsFuture(final Collection<TopicPartition> partitions,
+                                                                                                final Admin adminClient) {
+        return adminClient.listOffsets(
+            partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
+            .all();
+    }
+
+    /**
+     * A helper method that wraps the {@code Future#get} call and rethrows any thrown exception as a StreamsException
+     * @throws StreamsException if the admin client request throws an exception
+     * @throws org.apache.kafka.common.errors.TimeoutException if the request times out
+     */
+    public static Map<TopicPartition, ListOffsetsResultInfo> getEndOffsets(final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture) {
+        try {
+            return endOffsetsFuture.get();
+        } catch (final TimeoutException e) {
+            LOG.warn("The listOffsets request timed out, try increasing the admin client's default.api.timeout.ms", e);
+            throw e;

Review comment:
       In retrospect, I'm not sure this is possible. The javadoc for Future#get indicates that any exception would be wrapped in an ExecutionException.
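
       The wrapping behaviour described in this comment can be seen in a small self-contained sketch. It uses the JDK's `CompletableFuture` as a stand-in for `KafkaFuture`, and `RequestTimeoutException` is a hypothetical unchecked exception standing in for Kafka's `TimeoutException`; neither is taken from the PR. The assumption is only that both futures follow the same `Future#get` contract.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch using JDK types only; KafkaFuture is not involved here.
class FutureGetSketch {

    // Stand-in for an unchecked timeout exception raised by the request itself.
    static class RequestTimeoutException extends RuntimeException {
        RequestTimeoutException(final String message) {
            super(message);
        }
    }

    // Reports which catch block handles a failure surfaced by Future#get.
    static String describeFailure(final CompletableFuture<Long> future) {
        try {
            future.get();
            return "completed";
        } catch (final RequestTimeoutException e) {
            // Never reached for failures of the future: get() does not rethrow the raw cause.
            return "caught directly";
        } catch (final ExecutionException e) {
            // The original exception is available as the cause.
            return "wrapped: " + e.getCause().getClass().getSimpleName();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(final String[] args) {
        final CompletableFuture<Long> future = new CompletableFuture<>();
        future.completeExceptionally(new RequestTimeoutException("listOffsets timed out"));
        System.out.println(describeFailure(future)); // prints "wrapped: RequestTimeoutException"
    }
}
```

       To treat a timeout separately, a caller would therefore need to inspect `getCause()` on the `ExecutionException` rather than rely on a dedicated catch block for the timeout type.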







[GitHub] [kafka] vvcephei commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r435416204



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
     }
 
     private Map<TopicPartition, Long> committedOffsetForChangelogs(final Set<TopicPartition> partitions) {
-        if (partitions.isEmpty())
-            return Collections.emptyMap();
-

Review comment:
       What's the idea of dropping this?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
     }
 
     private Map<TopicPartition, Long> committedOffsetForChangelogs(final Set<TopicPartition> partitions) {
-        if (partitions.isEmpty())
-            return Collections.emptyMap();
-
         final Map<TopicPartition, Long> committedOffsets;
         try {
-            // those do not have a committed offset would default to 0
-            committedOffsets =  mainConsumer.committed(partitions).entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
-        } catch (final TimeoutException e) {
-            // if it timed out we just retry next time.
-            return Collections.emptyMap();
-        } catch (final KafkaException e) {
-            throw new StreamsException(String.format("Failed to retrieve end offsets for %s", partitions), e);
+            committedOffsets = fetchCommittedOffsets(partitions, mainConsumer);
+        } catch (final StreamsException e) {
+            if (e.getCause() instanceof TimeoutException) {

Review comment:
       This seems to be a step backwards, actually. Why wrap it as a StreamsException only to immediately unwrap it again?







[GitHub] [kafka] vvcephei commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r438477719



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -763,18 +778,36 @@ private boolean populateClientStatesMap(final Map<UUID, ClientState> clientState
                     .flatMap(Collection::stream)
                     .collect(Collectors.toList());
 
-            final Collection<TopicPartition> allPreexistingChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allPreexistingChangelogPartitions.removeIf(partition -> newlyCreatedChangelogs.contains(partition.topic()));
+            final Set<TopicPartition> preexistingChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> preexistingSourceChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> newlyCreatedChangelogPartitions = new HashSet<>();
+            for (final TopicPartition changelog : allChangelogPartitions) {
+                if (newlyCreatedChangelogs.contains(changelog.topic())) {
+                    newlyCreatedChangelogPartitions.add(changelog);
+                } else if (optimizedSourceChangelogs.contains(changelog.topic())) {
+                    preexistingSourceChangelogPartitions.add(changelog);
+                } else {
+                    preexistingChangelogPartitions.add(changelog);
+                }
+            }
+
+            // Make the listOffsets request first so it can  fetch the offsets for non-source changelogs
+            // asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets
+            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture =
+                fetchEndOffsetsFuture(preexistingChangelogPartitions, adminClient);
 
-            final Collection<TopicPartition> allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions);
+            final Map<TopicPartition, Long> sourceChangelogEndOffsets =
+                fetchCommittedOffsets(preexistingSourceChangelogPartitions, taskManager.mainConsumer());
 
-            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
-                fetchEndOffsets(allPreexistingChangelogPartitions, adminClient);
+            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture);
 
-            allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask, allNewlyCreatedChangelogPartitions);
+            allTaskEndOffsetSums = computeEndOffsetSumsByTask(
+                changelogsByStatefulTask,
+                endOffsets,
+                sourceChangelogEndOffsets,
+                newlyCreatedChangelogPartitions);
             fetchEndOffsetsSuccessful = true;
-        } catch (final StreamsException e) {
+        } catch (final StreamsException | TimeoutException e) {

Review comment:
       That sounds reasonable, but I think if you throw an exception in the assignor, it just calls the assignor again in a tight loop, which seems worse than backing off and trying again later.
   
   If you want to propose this change, maybe you can verify what exactly happens if we throw. 







[GitHub] [kafka] ableegoldman commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r435593477



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
     }
 
     private Map<TopicPartition, Long> committedOffsetForChangelogs(final Set<TopicPartition> partitions) {
-        if (partitions.isEmpty())
-            return Collections.emptyMap();
-
         final Map<TopicPartition, Long> committedOffsets;
         try {
-            // those do not have a committed offset would default to 0
-            committedOffsets =  mainConsumer.committed(partitions).entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
-        } catch (final TimeoutException e) {
-            // if it timed out we just retry next time.
-            return Collections.emptyMap();
-        } catch (final KafkaException e) {
-            throw new StreamsException(String.format("Failed to retrieve end offsets for %s", partitions), e);
+            committedOffsets = fetchCommittedOffsets(partitions, mainConsumer);
+        } catch (final StreamsException e) {
+            if (e.getCause() instanceof TimeoutException) {

Review comment:
       I thought this might raise some eyebrows. I wanted to keep the ClientUtils methods consistent, and thought wrapping everything as a StreamsException would be cleaner. But maybe it makes more sense to throw the TimeoutException separately...
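
       A minimal sketch of the "throw the timeout separately" option discussed here, assuming hypothetical stand-in types: `FetchTimeoutException` plays the role of `TimeoutException`, `FatalFetchException` plays the role of `StreamsException`, and a `Supplier` replaces the consumer call. None of these names come from the PR. The caller's retry-by-returning-an-empty-map behaviour mirrors the code removed from `committedOffsetForChangelogs` in the diff above.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch; the exception types and the Supplier-based request are stand-ins.
class TimeoutHandlingSketch {

    static class FetchTimeoutException extends RuntimeException {
        FetchTimeoutException(final String message) {
            super(message);
        }
    }

    static class FatalFetchException extends RuntimeException {
        FatalFetchException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Shared helper: timeouts propagate unchanged, every other failure is wrapped as fatal.
    static Map<String, Long> fetchCommitted(final Supplier<Map<String, Long>> request) {
        try {
            return request.get();
        } catch (final FetchTimeoutException e) {
            throw e;
        } catch (final RuntimeException e) {
            throw new FatalFetchException("Failed to retrieve committed offsets", e);
        }
    }

    // Caller: a timeout means "try again next time", so no unwrapping of a cause is needed.
    static Map<String, Long> committedOrEmpty(final Supplier<Map<String, Long>> request) {
        try {
            return fetchCommitted(request);
        } catch (final FetchTimeoutException e) {
            return Collections.emptyMap();
        }
    }

    public static void main(final String[] args) {
        final Map<String, Long> result = committedOrEmpty(() -> {
            throw new FetchTimeoutException("request timed out");
        });
        System.out.println(result.isEmpty()); // prints "true"
    }
}
```

       With this shape the caller catches the timeout type directly instead of testing `e.getCause() instanceof TimeoutException`, while fatal errors still surface as a single wrapped exception type.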







[GitHub] [kafka] vvcephei commented on pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#issuecomment-641639091


   Test this please





[GitHub] [kafka] vvcephei commented on pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#issuecomment-641712436


   Test this please





[GitHub] [kafka] mjsax commented on pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#issuecomment-642327022









[GitHub] [kafka] ableegoldman commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r435647575



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -763,18 +778,36 @@ private boolean populateClientStatesMap(final Map<UUID, ClientState> clientState
                     .flatMap(Collection::stream)
                     .collect(Collectors.toList());
 
-            final Collection<TopicPartition> allPreexistingChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allPreexistingChangelogPartitions.removeIf(partition -> newlyCreatedChangelogs.contains(partition.topic()));
+            final Set<TopicPartition> preexistingChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> preexistingSourceChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> newlyCreatedChangelogPartitions = new HashSet<>();
+            for (final TopicPartition changelog : allChangelogPartitions) {
+                if (newlyCreatedChangelogs.contains(changelog.topic())) {
+                    newlyCreatedChangelogPartitions.add(changelog);
+                } else if (optimizedSourceChangelogs.contains(changelog.topic())) {
+                    preexistingSourceChangelogPartitions.add(changelog);
+                } else {
+                    preexistingChangelogPartitions.add(changelog);
+                }
+            }
+
+            // Make the listOffsets request first so it can  fetch the offsets for non-source changelogs
+            // asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets
+            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture =
+                fetchEndOffsetsFuture(preexistingChangelogPartitions, adminClient);
 
-            final Collection<TopicPartition> allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions);
+            final Map<TopicPartition, Long> sourceChangelogEndOffsets =
+                fetchCommittedOffsets(preexistingSourceChangelogPartitions, taskManager.mainConsumer());
 
-            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
-                fetchEndOffsets(allPreexistingChangelogPartitions, adminClient);
+            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture);
 
-            allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask, allNewlyCreatedChangelogPartitions);
+            allTaskEndOffsetSums = computeEndOffsetSumsByTask(
+                changelogsByStatefulTask,
+                endOffsets,
+                sourceChangelogEndOffsets,
+                newlyCreatedChangelogPartitions);
             fetchEndOffsetsSuccessful = true;
-        } catch (final StreamsException e) {
+        } catch (final StreamsException | TimeoutException e) {

Review comment:
       @vvcephei I've been wondering if maybe we should _only_ catch the TimeoutException, and interpret a StreamsException as fatal (like IllegalStateException, for example). This is how we were using `Consumer#committed` in the StoreChangelogReader, and AFAICT that only throws KafkaException on "unrecoverable errors" (quoted from the javadocs).
   But I can't tell whether the Admin's `listOffsets` might throw on transient errors, so I'm leaning towards catching both just to be safe. WDYT?







[GitHub] [kafka] ableegoldman commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r437788584



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##########
@@ -95,19 +99,65 @@ public static String getTaskProducerClientId(final String threadClientId, final
         return result;
     }
 
-    public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions,
-                                                                             final Admin adminClient) {
-        final Map<TopicPartition, ListOffsetsResultInfo> endOffsets;
+    /**
+     * @throws StreamsException if the consumer throws an exception
+     * @throws org.apache.kafka.common.errors.TimeoutException if the request times out
+     */
+    public static Map<TopicPartition, Long> fetchCommittedOffsets(final Set<TopicPartition> partitions,
+                                                                  final Consumer<byte[], byte[]> consumer) {
+        if (partitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        final Map<TopicPartition, Long> committedOffsets;
         try {
-            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> future =  adminClient.listOffsets(
-                partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
-                                                                                        .all();
-            endOffsets = future.get();
+            // those which do not have a committed offset would default to 0
+            committedOffsets = consumer.committed(partitions).entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
+        } catch (final TimeoutException e) {
+            LOG.warn("The committed offsets request timed out, try increasing the consumer client's default.api.timeout.ms", e);
+            throw e;
+        } catch (final KafkaException e) {
+            LOG.warn("The committed offsets request failed.", e);
+            throw new StreamsException(String.format("Failed to retrieve end offsets for %s", partitions), e);
+        }
+
+        return committedOffsets;
+    }
 
+    public static KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> fetchEndOffsetsFuture(final Collection<TopicPartition> partitions,
+                                                                                                final Admin adminClient) {
+        return adminClient.listOffsets(
+            partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
+            .all();
+    }
+
+    /**
+     * A helper method that wraps the {@code Future#get} call and rethrows any thrown exception as a StreamsException
+     * @throws StreamsException if the admin client request throws an exception
+     * @throws org.apache.kafka.common.errors.TimeoutException if the request times out
+     */
+    public static Map<TopicPartition, ListOffsetsResultInfo> getEndOffsets(final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture) {
+        try {
+            return endOffsetsFuture.get();
+        } catch (final TimeoutException e) {
+            LOG.warn("The listOffsets request timed out, try increasing the admin client's default.api.timeout.ms", e);
+            throw e;

Review comment:
       Good catch. Do you think it should still be thrown/treated separately, though? See also my comment in StreamsPartitionAssignor below.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r438495057



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -763,18 +778,36 @@ private boolean populateClientStatesMap(final Map<UUID, ClientState> clientState
                     .flatMap(Collection::stream)
                     .collect(Collectors.toList());
 
-            final Collection<TopicPartition> allPreexistingChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allPreexistingChangelogPartitions.removeIf(partition -> newlyCreatedChangelogs.contains(partition.topic()));
+            final Set<TopicPartition> preexistingChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> preexistingSourceChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> newlyCreatedChangelogPartitions = new HashSet<>();
+            for (final TopicPartition changelog : allChangelogPartitions) {
+                if (newlyCreatedChangelogs.contains(changelog.topic())) {
+                    newlyCreatedChangelogPartitions.add(changelog);
+                } else if (optimizedSourceChangelogs.contains(changelog.topic())) {
+                    preexistingSourceChangelogPartitions.add(changelog);
+                } else {
+                    preexistingChangelogPartitions.add(changelog);
+                }
+            }
+
+            // Make the listOffsets request first so it can  fetch the offsets for non-source changelogs
+            // asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets
+            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture =
+                fetchEndOffsetsFuture(preexistingChangelogPartitions, adminClient);
 
-            final Collection<TopicPartition> allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions);
+            final Map<TopicPartition, Long> sourceChangelogEndOffsets =
+                fetchCommittedOffsets(preexistingSourceChangelogPartitions, taskManager.mainConsumer());
 
-            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
-                fetchEndOffsets(allPreexistingChangelogPartitions, adminClient);
+            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture);
 
-            allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask, allNewlyCreatedChangelogPartitions);
+            allTaskEndOffsetSums = computeEndOffsetSumsByTask(
+                changelogsByStatefulTask,
+                endOffsets,
+                sourceChangelogEndOffsets,
+                newlyCreatedChangelogPartitions);
             fetchEndOffsetsSuccessful = true;
-        } catch (final StreamsException e) {
+        } catch (final StreamsException | TimeoutException e) {

Review comment:
       > if you throw an exception in the assignor, it just calls the assignor again in a tight loop
   
   Wouldn't the leader thread just die? Not saying that that's ideal, either. But it's at least in line with how exceptions thrown by other admin client requests in the assignment are currently handled.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r435590436



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
     }
 
     private Map<TopicPartition, Long> committedOffsetForChangelogs(final Set<TopicPartition> partitions) {
-        if (partitions.isEmpty())
-            return Collections.emptyMap();
-

Review comment:
       The diff is a bit misleading; this was also factored out into the new `ClientUtils#fetchCommittedOffsets`.







[GitHub] [kafka] vvcephei merged pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #8787:
URL: https://github.com/apache/kafka/pull/8787


   





[GitHub] [kafka] vvcephei commented on pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#issuecomment-641639379


   Test this please





[GitHub] [kafka] ableegoldman commented on pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#issuecomment-639972603


   The Java 14 build passed; the Java 11 and 8 builds failed with an environment issue.





[GitHub] [kafka] vvcephei commented on pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#issuecomment-637888549


   Ok to test





[GitHub] [kafka] ableegoldman commented on pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#issuecomment-639205947


   Hey @vvcephei, I addressed your comments and added tests. Let me know if there's any test coverage that still seems missing.





[GitHub] [kafka] vvcephei commented on pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#issuecomment-639893286


   Test this please

