You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/28 04:12:28 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

ableegoldman opened a new pull request #8738:
URL: https://github.com/apache/kafka/pull/8738


   Fixes the admin client timeout supplied to `future.get(timeout)` to always time out based on the configured `default.api.timeout.ms`


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #8738: MINOR: remove unnecessary timeout for admin request

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #8738:
URL: https://github.com/apache/kafka/pull/8738


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r432013319



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -551,6 +551,7 @@ void runLoop() {
                 if (nextProbingRebalanceMs.get() < time.milliseconds()) {
                     log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
                     mainConsumer.enforceRebalance();
+                    nextProbingRebalanceMs.set(Long.MAX_VALUE);

Review comment:
       Yep. It just provides a notice to the consumer to enforce that a rebalance will occur on the next poll 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r431984817



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1246,7 +1247,12 @@ public void cleanUp() {
         }
 
         log.debug("Current changelog positions: {}", allChangelogPositions);
-        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsetsWithoutTimeout(allPartitions, adminClient);
+        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets =
+            fetchEndOffsets(
+                allPartitions,
+                adminClient,
+                getAdminDefaultApiTimeoutMs(config)

Review comment:
       I did it this way since we need to get the admin's default.api.timeout in other places where we only have the `streamsConfig`, so we may as well just pass that as the argument to `getAdminDefaultApiTimeoutMs`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #8738: MINOR: remove unnecessary timeout for admin request

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635711087


   Thanks! I actually don't think there ended up being anything relevant t o2.5 in the final form of this PR. Except maybe adding `@throws StreamsException` to the `allLocalStorePartitionLags` javadocs, but not sure that warrants an entire PR


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8738: MINOR: remove unnecessary timeout for admin request

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r431571493



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##########
@@ -90,24 +96,21 @@ public static String getTaskProducerClientId(final String threadClientId, final
         return result;
     }
 
-    public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsetsWithoutTimeout(final Collection<TopicPartition> partitions,

Review comment:
       Removed this as we presumably always want to apply a timeout and do not want `KafkaStreams#allLocalStorePartitionLags` to block indefinitely 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r432123697



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1246,7 +1247,12 @@ public void cleanUp() {
         }
 
         log.debug("Current changelog positions: {}", allChangelogPositions);
-        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsetsWithoutTimeout(allPartitions, adminClient);
+        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets =
+            fetchEndOffsets(
+                allPartitions,
+                adminClient,
+                getAdminDefaultApiTimeoutMs(config)

Review comment:
       This change might require a KIP... \cc @vvcephei @guozhangwang WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635603479


   \o/


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8738: MINOR: remove unnecessary timeout for admin request

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635706567


   Merged to `trunk` and cherry-picked to `2.6`. Do we want this in `2.5`, too? If yes, we would need a new PR. Not possible to cherry-pick.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8738: MINOR: remove unnecessary timeout for admin request

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-636131964


   Ack. In 2.5 we use the AdminClient directly in `allLocalStorePartitionLags` and don't apply a timeout on `get()`. -- Might still be worth to do quick PR to update the JavaDocs :)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635603373


   Test this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8738: MINOR: remove unnecessary timeout for admin request

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635644310


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635553612


   It looks like all the review comments are addressed, and all the tests passed, so I'll proceed to merge.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r432123361



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1246,7 +1247,12 @@ public void cleanUp() {
         }
 
         log.debug("Current changelog positions: {}", allChangelogPositions);
-        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsetsWithoutTimeout(allPartitions, adminClient);
+        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets =
+            fetchEndOffsets(
+                allPartitions,
+                adminClient,
+                getAdminDefaultApiTimeoutMs(config)

Review comment:
       We should update the JavaDocs that this method may throw a `TimeoutException` now. What make we wondering if this is a public API change? Was there any discussion on the original KIP about the behavior of `allLocalStorePartitionLags` ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r432121655



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
##########
@@ -45,42 +50,42 @@ public void fetchEndOffsetsShouldRethrowRuntimeExceptionAsStreamsException() {
         final Admin adminClient = EasyMock.createMock(AdminClient.class);
         EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andThrow(new RuntimeException());
         replay(adminClient);
-        assertThrows(StreamsException.class, () ->  fetchEndOffsetsWithoutTimeout(emptyList(), adminClient));
+        assertThrows(StreamsException.class, () ->  fetchEndOffsets(emptyList(), adminClient, 60_000L));
         verify(adminClient);
     }
 
     @Test
-    public void fetchEndOffsetsShouldRethrowInterruptedExceptionAsStreamsException() throws InterruptedException, ExecutionException {
+    public void fetchEndOffsetsShouldRethrowInterruptedExceptionAsStreamsException() throws Exception {
         final Admin adminClient = EasyMock.createMock(AdminClient.class);
         final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
         final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = EasyMock.createMock(KafkaFuture.class);
 
         EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result);
         EasyMock.expect(result.all()).andStubReturn(allFuture);
-        EasyMock.expect(allFuture.get()).andThrow(new InterruptedException());
+        EasyMock.expect(allFuture.get(60000L, TimeUnit.MILLISECONDS)).andThrow(new InterruptedException());

Review comment:
       As above (also below)
   
   Also nit: `60_000L` (if just `60` and `TimeUnit.SECONDS`?)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635562905






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8738: MINOR: remove unnecessary timeout for admin request

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r432153623



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1246,7 +1246,7 @@ public void cleanUp() {
         }
 
         log.debug("Current changelog positions: {}", allChangelogPositions);
-        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsetsWithoutTimeout(allPartitions, adminClient);
+        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsets(allPartitions, adminClient);

Review comment:
       Can we please update the JavaDocs of `allLocalStorePartitionLags` to state that a `StreamsException` could be thrown?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r431974679



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -89,7 +89,7 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il
                 mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
                 mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000"),
                 mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
-                mkEntry(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 9)
+                mkEntry(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 90_000)

Review comment:
       For fun

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -89,7 +89,7 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il
                 mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
                 mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000"),
                 mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
-                mkEntry(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 9)
+                mkEntry(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 90_000)

Review comment:
       I mean, because the value has to be larger than the `request.timeout.ms` which defaults to 30,000




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r431984937



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##########
@@ -90,24 +96,21 @@ public static String getTaskProducerClientId(final String threadClientId, final
         return result;
     }
 
-    public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsetsWithoutTimeout(final Collection<TopicPartition> partitions,
-                                                                                           final Admin adminClient) {
-        return fetchEndOffsets(partitions, adminClient, null);
+    public static int getAdminDefaultApiTimeoutMs(final StreamsConfig streamsConfig) {
+        final InternalAdminClientConfig dummyAdmin = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy"));

Review comment:
       Ack




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635562805


   I just resolved the conflicts.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635561957


   Oh, actually @ableegoldman , it looks like there was a conflict with the other PR I just merged.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r432006306



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -89,7 +89,7 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il
                 mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
                 mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000"),
                 mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
-                mkEntry(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 9)
+                mkEntry(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 90_000)

Review comment:
       Ah, thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #8738: MINOR: remove unnecessary timeout for admin request

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-637012071


   Fair enough: https://github.com/apache/kafka/pull/8772


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r432009371



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -551,6 +551,7 @@ void runLoop() {
                 if (nextProbingRebalanceMs.get() < time.milliseconds()) {
                     log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
                     mainConsumer.enforceRebalance();
+                    nextProbingRebalanceMs.set(Long.MAX_VALUE);

Review comment:
       enforceRebalance is guaranteed not to actually run the assignment logic, right? That will only run during a call to poll, I'm hoping. Otherwise, this line should go before the call.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r431571493



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##########
@@ -90,24 +96,21 @@ public static String getTaskProducerClientId(final String threadClientId, final
         return result;
     }
 
-    public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsetsWithoutTimeout(final Collection<TopicPartition> partitions,

Review comment:
       Removed this as we presumably always want to apply a timeout and do not want `KafkaStreams#allLocalStorePartitionLags` to block indefinitely 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8738: MINOR: remove unnecessary timeout for admin request

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r432158766



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1246,7 +1246,7 @@ public void cleanUp() {
         }
 
         log.debug("Current changelog positions: {}", allChangelogPositions);
-        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsetsWithoutTimeout(allPartitions, adminClient);
+        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsets(allPartitions, adminClient);

Review comment:
       ack, done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635473947


   Test this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8738: MINOR: remove unnecessary timeout for admin request

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r432153623



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1246,7 +1246,7 @@ public void cleanUp() {
         }
 
         log.debug("Current changelog positions: {}", allChangelogPositions);
-        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsetsWithoutTimeout(allPartitions, adminClient);
+        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsets(allPartitions, adminClient);

Review comment:
       Can we please update the JavaDocs of `allLocalStorePartitionLags` to state that a timeout exception could be thrown?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r431966350



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##########
@@ -40,9 +41,14 @@
 import org.slf4j.LoggerFactory;
 
 public class ClientUtils {
-
     private static final Logger LOG = LoggerFactory.getLogger(ClientUtils.class);
 
+    static final class InternalAdminClientConfig extends AdminClientConfig {

Review comment:
       How about QuietAdminClientConfig, like QuietStreamsConfig?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##########
@@ -90,24 +96,21 @@ public static String getTaskProducerClientId(final String threadClientId, final
         return result;
     }
 
-    public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsetsWithoutTimeout(final Collection<TopicPartition> partitions,
-                                                                                           final Admin adminClient) {
-        return fetchEndOffsets(partitions, adminClient, null);
+    public static int getAdminDefaultApiTimeoutMs(final StreamsConfig streamsConfig) {
+        final InternalAdminClientConfig dummyAdmin = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy"));
+        return dummyAdmin.getInt(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
     }
 
     public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions,
                                                                              final Admin adminClient,
-                                                                             final Duration timeout) {
+                                                                             final long timeoutMs) {

Review comment:
       I think it's because we're now also calling it right after calling `getAdminDefaultApiTimeoutMs`, so it seems a bummer to create a Duration from millis and then immediately convert it back to millis. 

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -89,7 +89,7 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il
                 mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
                 mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000"),
                 mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
-                mkEntry(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 9)
+                mkEntry(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 90_000)

Review comment:
       Just curious, why the change from 9 to 90,000?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r431974862



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##########
@@ -40,9 +41,14 @@
 import org.slf4j.LoggerFactory;
 
 public class ClientUtils {
-
     private static final Logger LOG = LoggerFactory.getLogger(ClientUtils.class);
 
+    static final class InternalAdminClientConfig extends AdminClientConfig {

Review comment:
       Good call




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r431571250



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -44,12 +45,6 @@
     private final static String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. " +
         "Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact).";
 
-    private static final class InternalAdminClientConfig extends AdminClientConfig {

Review comment:
       Moved to `ClientUtil`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8738: MINOR: remove unnecessary timeout for admin request

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635633587


   Test this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r431571947



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -551,6 +551,7 @@ void runLoop() {
                 if (nextProbingRebalanceMs.get() < time.milliseconds()) {
                     log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
                     mainConsumer.enforceRebalance();
+                    nextProbingRebalanceMs.set(Long.MAX_VALUE);

Review comment:
       This is neither relevant to this PR nor required for correctness, but I noticed the log message above tends to spam the logs in some tests. Since this gets set/reset at the end of every rebalance, we may as well reset it here to avoid an avalanche of `Triggering the followup rebalance...`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r432121258



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
##########
@@ -45,42 +50,42 @@ public void fetchEndOffsetsShouldRethrowRuntimeExceptionAsStreamsException() {
         final Admin adminClient = EasyMock.createMock(AdminClient.class);
         EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andThrow(new RuntimeException());
         replay(adminClient);
-        assertThrows(StreamsException.class, () ->  fetchEndOffsetsWithoutTimeout(emptyList(), adminClient));
+        assertThrows(StreamsException.class, () ->  fetchEndOffsets(emptyList(), adminClient, 60_000L));

Review comment:
       Should we pass in `MAX_VALUE` to avoid introducing test flakyness?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635589437






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8738: MINOR: remove unnecessary timeout for admin request

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635633530


   Test this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8738: MINOR: remove unnecessary timeout for admin request

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635637908


   Checkstyle error:
   ```
   [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.12/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java:21:8: Unused import - org.apache.kafka.clients.admin.AdminClientConfig. [UnusedImports]
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8738: MINOR: remove unnecessary timeout for admin request

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635644144


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r431976033



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##########
@@ -90,24 +96,21 @@ public static String getTaskProducerClientId(final String threadClientId, final
         return result;
     }
 
-    public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsetsWithoutTimeout(final Collection<TopicPartition> partitions,
-                                                                                           final Admin adminClient) {
-        return fetchEndOffsets(partitions, adminClient, null);
+    public static int getAdminDefaultApiTimeoutMs(final StreamsConfig streamsConfig) {
+        final InternalAdminClientConfig dummyAdmin = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy"));
+        return dummyAdmin.getInt(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
     }
 
     public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions,
                                                                              final Admin adminClient,
-                                                                             final Duration timeout) {
+                                                                             final long timeoutMs) {

Review comment:
       Yep




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8738: MINOR: apply default.api.timeout.ms to fetchEndOffsets

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#issuecomment-635563117


   ... or not


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org