Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/28 21:13:59 UTC

[GitHub] [kafka] soondenana opened a new pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

soondenana opened a new pull request #9347:
URL: https://github.com/apache/kafka/pull/9347


   System.currentTimeMillis() is not monotonic, so using it to calculate
   the time to sleep can produce negative values, and Thread.sleep()
   throws IllegalArgumentException for a negative argument.
   
   This change checks for that and sleeps for a second (to avoid a tight
   loop) if the computed value is negative.
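The failure described above can be reproduced without Kafka. The sketch below is illustrative only: `computeSleepMs` is a hypothetical helper that mirrors the one-second fallback described in this PR, and it shows both that `Thread.sleep` rejects a negative argument and how the fallback avoids the exception without spinning.

```java
public class NegativeSleepDemo {
    // Hypothetical fallback used when deadline arithmetic goes negative
    // (for example, the wall clock stepped backward). One full second avoids a tight loop.
    static final long FALLBACK_SLEEP_MS = 1_000;

    static long computeSleepMs(long remainingMs) {
        return remainingMs < 0 ? FALLBACK_SLEEP_MS : remainingMs;
    }

    // Returns true if Thread.sleep rejects a negative duration with IllegalArgumentException.
    static boolean negativeSleepThrows() {
        try {
            Thread.sleep(-1);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Thread.sleep(-1) throws: " + negativeSleepThrows());
        System.out.println("computeSleepMs(-250) = " + computeSleepMs(-250));
        System.out.println("computeSleepMs(250) = " + computeSleepMs(250));
    }
}
```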
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r499079604



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -331,6 +331,11 @@ public static ByteBuffer wrapNullable(byte[] array) {
      */
     public static void sleep(long ms) {
         try {
+            if (ms <= 0) {
+                // No need to sleep
+                log.debug("Skipping sleep as asked to sleep for {} msec", ms);

Review comment:
       Overflow is not uncommon when using things like Long.MaxValue for timeouts, so assuming a negative value is not a bug seems dangerous.
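As an illustration of the overflow risk mentioned here: adding a `Long.MAX_VALUE` timeout (Java's spelling of `Long.MaxValue`) to a current timestamp wraps around to a negative deadline, so a check such as `now >= deadline` reports the wait as already expired. The sketch below uses hypothetical helpers `naiveDeadline` and `saturatingAdd` and a fixed timestamp; code that quietly treats out-of-range values as "nothing to do" would mask exactly this kind of bug.

```java
public class DeadlineOverflowDemo {
    // Naive deadline computation: wraps around when timeoutMs is huge.
    static long naiveDeadline(long nowMs, long timeoutMs) {
        return nowMs + timeoutMs;
    }

    // Hypothetical saturating add: clamps instead of wrapping on overflow.
    static long saturatingAdd(long a, long b) {
        try {
            return Math.addExact(a, b);
        } catch (ArithmeticException overflow) {
            // Overflow only happens when both operands share a sign.
            return b > 0 ? Long.MAX_VALUE : Long.MIN_VALUE;
        }
    }

    public static void main(String[] args) {
        long nowMs = 1_600_000_000_000L; // fixed wall-clock reading for reproducibility
        long naive = naiveDeadline(nowMs, Long.MAX_VALUE);
        long safe = saturatingAdd(nowMs, Long.MAX_VALUE);
        System.out.println("naive deadline negative: " + (naive < 0));
        System.out.println("naive deadline already passed: " + (nowMs >= naive));
        System.out.println("saturated deadline is Long.MAX_VALUE: " + (safe == Long.MAX_VALUE));
    }
}
```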







[GitHub] [kafka] ijuma commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496270581



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -163,6 +163,18 @@ public void start() {
         log.info("Started KafkaBasedLog for topic " + topic);
     }
 
+    /**
+     * Sleep for some time so that topic used for this KafkaBasedLog gets created. Note that
+     * {@code System.currentTimeMillis()} is not monotonic, so check for that condition.

Review comment:
       You can use `hiResClockMs` so that you get the value in `ms`.
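A millisecond reading backed by the monotonic clock can be sketched as a default method over a nanosecond source. The `MonotonicClock` interface below is hypothetical and only mirrors the idea behind this suggestion; it is not Kafka's `Time` interface.

```java
import java.util.concurrent.TimeUnit;

public class HiResClockDemo {
    // Hypothetical clock abstraction: only nanoseconds() must be implemented.
    interface MonotonicClock {
        long nanoseconds();

        // Millisecond view of the monotonic source, unaffected by wall-clock adjustments.
        default long hiResClockMs() {
            return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
        }
    }

    static final MonotonicClock SYSTEM = System::nanoTime;

    public static void main(String[] args) throws InterruptedException {
        long startMs = SYSTEM.hiResClockMs();
        Thread.sleep(20);
        long elapsedMs = SYSTEM.hiResClockMs() - startMs;
        System.out.println("elapsed is non-negative: " + (elapsedMs >= 0));
    }
}
```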







[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496271194



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
##########
@@ -536,6 +536,22 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
         PowerMock.verifyAll();
     }
 
+    /**
+     * Check if the waitForTopicCreate method doesn't throw if time moves backward, and works
+     * correctly if it increases.
+     */
+    @Test
+    public void testWatiForTopicCreate() {

Review comment:
       Fixed (actually removed the test)







[GitHub] [kafka] chia7712 commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r497017395



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -133,11 +133,14 @@ public void start() {
         List<TopicPartition> partitions = new ArrayList<>();
 
         // We expect that the topics will have been created either manually by the user or automatically by the herder
-        List<PartitionInfo> partitionInfos = null;
-        long started = time.milliseconds();
-        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        long started = time.nanoseconds();
+        long maxSleepMs = 1_000;
+        long sleepMs = 10;
+        while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+            time.sleep(sleepMs);
+            sleepMs = Math.min(2 * sleepMs, maxSleepMs);
             partitionInfos = consumer.partitionsFor(topic);

Review comment:
       It seems to me the behavior of ```timeout``` is not consistent across consumer methods. The ```timeout``` used by other methods (for example ```position```, ```offsetsForTimes```, ```beginningOffsets``` and ```endOffsets```) is used to await the result for the specified partitions. That means the consumer will send the request again if the timer has not expired and the specified partition has no metadata yet (i.e. the topic metadata has not been propagated). Maybe ```partitionsFor``` should be fixed for consistent behavior.







[GitHub] [kafka] soondenana commented on pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
soondenana commented on pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#issuecomment-702938936


   > @soondenana if the goal is to minimize the changes, would it be sufficient to change the code to use `Time.sleep(...)` instead of `Thread.sleep(...)`, and then change the `SystemTime.sleep(...)` implementation to return immediately if the supplied number of milliseconds is <= 0?
   > 
   > Are there performance implications of using `System.nanoTime()` instead of `System.currentTimeMillis()`?
   
   Thanks @rhauch for taking a look. The PR started by fixing only the negative sleep value, but as we looked at the code more closely we found multiple issues, so the "minimize change" idea was dropped. Here are 3 issues with the original code:
   
   1. Negative sleep value
   2. Using the elapsed time since the loop started to decide the next sleep time.
   3. Sleeping even when the first `partitionsFor` call would have succeeded
   
   Considering that, I decided to rewrite the loop to fix all three issues. 
   
   Good question on performance. It seems that `System.nanoTime()` is slower than `System.currentTimeMillis()` (one reads a hardware timer, the other reads a value from memory), but I'm not sure that has any implication for this use case. We are waiting for the topic to appear in metadata and sleeping for up to a second between checks, so a difference of a few milliseconds should not matter.
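The three fixes listed above can be sketched in a self-contained form. Here `fetch` is a hypothetical `Supplier` standing in for `consumer.partitionsFor(topic)`, and the constants are illustrative. The loop tries once before sleeping, measures elapsed time with `System.nanoTime()`, and grows the sleep by capped doubling rather than deriving it from the clock.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class WaitForTopicSketch {
    static final long TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
    static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1);

    // Returns the first non-null result, or null if the timeout elapses or the thread is interrupted.
    static <T> T waitFor(Supplier<T> fetch, long timeoutNs) {
        T result = fetch.get();            // issue 3: no sleep before the first attempt
        long started = System.nanoTime();  // issue 2: monotonic clock only for elapsed time
        long sleepMs = 10;
        while (result == null && System.nanoTime() - started < timeoutNs) {
            try {
                Thread.sleep(sleepMs);     // issue 1: sleepMs is always positive
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status and give up
                return null;
            }
            sleepMs = Math.min(2 * sleepMs, MAX_SLEEP_MS);
            result = fetch.get();
        }
        return result;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical fetch that "finds" the topic on the third call.
        Supplier<List<String>> fetch = () -> ++calls[0] >= 3 ? List.of("topic-0") : null;
        List<String> partitions = waitFor(fetch, TIMEOUT_NS);
        System.out.println(partitions + " after " + calls[0] + " calls");
    }
}
```

Because the sleep duration depends only on the attempt count, a wall-clock jump can neither produce a negative sleep nor shorten or extend the wait.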





[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r499033111



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -70,7 +70,7 @@
  */
 public class KafkaBasedLog<K, V> {
     private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
-    private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
+    private static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);

Review comment:
       Fixed.







[GitHub] [kafka] soondenana commented on pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
soondenana commented on pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#issuecomment-703805880


   There was an error when building `streams.examples`:
   
   ```
   [2020-10-05T08:40:05.722Z] [ERROR] Failed to execute goal org.apache.maven.plugins:maven-archetype-plugin:3.2.0:generate (default-cli) on project standalone-pom: A Maven project already exists in the directory /home/jenkins/workspace/Kafka_kafka-pr_PR-9347/streams/quickstart/test-streams-archetype/streams.examples -> [Help 1]
   ```
   
   The failure is not related to this PR.





[GitHub] [kafka] chia7712 commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496904812



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -133,11 +133,14 @@ public void start() {
         List<TopicPartition> partitions = new ArrayList<>();
 
         // We expect that the topics will have been created either manually by the user or automatically by the herder
-        List<PartitionInfo> partitionInfos = null;
-        long started = time.milliseconds();
-        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        long started = time.nanoseconds();
+        long maxSleepMs = 1_000;
+        long sleepMs = 10;
+        while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+            time.sleep(sleepMs);
+            sleepMs = Math.min(2 * sleepMs, maxSleepMs);
             partitionInfos = consumer.partitionsFor(topic);

Review comment:
       Why not use ```partitionsFor(String topic, Duration timeout) ``` (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1944) to replace the while loop?







[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496275381



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -133,11 +134,14 @@ public void start() {
         List<TopicPartition> partitions = new ArrayList<>();
 
         // We expect that the topics will have been created either manually by the user or automatically by the herder
-        List<PartitionInfo> partitionInfos = null;
-        long started = time.milliseconds();
-        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        long started = time.nanoseconds();
+        long maxSleepMs = 1_000;
+        long sleepMs = 10;
+        while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+            Utils.sleep(sleepMs);

Review comment:
       Updated.







[GitHub] [kafka] ijuma commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496240453



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -163,6 +163,18 @@ public void start() {
         log.info("Started KafkaBasedLog for topic " + topic);
     }
 
+    /**
+     * Sleep for some time so that topic used for this KafkaBasedLog gets created. Note that
+     * {@code System.currentTimeMillis()} is not monotonic, so check for that condition.

Review comment:
       Why don't we use a monotonic timer?







[GitHub] [kafka] rhauch commented on pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#issuecomment-702944802


   > Considering that I decided to rewrite the loop to fix all three issues.
   
   That makes sense, and I noticed the same when reviewing.
   
   > Good question on performance. It seems like System.nanoTime() is slower than System.currentTimeMillis (one is read from h/w using i/o another one is reading from memory), but not sure if that has any implication here for this use case. We are waiting for topic to appear in metadata and sleeping for seconds. Few milliseconds difference should not matter.
   
   That's probably true. But if `Thread.sleep(negativeTime)` throws an `IllegalArgumentException`, should we also change `Utils.sleep(...)` (called by `SystemTime.sleep(...)` that is now used above) to check and return immediately for a negative number of milliseconds to sleep?
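The guard proposed here can be sketched as follows. `guardedSleep` is a hypothetical helper, not Kafka's `Utils.sleep`. As the review thread on `Utils.java` points out, silently returning for non-positive values can also hide overflow bugs in callers, so a real implementation might prefer to log or fail loudly instead.

```java
public class GuardedSleep {
    // Hypothetical guard: skip the sleep instead of letting Thread.sleep throw.
    // Returns false when the sleep was skipped.
    static boolean guardedSleep(long ms) {
        if (ms <= 0) {
            return false; // Thread.sleep would throw IllegalArgumentException for ms < 0
        }
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(guardedSleep(-5)); // false, no exception
        System.out.println(guardedSleep(5));  // true
    }
}
```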
   





[GitHub] [kafka] rhauch commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r499023599



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -70,7 +70,7 @@
  */
 public class KafkaBasedLog<K, V> {
     private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
-    private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
+    private static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);

Review comment:
       We often use this pattern elsewhere for time-related constants:
   ```suggestion
       private static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -133,11 +133,14 @@ public void start() {
         List<TopicPartition> partitions = new ArrayList<>();
 
         // We expect that the topics will have been created either manually by the user or automatically by the herder
-        List<PartitionInfo> partitionInfos = null;
-        long started = time.milliseconds();
-        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        long started = time.nanoseconds();
+        long maxSleepMs = 1_000;

Review comment:
       Let's define a constant above:
   ```
   private static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1);
   ```
   and then we can replace `maxSleepMs` with `MAX_SLEEP_MS`.
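A quick check that the suggested `TimeUnit` forms match the original expression, and of the sleep sequence that capped doubling from 10 ms produces with `MAX_SLEEP_MS` as the ceiling. The `backoffSchedule` helper is hypothetical; it only computes the durations and does not sleep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class BackoffConstants {
    static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
    static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1);

    // Hypothetical helper: the first n sleep durations of capped doubling from initialMs.
    static List<Long> backoffSchedule(long initialMs, int n) {
        List<Long> schedule = new ArrayList<>();
        long sleepMs = initialMs;
        for (int i = 0; i < n; i++) {
            schedule.add(sleepMs);
            sleepMs = Math.min(2 * sleepMs, MAX_SLEEP_MS);
        }
        return schedule;
    }

    public static void main(String[] args) {
        // Same value as the original TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS).
        System.out.println(CREATE_TOPIC_TIMEOUT_NS == TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS));
        System.out.println(backoffSchedule(10, 10));
    }
}
```

With these values the sleeps are 10, 20, 40, 80, 160, 320 and 640 ms, then 1000 ms for every later attempt.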







[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496983721



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -133,11 +133,14 @@ public void start() {
         List<TopicPartition> partitions = new ArrayList<>();
 
         // We expect that the topics will have been created either manually by the user or automatically by the herder
-        List<PartitionInfo> partitionInfos = null;
-        long started = time.milliseconds();
-        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        long started = time.nanoseconds();
+        long maxSleepMs = 1_000;
+        long sleepMs = 10;
+        while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+            time.sleep(sleepMs);
+            sleepMs = Math.min(2 * sleepMs, maxSleepMs);
             partitionInfos = consumer.partitionsFor(topic);

Review comment:
       I think the semantics needed here are different. The timeout in `partitionsFor` is the maximum amount of time the API can block waiting for a response before it fails with `TimeoutException`. However, the API can return within the timeout with empty results because metadata for the newly created topic has not been propagated yet. We then have to retry until `partitionsFor` returns the partition data (up to a max time).








[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r499033230



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -133,11 +133,14 @@ public void start() {
         List<TopicPartition> partitions = new ArrayList<>();
 
         // We expect that the topics will have been created either manually by the user or automatically by the herder
-        List<PartitionInfo> partitionInfos = null;
-        long started = time.milliseconds();
-        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        long started = time.nanoseconds();
+        long maxSleepMs = 1_000;

Review comment:
       Updated.







[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496267175



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -163,6 +163,18 @@ public void start() {
         log.info("Started KafkaBasedLog for topic " + topic);
     }
 
+    /**
+     * Sleep for some time so that topic used for this KafkaBasedLog gets created. Note that
+     * {@code System.currentTimeMillis()} is not monotonic, so check for that condition.

Review comment:
       In fact, I don't like this loop at all. I'm going to rewrite it so that it doesn't use these constructs.







[GitHub] [kafka] splett2 commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
splett2 commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496238384



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
##########
@@ -536,6 +536,22 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
         PowerMock.verifyAll();
     }
 
+    /**
+     * Check if the waitForTopicCreate method doesn't throw if time moves backward, and works
+     * correctly if it increases.
+     */
+    @Test
+    public void testWatiForTopicCreate() {

Review comment:
       nit: typo in `Wati`







[GitHub] [kafka] rhauch commented on pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#issuecomment-702915039


   @soondenana if the goal is to minimize the changes, would it be sufficient to change the code to use `Time.sleep(...)` instead of `Thread.sleep(...)`, and then change the `SystemTime.sleep(...)` implementation to return immediately if the supplied number of milliseconds is <= 0?
   
   Are there performance implications of using `System.nanoTime()` instead of `System.currentTimeMillis()`?





[GitHub] [kafka] ijuma commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r499071998



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -331,6 +331,11 @@ public static ByteBuffer wrapNullable(byte[] array) {
      */
     public static void sleep(long ms) {
         try {
+            if (ms <= 0) {
+                // No need to sleep
+                log.debug("Skipping sleep as asked to sleep for {} msec", ms);

Review comment:
       Won't this hide bugs?







[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496263965



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -163,6 +163,18 @@ public void start() {
         log.info("Started KafkaBasedLog for topic " + topic);
     }
 
+    /**
+     * Sleep for some time so that topic used for this KafkaBasedLog gets created. Note that
+     * {@code System.currentTimeMillis()} is not monotonic, so check for that condition.

Review comment:
       I wanted to make the smallest possible change. I can update the code to use the monotonic `nanoTime` instead (`nanoseconds()` in the `Time` interface). We will also need to convert that to milliseconds before passing it to sleep (unless we want to add nanosecond variants to those interfaces too, like `Utils.sleep`).
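One detail worth keeping when switching to `nanoTime`: its Javadoc says the values are only meaningful as differences, so an elapsed-time check should be written as `now - start < timeout` rather than `now < start + timeout`, because `start + timeout` can overflow. The sketch below uses hypothetical helper names and fixed values (instead of live clock readings) near the overflow boundary, and also shows converting the remaining time to milliseconds for `sleep`.

```java
import java.util.concurrent.TimeUnit;

public class NanoTimeComparison {
    // Overflow-safe: compare the elapsed difference against the timeout.
    static boolean withinTimeoutSafe(long startNs, long nowNs, long timeoutNs) {
        return nowNs - startNs < timeoutNs;
    }

    // Fragile: startNs + timeoutNs can wrap past Long.MAX_VALUE.
    static boolean withinTimeoutNaive(long startNs, long nowNs, long timeoutNs) {
        return nowNs < startNs + timeoutNs;
    }

    // Remaining time in milliseconds, clamped so it is never negative.
    static long remainingMs(long startNs, long nowNs, long timeoutNs) {
        long remainingNs = timeoutNs - (nowNs - startNs);
        return Math.max(0, TimeUnit.NANOSECONDS.toMillis(remainingNs));
    }

    public static void main(String[] args) {
        long timeoutNs = TimeUnit.SECONDS.toNanos(30);
        long startNs = Long.MAX_VALUE - TimeUnit.SECONDS.toNanos(10); // origin near the wrap point
        long nowNs = startNs + TimeUnit.SECONDS.toNanos(5);           // 5 s later, no wrap yet
        System.out.println("safe:  " + withinTimeoutSafe(startNs, nowNs, timeoutNs));
        System.out.println("naive: " + withinTimeoutNaive(startNs, nowNs, timeoutNs));
        System.out.println("remaining ms: " + remainingMs(startNs, nowNs, timeoutNs));
    }
}
```

In this setup the safe form correctly reports that only 5 of 30 seconds have elapsed, while the naive form wrongly reports a timeout because `startNs + timeoutNs` has wrapped to a negative value.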







[GitHub] [kafka] chia7712 commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r497229208



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -133,11 +133,14 @@ public void start() {
         List<TopicPartition> partitions = new ArrayList<>();
 
         // We expect that the topics will have been created either manually by the user or automatically by the herder
-        List<PartitionInfo> partitionInfos = null;
-        long started = time.milliseconds();
-        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        long started = time.nanoseconds();
+        long maxSleepMs = 1_000;
+        long sleepMs = 10;
+        while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+            time.sleep(sleepMs);
+            sleepMs = Math.min(2 * sleepMs, maxSleepMs);
             partitionInfos = consumer.partitionsFor(topic);

Review comment:
       @soondenana Thanks for responses. This patch LGTM. What I want to discuss is unrelated to this patch.
   
   > they do it if they get an invalid result back. If null was a valid result for them, they shouldn't retry either.
   
   If the topic does not exist (or has not been propagated yet), ```partitionsFor``` can return null. By contrast, ```beginningOffsets``` (and the other methods) throw ```TimeoutException```. For consistent behavior, ```beginningOffsets``` (and the other methods) should allow the topic (or partition) to be absent from the returned ```Map```. WDYT?







[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496272243



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -163,6 +163,18 @@ public void start() {
         log.info("Started KafkaBasedLog for topic " + topic);
     }
 
+    /**
+     * Sleep for some time so that topic used for this KafkaBasedLog gets created. Note that
+     * {@code System.currentTimeMillis()} is not monotonic, so check for that condition.

Review comment:
       In fact, we don't need a value from the clock to decide how long to sleep; we only need it to compute the elapsed time and the timeout. I have decoupled these two, and also fixed a minor issue where it slept on the first iteration even if the topic was already present.
   
   Please take a look.
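   The decoupling described above can be sketched in plain Java. This is a minimal sketch under stated assumptions, not the actual `KafkaBasedLog` code: `pollUntilPresent` and its `Supplier` probe are hypothetical stand-ins for the `consumer.partitionsFor(topic)` loop, and `System.nanoTime()` / `Thread.sleep` stand in for Kafka's `Time` abstraction. Elapsed time comes only from the monotonic clock, while the sleep duration comes from a backoff counter (10 ms doubling up to 1 s, as in the diff), so the value passed to `sleep` can never go negative. The probe also runs once before any sleep.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class MonotonicPoll {

    /**
     * Hypothetical helper: calls probe until it returns non-null or timeoutMs elapses.
     * Elapsed time uses System.nanoTime(), which is monotonic, so a wall-clock jump
     * cannot distort the timeout. The sleep length never comes from clock arithmetic.
     */
    public static <T> T pollUntilPresent(Supplier<T> probe, long timeoutMs) throws InterruptedException {
        long timeoutNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs); // saturates instead of overflowing
        long maxSleepMs = 1_000;
        long sleepMs = 10;
        // Probe once up front, so we don't sleep when the value is already available.
        T result = probe.get();
        long started = System.nanoTime();
        while (result == null && System.nanoTime() - started < timeoutNs) {
            Thread.sleep(sleepMs);                         // always a positive backoff value
            sleepMs = Math.min(2 * sleepMs, maxSleepMs);   // exponential backoff, capped
            result = probe.get();
        }
        return result; // null if the timeout expired first
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Hypothetical probe that "finds the topic" on its third call.
        String found = pollUntilPresent(() -> ++calls[0] >= 3 ? "topic-metadata" : null, 5_000);
        System.out.println(found + " after " + calls[0] + " calls");
    }
}
```

   Comparing `System.nanoTime() - started` against a timeout, rather than comparing two absolute readings, is the usage the `nanoTime` Javadoc recommends, since only differences between its values are meaningful.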







[GitHub] [kafka] rhauch merged pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
rhauch merged pull request #9347:
URL: https://github.com/apache/kafka/pull/9347


   





[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r497034435



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -133,11 +133,14 @@ public void start() {
         List<TopicPartition> partitions = new ArrayList<>();
 
         // We expect that the topics will have been created either manually by the user or automatically by the herder
-        List<PartitionInfo> partitionInfos = null;
-        long started = time.milliseconds();
-        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        long started = time.nanoseconds();
+        long maxSleepMs = 1_000;
+        long sleepMs = 10;
+        while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+            time.sleep(sleepMs);
+            sleepMs = Math.min(2 * sleepMs, maxSleepMs);
             partitionInfos = consumer.partitionsFor(topic);

Review comment:
       Not sure about this; no result for `partitionInfos` is a valid result, so there is no point in retrying automatically. As for other APIs that retry automatically, they do it if they get an invalid result back. If `null` was a valid result for them, they shouldn't retry either.







[GitHub] [kafka] soondenana commented on pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
soondenana commented on pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#issuecomment-702979275


   > should we also change Utils.sleep(...) (called by SystemTime.sleep(...) that is now used above) to check and return immediately for a negative number of milliseconds to sleep?
   
   Yes, makes sense. Updated code to do so.





[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r499080376



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -331,6 +331,11 @@ public static ByteBuffer wrapNullable(byte[] array) {
      */
     public static void sleep(long ms) {
         try {
+            if (ms <= 0) {
+                // No need to sleep
+                log.debug("Skipping sleep as asked to sleep for {} msec", ms);

Review comment:
       Ok, I see the point. Overflow can be caused by time arithmetic too. Reverted the change.
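   One way a negative sleep duration can come from time arithmetic rather than from a clock jump is unit-conversion overflow. The sketch below is illustrative only: `naiveToNanos` and `saturatingToNanos` are hypothetical helpers, and it assumes `Long.MAX_VALUE` is used as a "wait forever" sentinel. Multiplying that sentinel by 1,000,000 wraps to a negative number, whereas `TimeUnit` conversions are documented to saturate. Arguably, a check that silently skips negative sleeps would hide this kind of bug, while `Thread.sleep` rejects it loudly.

```java
import java.util.concurrent.TimeUnit;

public class TimeOverflow {

    /** Hypothetical naive ms -> ns conversion: silently wraps on overflow. */
    public static long naiveToNanos(long ms) {
        return ms * 1_000_000L;
    }

    /** JDK conversion: saturates at Long.MAX_VALUE / Long.MIN_VALUE instead of wrapping. */
    public static long saturatingToNanos(long ms) {
        return TimeUnit.MILLISECONDS.toNanos(ms);
    }

    public static void main(String[] args) {
        long forever = Long.MAX_VALUE; // assumed "wait forever" sentinel
        long wrapped = naiveToNanos(forever);
        System.out.println("naive:      " + wrapped);                    // -1000000
        System.out.println("saturating: " + saturatingToNanos(forever)); // 9223372036854775807
        // Converting the wrapped value back to milliseconds yields a negative sleep.
        long sleepMs = wrapped / 1_000_000L; // -1
        try {
            Thread.sleep(sleepMs);
        } catch (IllegalArgumentException | InterruptedException e) {
            System.out.println("Thread.sleep rejected " + sleepMs + ": " + e.getClass().getSimpleName());
        }
    }
}
```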







[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r499076318



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -331,6 +331,11 @@ public static ByteBuffer wrapNullable(byte[] array) {
      */
     public static void sleep(long ms) {
         try {
+            if (ms <= 0) {
+                // No need to sleep
+                log.debug("Skipping sleep as asked to sleep for {} msec", ms);

Review comment:
       Yes. I have seen it done both ways, i.e. behave reasonably in the face of unexpected arguments vs. throw an exception. `Time.sleep` combines both approaches: it throws for values `< 0`, skips the sleep for `= 0`, and sleeps for values `> 0`.
   
   It's a judgement call; we can delegate to the JDK and remove this check.
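   The two policies can be contrasted in a short sketch. `lenientSleep` is a hypothetical helper modeled loosely on the `ms <= 0` check in the diff above (without the logging); `strictSleep` simply delegates to `Thread.sleep`, which the JDK documents as throwing `IllegalArgumentException` for a negative argument.

```java
public class SleepPolicies {

    /** Hypothetical lenient policy: treat non-positive durations as "no need to sleep". */
    public static void lenientSleep(long ms) throws InterruptedException {
        if (ms <= 0) {
            return; // silently skip; a negative value caused by a bug goes unnoticed
        }
        Thread.sleep(ms);
    }

    /** Strict policy: delegate to the JDK, which rejects negative durations. */
    public static void strictSleep(long ms) throws InterruptedException {
        Thread.sleep(ms); // throws IllegalArgumentException if ms < 0
    }

    public static void main(String[] args) throws InterruptedException {
        lenientSleep(-5); // returns immediately
        strictSleep(0);   // zero is accepted by Thread.sleep
        try {
            strictSleep(-5);
        } catch (IllegalArgumentException e) {
            System.out.println("strict policy rejected -5");
        }
    }
}
```

   With the strict policy, a negative duration surfaces as an exception at the call site; the lenient policy trades that visibility for robustness against unexpected arguments.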







[GitHub] [kafka] ijuma commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496270860



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -133,11 +134,14 @@ public void start() {
         List<TopicPartition> partitions = new ArrayList<>();
 
         // We expect that the topics will have been created either manually by the user or automatically by the herder
-        List<PartitionInfo> partitionInfos = null;
-        long started = time.milliseconds();
-        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        long started = time.nanoseconds();
+        long maxSleepMs = 1_000;
+        long sleepMs = 10;
+        while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+            Utils.sleep(sleepMs);

Review comment:
       `Time` has a `sleep` method too btw.
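   One practical reason to take the sleep from the same time abstraction as the clock, rather than from a static `Utils.sleep`, is testability: a test can substitute a fake that advances virtual time instead of blocking. The sketch below uses a hypothetical `SimpleTime` interface and `FakeTime` class for illustration; they are not Kafka's actual `Time` interface or its test utilities.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class InjectedTimeExample {

    /** Hypothetical, trimmed-down time abstraction (not Kafka's actual Time interface). */
    public interface SimpleTime {
        long nanoseconds();
        void sleep(long ms);
    }

    /** Fake time: sleep() records the request and advances a virtual clock instead of blocking. */
    public static final class FakeTime implements SimpleTime {
        private long nowNs = 0;
        public final List<Long> sleeps = new ArrayList<>();

        @Override
        public long nanoseconds() {
            return nowNs;
        }

        @Override
        public void sleep(long ms) {
            sleeps.add(ms);
            nowNs += ms * 1_000_000L;
        }
    }

    /** Polling loop that takes both its clock and its sleep from the injected SimpleTime. */
    public static <T> T poll(SimpleTime time, Supplier<T> probe, long timeoutNs) {
        T result = probe.get();
        long started = time.nanoseconds();
        long sleepMs = 10;
        while (result == null && time.nanoseconds() - started < timeoutNs) {
            time.sleep(sleepMs);
            sleepMs = Math.min(2 * sleepMs, 1_000); // capped exponential backoff
            result = probe.get();
        }
        return result;
    }

    public static void main(String[] args) {
        FakeTime time = new FakeTime();
        // A probe that never succeeds: the loop gives up after ~2 s of virtual time.
        Object r = poll(time, () -> null, 2_000_000_000L);
        System.out.println(r + " " + time.sleeps);
    }
}
```

   With the fake clock, the backoff schedule (10, 20, 40, ... capped at 1000 ms) can be checked instantly, without the test actually sleeping for two seconds.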



