You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/10/05 19:45:27 UTC
[kafka] branch 2.5 updated: KAFKA-10531: Check for negative values to Thread.sleep call (#9347)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new d2c332a KAFKA-10531: Check for negative values to Thread.sleep call (#9347)
d2c332a is described below
commit d2c332a1bebb02f876bbb6e3afeb75f952b8142f
Author: Vikas Singh <vi...@confluent.io>
AuthorDate: Mon Oct 5 12:06:55 2020 -0700
KAFKA-10531: Check for negative values to Thread.sleep call (#9347)
System.currentTimeMillis() is not monotonic, so using it to calculate how long to sleep can produce a negative value, and passing a negative value to Thread.sleep throws IllegalArgumentException.
This change checks for that case and sleeps for a second (to avoid a tight loop) if the computed value is negative.
Author: Shaik Zakir Hussain <zh...@confluent.io>
Reviewer: Randall Hauch <rh...@gmail.com>
---
.../java/org/apache/kafka/connect/util/KafkaBasedLog.java | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 69d2588..5248715 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +44,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
/**
@@ -70,7 +70,8 @@ import java.util.concurrent.Future;
*/
public class KafkaBasedLog<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
- private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
+ private static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
+ private static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1);
private Time time;
private final String topic;
@@ -133,11 +134,13 @@ public class KafkaBasedLog<K, V> {
List<TopicPartition> partitions = new ArrayList<>();
// We expect that the topics will have been created either manually by the user or automatically by the herder
- List<PartitionInfo> partitionInfos = null;
- long started = time.milliseconds();
- while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+ List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+ long started = time.nanoseconds();
+ long sleepMs = 100;
+ while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+ time.sleep(sleepMs);
+ sleepMs = Math.min(2 * sleepMs, MAX_SLEEP_MS);
partitionInfos = consumer.partitionsFor(topic);
- Utils.sleep(Math.min(time.milliseconds() - started, 1000));
}
if (partitionInfos == null)
throw new ConnectException("Could not look up partition metadata for offset backing store topic in" +