Posted to jira@kafka.apache.org by "Cerchie (via GitHub)" <gi...@apache.org> on 2023/01/24 20:26:04 UTC

[GitHub] [kafka] Cerchie opened a new pull request, #13161: Kafka 14128

Cerchie opened a new pull request, #13161:
URL: https://github.com/apache/kafka/pull/13161

   In response to [KAFKA-14128](https://issues.apache.org/jira/browse/KAFKA-14128). Addresses the issue by moving the final catch condition into an else block.
   
   Testing strategy: I'm attempting a unit test first. I've copied the test above so I can reuse its `MockTime` and `InternalTopicManager` setup. Currently, when I run the test, it fails with an error whose top line is:
   
   ```
   expected org.apache.kafka.common.errors.TimeoutException to be thrown, but nothing was thrown
   ```
   
   This comes before working out the expected text of the error, which is my next step.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on pull request #13161: Kafka 14128

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on PR #13161:
URL: https://github.com/apache/kafka/pull/13161#issuecomment-1441323418

   Thanks for the PR. Merged to `trunk` and cherry-picked to `3.4` branch.




[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128

Posted by "Cerchie (via GitHub)" <gi...@apache.org>.
Cerchie commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1088951633


##########
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java:
##########
@@ -160,7 +160,7 @@ private void maybeThrowCancellationException(Throwable cause) {
      * Waits if necessary for this future to complete, and then returns its result.
      */
     @Override
-    public T get() throws InterruptedException, ExecutionException {
+    public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {

Review Comment:
   That makes sense! I think this change might be in here by accident -- I thought I had done that. Thanks for pointing that out. I'll remove it. 





[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128

Posted by "Cerchie (via GitHub)" <gi...@apache.org>.
Cerchie commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1092026440


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -521,7 +524,7 @@ protected Map<String, Integer> getNumPartitions(final Set<String> topics,
         for (final Map.Entry<String, KafkaFuture<TopicDescription>> topicFuture : futures.entrySet()) {
             final String topicName = topicFuture.getKey();
             try {
-                final TopicDescription topicDescription = topicFuture.getValue().get();
+                final TopicDescription topicDescription = topicFuture.getValue().get(Long.parseLong(DEFAULT_API_TIMEOUT_MS_CONFIG), TimeUnit.MILLISECONDS);

Review Comment:
   removed





[GitHub] [kafka] Hangleton commented on a diff in pull request #13161: Kafka 14128

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1090718859


##########
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java:
##########
@@ -160,7 +160,7 @@ private void maybeThrowCancellationException(Throwable cause) {
      * Waits if necessary for this future to complete, and then returns its result.
      */
     @Override
-    public T get() throws InterruptedException, ExecutionException {
+    public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {

Review Comment:
   Thanks for the follow-up. 
   
   Trying to understand your use case. 
   
   The `MetadataRequest` used to describe topics timed out. All retries were exhausted (4 lines) and a `TimeoutException` (FQN `org.apache.kafka.common.errors.TimeoutException`) was propagated to the future (that is, the future was completed exceptionally), and then propagated to the caller, resulting in the behaviour observed.
   
   ```
   2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting from node 3 due to request timeout.
   2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled in-flight METADATA request with correlation id 985 due to node 3 being disconnected (elapsed time since creation: 60023ms, elapsed time since send: 60023ms, request timeout: 30000ms)
   2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] o.a.k.s.p.i.InternalTopicManager         : stream-thread [main] Unexpected error during topic description for L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog.
   Error message was: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s)
   2022-07-29 13:39:37.869  INFO 25843 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
   ```
   
   Therefore, it seems that:
   
   - The failure of the "describe-topics" invocation was already correctly propagated via the future.
   - This failure results from the timeout of the underlying Metadata request **and** the exhaustion of retries.
   
   If the broker(s) were only temporarily unavailable, it seems increasing the number of retries may have helped?
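
For illustration, a minimal sketch of how retry and timeout settings might be raised for the admin client that Kafka Streams uses internally. It assumes the standard `admin.` prefix that Streams applies to admin client overrides; the class name, method name, and every value below are hypothetical and not taken from this PR or the Jira ticket.

```java
import java.util.Properties;

// Hypothetical helper; not part of Kafka or of this PR.
class AdminRetryConfigSketch {

    /** Builds overrides that Kafka Streams would forward to its internal admin client. */
    static Properties adminOverrides() {
        Properties props = new Properties();
        // Keys prefixed with "admin." are passed to the admin client only.
        props.put("admin.retries", "10");                     // illustrative value
        props.put("admin.request.timeout.ms", "30000");       // per-request timeout, illustrative
        props.put("admin.default.api.timeout.ms", "120000");  // overall deadline per admin call, illustrative
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides so they can be checked before merging into the Streams config.
        adminOverrides().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Whether more retries would actually have helped depends on how long the brokers were unavailable relative to the overall API deadline, so treat the values as a starting point for experimentation.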





[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128

Posted by "Cerchie (via GitHub)" <gi...@apache.org>.
Cerchie commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1092027837


##########
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java:
##########
@@ -160,7 +160,7 @@ private void maybeThrowCancellationException(Throwable cause) {
      * Waits if necessary for this future to complete, and then returns its result.
      */
     @Override
-    public T get() throws InterruptedException, ExecutionException {
+    public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {

Review Comment:
   I'm not sure what you mean here -- for more on the problem, read https://issues.apache.org/jira/browse/KAFKA-14128 





[GitHub] [kafka] mjsax merged pull request #13161: Kafka 14128

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax merged PR #13161:
URL: https://github.com/apache/kafka/pull/13161




[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128

Posted by "Cerchie (via GitHub)" <gi...@apache.org>.
Cerchie commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1092034974


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -466,7 +469,10 @@ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
                                                 topicName)
                                         );
                                     }
-                                } else {
+                                } else if (cause instanceof  TimeoutException) {

Review Comment:
   ah -- that's right. done. 







[GitHub] [kafka] mjsax commented on a diff in pull request #13161: Kafka 14128

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1091223587


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -538,6 +544,8 @@ protected Map<String, Integer> getNumPartitions(final Set<String> topics,
                     tempUnknownTopics.add(topicName);
                     log.debug("The leader of topic {} is not available.\n" +
                         "Error message was: {}", topicName, cause.toString());
+                } else if (cause instanceof TimeoutException) {
+                    throw new RuntimeException();

Review Comment:
   We should not throw an exception here, but do what we do inside the existing `catch TimeoutException` block (which we can then remove).





[GitHub] [kafka] mjsax commented on a diff in pull request #13161: Kafka 14128

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1091222822


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -466,7 +469,10 @@ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
                                                 topicName)
                                         );
                                     }
-                                } else {
+                                } else if (cause instanceof  TimeoutException) {

Review Comment:
   I actually believe we can remove the `catch TimeoutException` block below, because `get()` should never throw a timeout.
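
A minimal sketch of the pattern under discussion, using only `java.util.concurrent`: an untimed `get()` reports a failed operation as an `ExecutionException`, so a timeout raised by the operation itself shows up as the cause rather than as a separately caught exception. `StandInTimeoutException` is a hypothetical stand-in for `org.apache.kafka.common.errors.TimeoutException`, and the `"retry"` / `"fatal"` outcomes are placeholders, not what `InternalTopicManager` actually does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class; not part of Kafka or of this PR.
class CauseDispatchDemo {

    /** Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException. */
    static class StandInTimeoutException extends RuntimeException {
        StandInTimeoutException(String message) {
            super(message);
        }
    }

    /** Classifies the outcome of an untimed get() as "ok", "retry", or "fatal". */
    static String classify(CompletableFuture<String> future) throws InterruptedException {
        try {
            // An untimed get() does not declare java.util.concurrent.TimeoutException.
            future.get();
            return "ok";
        } catch (ExecutionException e) {
            // The operation's own failure, including a timeout, arrives as the cause.
            Throwable cause = e.getCause();
            if (cause instanceof StandInTimeoutException) {
                return "retry";
            }
            return "fatal";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new StandInTimeoutException("describeTopics timed out"));
        System.out.println(classify(timedOut));
    }
}
```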





[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128

Posted by "Cerchie (via GitHub)" <gi...@apache.org>.
Cerchie commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1092035905


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -538,6 +544,8 @@ protected Map<String, Integer> getNumPartitions(final Set<String> topics,
                     tempUnknownTopics.add(topicName);
                     log.debug("The leader of topic {} is not available.\n" +
                         "Error message was: {}", topicName, cause.toString());
+                } else if (cause instanceof TimeoutException) {
+                    throw new RuntimeException();

Review Comment:
   Thank you -- removed the exception and the existing block and replaced the exception with the correct behavior. 





[GitHub] [kafka] Hangleton commented on a diff in pull request #13161: Kafka 14128

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1088898259


##########
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java:
##########
@@ -160,7 +160,7 @@ private void maybeThrowCancellationException(Throwable cause) {
      * Waits if necessary for this future to complete, and then returns its result.
      */
     @Override
-    public T get() throws InterruptedException, ExecutionException {
+    public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {

Review Comment:
   Apologies if I misread: why not use the `get` method with a timeout at line 177?





[GitHub] [kafka] cmccabe commented on a diff in pull request #13161: Kafka 14128

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1091058054


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -521,7 +524,7 @@ protected Map<String, Integer> getNumPartitions(final Set<String> topics,
         for (final Map.Entry<String, KafkaFuture<TopicDescription>> topicFuture : futures.entrySet()) {
             final String topicName = topicFuture.getKey();
             try {
-                final TopicDescription topicDescription = topicFuture.getValue().get();
+                final TopicDescription topicDescription = topicFuture.getValue().get(Long.parseLong(DEFAULT_API_TIMEOUT_MS_CONFIG), TimeUnit.MILLISECONDS);

Review Comment:
   Do not put timeout parameters on Future.get. This will not abort the operation. If you want timeouts, they should be done by adjusting your admin client configuration parameters.
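
A small sketch of the point above, using a plain `CompletableFuture` in place of a real admin client call (an assumption made to keep the example self-contained; the class and method names are hypothetical). When a timed `get` gives up, only the waiting thread stops waiting; the future itself stays pending and can still complete later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of Kafka or of this PR.
class TimedGetDemo {

    /** Returns true if the future is still pending after a timed get() has given up. */
    static boolean stillPendingAfterTimedGet(CompletableFuture<String> future) throws Exception {
        try {
            future.get(10, TimeUnit.MILLISECONDS);
            return false; // completed within the wait
        } catch (TimeoutException e) {
            // Only the wait was abandoned; nothing cancelled the underlying operation.
            return !future.isDone();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stands in for an in-flight admin operation that has not finished yet.
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(stillPendingAfterTimedGet(pending));

        // The operation can still complete after the caller stopped waiting.
        pending.complete("late result");
        System.out.println(pending.get());
    }
}
```

This is why the deadline belongs in the client configuration, where it bounds the operation itself, rather than on the caller's wait.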


