Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/06 21:20:21 UTC

kafka git commit: KAFKA-3155; Avoid long overflow in RecordBatch#maybeExpire

Repository: kafka
Updated Branches:
  refs/heads/trunk 2064a2a55 -> 5cf491c27


KAFKA-3155; Avoid long overflow in RecordBatch#maybeExpire

A linger time of `Long.MAX_VALUE` overflows a long in `RecordBatch#maybeExpire` when
added to the batch's creation timestamp (`createdMs`).

The overflow then causes `Sender` to set an error on the batch (this does not happen on
every run, since it depends on `Sender`'s timing).

That error in turn triggers a call to `ProduceRequestResult#done` on the batch, which
makes the subsequent check for "not done" fail.
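
For illustration, a minimal standalone sketch of the overflowing branch and of the
rearranged arithmetic this patch introduces (the values mirror the new RecordBatchTest
below; the class itself is not part of the patch):

    public class OverflowSketch {
        public static void main(String[] args) {
            long createdMs = 1488748346917L; // batch creation time, as in RecordBatchTest
            long now = createdMs - 2L;       // `now` 2 ms before the create time
            long lingerMs = Long.MAX_VALUE;
            int requestTimeoutMs = 10240;

            // Old check: createdMs + lingerMs wraps around to a large negative
            // value, so now - (createdMs + lingerMs) comes out as Long.MAX_VALUE
            // and the batch is wrongly considered expired.
            boolean oldExpired = requestTimeoutMs < (now - (createdMs + lingerMs));

            // New check: compute the clamped elapsed time first, then subtract
            // lingerMs from it. 0 - Long.MAX_VALUE is representable, so nothing
            // wraps and the batch correctly stays alive.
            long createdTimeMs = Math.max(0, now - createdMs);
            boolean newExpired = requestTimeoutMs < (createdTimeMs - lingerMs);

            System.out.println("old: " + oldExpired + ", new: " + newExpired);
            // prints: old: true, new: false
        }
    }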

Author: Armin Braun <me...@obrown.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2639 from original-brownbear/KAFKA-3155


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5cf491c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5cf491c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5cf491c2

Branch: refs/heads/trunk
Commit: 5cf491c2765b4912ff5f7f69f0b992a67005b983
Parents: 2064a2a
Author: Armin Braun <me...@obrown.io>
Authored: Mon Mar 6 20:10:45 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Mar 6 20:29:37 2017 +0000

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   | 33 ++++-----
 .../clients/producer/internals/RecordBatch.java | 57 ++++++++++------
 .../clients/producer/internals/Sender.java      |  6 +-
 .../producer/internals/RecordBatchTest.java     | 72 ++++++++++++++++++++
 4 files changed, 128 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf491c2/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 7ef28e5..3306820 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -81,7 +81,7 @@ public final class RecordAccumulator {
 
     /**
      * Create a new record accumulator
-     * 
+     *
      * @param batchSize The size to use when allocating {@link MemoryRecords} instances
      * @param totalSize The maximum memory the record accumulator can use.
      * @param compression The compression codec for the records
@@ -205,7 +205,7 @@ public final class RecordAccumulator {
 
                 // Don't deallocate this buffer in the finally block as it's being used in the record batch
                 buffer = null;
-                
+
                 return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
             }
         } finally {
@@ -284,10 +284,7 @@ public final class RecordAccumulator {
      * Re-enqueue the given record batch in the accumulator to retry
      */
     public void reenqueue(RecordBatch batch, long now) {
-        batch.attempts++;
-        batch.lastAttemptMs = now;
-        batch.lastAppendTime = now;
-        batch.setRetry();
+        batch.reenqueued(now);
         Deque<RecordBatch> deque = getOrCreateDeque(batch.topicPartition);
         synchronized (deque) {
             deque.addFirst(batch);
@@ -334,16 +331,16 @@ public final class RecordAccumulator {
                 } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                     RecordBatch batch = deque.peekFirst();
                     if (batch != null) {
-                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
-                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
+                        long waitedTimeMs = batch.waitedTimeMs(nowMs);
+                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                         long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
-                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                         boolean full = deque.size() > 1 || batch.isFull();
                         boolean expired = waitedTimeMs >= timeToWaitMs;
                         boolean sendable = full || expired || exhausted || closed || flushInProgress();
                         if (sendable && !backingOff) {
                             readyNodes.add(leader);
                         } else {
+                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                             // Note that this results in a conservative estimate since an un-sendable partition may have
                             // a leader that will later be found to have sendable data. However, this is good enough
                             // since we'll just wake up and then sleep again for the remaining time.
@@ -374,7 +371,7 @@ public final class RecordAccumulator {
     /**
      * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
      * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
-     * 
+     *
      * @param cluster The current cluster metadata
      * @param nodes The list of node to drain
      * @param maxSize The maximum number of bytes to drain
@@ -405,7 +402,7 @@ public final class RecordAccumulator {
                         synchronized (deque) {
                             RecordBatch first = deque.peekFirst();
                             if (first != null) {
-                                boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
+                                boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
                                 // Only drain the batch if it is not during backoff period.
                                 if (!backoff) {
                                     if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
@@ -418,7 +415,7 @@ public final class RecordAccumulator {
                                         batch.close();
                                         size += batch.sizeInBytes();
                                         ready.add(batch);
-                                        batch.drainedMs = now;
+                                        batch.drained(now);
                                     }
                                 }
                             }
@@ -458,7 +455,7 @@ public final class RecordAccumulator {
         incomplete.remove(batch);
         free.deallocate(batch.buffer(), batch.initialCapacity());
     }
-    
+
     /**
      * Are there any threads currently waiting on a flush?
      *
@@ -472,7 +469,7 @@ public final class RecordAccumulator {
     Map<TopicPartition, Deque<RecordBatch>> batches() {
         return Collections.unmodifiableMap(batches);
     }
-    
+
     /**
      * Initiate the flushing of data from the accumulator...this makes all requests immediately ready
      */
@@ -578,7 +575,7 @@ public final class RecordAccumulator {
             this.unknownLeaderTopics = unknownLeaderTopics;
         }
     }
-    
+
     /*
      * A threadsafe helper class to hold RecordBatches that haven't been ack'd yet
      */
@@ -588,13 +585,13 @@ public final class RecordAccumulator {
         public IncompleteRecordBatches() {
             this.incomplete = new HashSet<RecordBatch>();
         }
-        
+
         public void add(RecordBatch batch) {
             synchronized (incomplete) {
                 this.incomplete.add(batch);
             }
         }
-        
+
         public void remove(RecordBatch batch) {
             synchronized (incomplete) {
                 boolean removed = this.incomplete.remove(batch);
@@ -602,7 +599,7 @@ public final class RecordAccumulator {
                     throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
             }
         }
-        
+
         public Iterable<RecordBatch> all() {
             synchronized (incomplete) {
                 return new ArrayList<>(this.incomplete);

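As an aside (not part of the patch), the same rearrangement hardens the backoff check in
ready() and drain() above: the old form added retryBackoffMs to a timestamp before
comparing, which wraps for a pathologically large backoff value, while the new form
compares the already-elapsed wait time. A sketch with hypothetical values:

    public class BackoffSketch {
        public static void main(String[] args) {
            long lastAttemptMs = 1488748346917L;  // hypothetical last attempt timestamp
            long nowMs = lastAttemptMs + 5L;      // 5 ms into the backoff period
            long retryBackoffMs = Long.MAX_VALUE; // pathological backoff setting

            // Old form: lastAttemptMs + retryBackoffMs wraps to a large negative
            // value, so the batch looks ready to drain 5 ms into its backoff.
            boolean oldBackingOff = lastAttemptMs + retryBackoffMs > nowMs;  // false

            // New form: the waited time is small and non-negative, so comparing
            // it against retryBackoffMs cannot overflow.
            long waitedTimeMs = Math.max(0, nowMs - lastAttemptMs);
            boolean newBackingOff = waitedTimeMs < retryBackoffMs;           // true

            System.out.println("old: " + oldBackingOff + ", new: " + newBackingOff);
        }
    }
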
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf491c2/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index af6262e..8dacaf5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * A batch of records that is or will be sent.
- * 
+ *
  * This class is not thread safe and external synchronization must be used when modifying it
  */
 public final class RecordBatch {
@@ -47,12 +47,12 @@ public final class RecordBatch {
     private final List<Thunk> thunks = new ArrayList<>();
     private final MemoryRecordsBuilder recordsBuilder;
 
-    volatile int attempts;
+    private volatile int attempts;
     int recordCount;
     int maxRecordSize;
-    long drainedMs;
-    long lastAttemptMs;
-    long lastAppendTime;
+    private long lastAttemptMs;
+    private long lastAppendTime;
+    private long drainedMs;
     private String expiryErrorMessage;
     private AtomicBoolean completed;
     private boolean retry;
@@ -69,7 +69,7 @@ public final class RecordBatch {
 
     /**
      * Append the record to the current record set and return the relative offset within that record set
-     * 
+     *
      * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
      */
     public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
@@ -92,7 +92,7 @@ public final class RecordBatch {
 
     /**
      * Complete the request.
-     * 
+     *
      * @param baseOffset The base offset of the messages assigned by the server
      * @param logAppendTime The log append time or -1 if CreateTime is being used
      * @param exception The exception that occurred (or null if the request was successful)
@@ -152,13 +152,12 @@ public final class RecordBatch {
      * {@link #expirationDone()} must be invoked to complete the produce future and invoke callbacks.
      */
     public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
-
         if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
             expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
-        else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs)))
-            expiryErrorMessage = (now - (this.createdMs + lingerMs)) + " ms has passed since batch creation plus linger time";
-        else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs)))
-            expiryErrorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time";
+        else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
+            expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
+        else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs))
+            expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";
 
         boolean expired = expiryErrorMessage != null;
         if (expired)
@@ -178,6 +177,33 @@ public final class RecordBatch {
                   new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
     }
 
+    int attempts() {
+        return attempts;
+    }
+
+    void reenqueued(long now) {
+        attempts++;
+        lastAttemptMs = Math.max(lastAppendTime, now);
+        lastAppendTime = Math.max(lastAppendTime, now);
+        retry = true;
+    }
+
+    long queueTimeMs() {
+        return drainedMs - createdMs;
+    }
+
+    long createdTimeMs(long nowMs) {
+        return Math.max(0, nowMs - createdMs);
+    }
+
+    long waitedTimeMs(long nowMs) {
+        return Math.max(0, nowMs - lastAttemptMs);
+    }
+
+    void drained(long nowMs) {
+        this.drainedMs = Math.max(drainedMs, nowMs);
+    }
+
     /**
      * Returns whether the batch has been retried for sending to Kafka
      */
@@ -185,13 +211,6 @@ public final class RecordBatch {
         return this.retry;
     }
 
-    /**
-     * Set retry to true if the batch is being retried (for send)
-     */
-    public void setRetry() {
-        this.retry = true;
-    }
-
     public MemoryRecords records() {
         return recordsBuilder.build();
     }

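A side note on the new mutators above (my reading of the patch, not text from the
commit): reenqueued() and drained() update their timestamps through Math.max, so the
stored values never move backwards even when a caller passes a stale `now`, and
createdTimeMs()/waitedTimeMs() clamp at zero for the same reason. A small sketch of
that behavior with hypothetical values:

    public class MonotonicTimestampsSketch {
        public static void main(String[] args) {
            long lastAppendTime = 1488748346917L;  // hypothetical last append time
            long staleNow = lastAppendTime - 50L;  // caller's clock lagging 50 ms behind

            // reenqueued(): lastAttemptMs is floored at lastAppendTime, so the
            // lagging clock cannot move it backwards.
            long lastAttemptMs = Math.max(lastAppendTime, staleNow);

            // waitedTimeMs(): clamped at zero, so the lagging clock yields 0
            // rather than a negative elapsed time feeding later comparisons.
            long waitedTimeMs = Math.max(0, staleNow - lastAttemptMs);

            System.out.println(lastAttemptMs + " " + waitedTimeMs);
            // prints: 1488748346917 0
        }
    }
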
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf491c2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index b3553af..7f27d36 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -300,7 +300,7 @@ public class Sender implements Runnable {
             log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                      correlationId,
                      batch.topicPartition,
-                     this.retries - batch.attempts - 1,
+                     this.retries - batch.attempts() - 1,
                      error);
             this.accumulator.reenqueue(batch, now);
             this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
@@ -332,7 +332,7 @@ public class Sender implements Runnable {
      * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed
      */
     private boolean canRetry(RecordBatch batch, Errors error) {
-        return batch.attempts < this.retries && error.exception() instanceof RetriableException;
+        return batch.attempts() < this.retries && error.exception() instanceof RetriableException;
     }
 
     /**
@@ -519,7 +519,7 @@ public class Sender implements Runnable {
 
                     // global metrics
                     this.batchSizeSensor.record(batch.sizeInBytes(), now);
-                    this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now);
+                    this.queueTimeSensor.record(batch.queueTimeMs(), now);
                     this.compressionRateSensor.record(batch.compressionRate());
                     this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
                     records += batch.recordCount;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf491c2/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordBatchTest.java
new file mode 100644
index 0000000..6404451
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordBatchTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+
+public class RecordBatchTest {
+
+    private final long now = 1488748346917L;
+
+    private final MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(ByteBuffer.allocate(0),
+            Record.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, Record.NO_TIMESTAMP, 0);
+
+    /**
+     * A RecordBatch configured with a very large linger value and a now timestamp preceding its
+     * create time is correctly treated as not expired by RecordBatch#maybeExpire, because the
+     * linger time is larger than the difference between now and the create time.
+     */
+    @Test
+    public void testLargeLingerOldNowExpire() {
+        RecordBatch batch = new RecordBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+        // Set `now` to 2ms before the create time.
+        assertFalse(batch.maybeExpire(10240, 100L, now - 2L, Long.MAX_VALUE, false));
+    }
+
+    /**
+     * A RecordBatch configured with a very large retryBackoff value and retry = true, given a now
+     * timestamp preceding its create time, is correctly treated as not expired by
+     * RecordBatch#maybeExpire, because the retryBackoff time is larger than the difference between
+     * now and the create time.
+     */
+    @Test
+    public void testLargeRetryBackoffOldNowExpire() {
+        RecordBatch batch = new RecordBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+        // Set batch.retry = true
+        batch.reenqueued(now);
+        // Set `now` to 2ms before the create time.
+        assertFalse(batch.maybeExpire(10240, Long.MAX_VALUE, now - 2L, 10240L, false));
+    }
+
+    /**
+     * A RecordBatch#maybeExpire call with a now value before the batch's create time is correctly
+     * recognized as not expired when invoked with isFull = true.
+     */
+    @Test
+    public void testLargeFullOldNowExpire() {
+        RecordBatch batch = new RecordBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+        // Set `now` to 2ms before the create time.
+        assertFalse(batch.maybeExpire(10240, 10240L, now - 2L, 10240L, true));
+    }
+}