You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/10/05 19:45:27 UTC

[kafka] branch 2.5 updated: KAFKA-10531: Check for negative values to Thread.sleep call (#9347)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new d2c332a  KAFKA-10531: Check for negative values to Thread.sleep call (#9347)
d2c332a is described below

commit d2c332a1bebb02f876bbb6e3afeb75f952b8142f
Author: Vikas Singh <vi...@confluent.io>
AuthorDate: Mon Oct 5 12:06:55 2020 -0700

    KAFKA-10531: Check for negative values to Thread.sleep call (#9347)
    
    System.currentTimeMillis() is not monotonic, so using it to calculate how long to sleep can produce a negative value. Passing a negative value to Thread.sleep throws IllegalArgumentException.
    
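    This change avoids that by measuring elapsed time with time.nanoseconds() instead of time.milliseconds(), and by sleeping for a backoff that starts at 100 ms and doubles up to a cap of one second (to avoid a tight loop), so the value passed to sleep is never negative.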
    
    Author: Shaik Zakir Hussain <zh...@confluent.io>
    Reviewer: Randall Hauch <rh...@gmail.com>
---
 .../java/org/apache/kafka/connect/util/KafkaBasedLog.java | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 69d2588..5248715 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +44,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 
 /**
@@ -70,7 +70,8 @@ import java.util.concurrent.Future;
  */
 public class KafkaBasedLog<K, V> {
     private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
-    private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
+    private static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
+    private static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1);
 
     private Time time;
     private final String topic;
@@ -133,11 +134,13 @@ public class KafkaBasedLog<K, V> {
         List<TopicPartition> partitions = new ArrayList<>();
 
         // We expect that the topics will have been created either manually by the user or automatically by the herder
-        List<PartitionInfo> partitionInfos = null;
-        long started = time.milliseconds();
-        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        long started = time.nanoseconds();
+        long sleepMs = 100;
+        while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+            time.sleep(sleepMs);
+            sleepMs = Math.min(2 * sleepMs, MAX_SLEEP_MS);
             partitionInfos = consumer.partitionsFor(topic);
-            Utils.sleep(Math.min(time.milliseconds() - started, 1000));
         }
         if (partitionInfos == null)
             throw new ConnectException("Could not look up partition metadata for offset backing store topic in" +