Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/06 21:20:21 UTC
kafka git commit: KAFKA-3155; Avoid long overflow in RecordBatch#maybeExpire
Repository: kafka
Updated Branches:
refs/heads/trunk 2064a2a55 -> 5cf491c27
KAFKA-3155; Avoid long overflow in RecordBatch#maybeExpire
A linger of `Long.MAX_VALUE` overflows in `RecordBatch#maybeExpire` when added to the
batch's creation timestamp.
The overflow makes the batch appear expired, so `Sender` sets an error on it (not on
every run, since it depends on the timing of `Sender`).
That error in turn triggers a call to `ProduceRequestResult#done` on the batch, which
then makes the subsequent check for "not done" fail.
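
To illustrate, here is a minimal standalone sketch (not code from this commit; the
constants are assumed for demonstration). The pre-fix arithmetic wraps around, while
the fixed form keeps the addition out of the expression by subtracting instead:

    long createdMs = 1488748346917L;    // batch creation time
    long lingerMs  = Long.MAX_VALUE;    // linger.ms effectively "forever"
    long now       = createdMs - 2;     // Sender's clock, as in the new test

    // Pre-fix check: createdMs + lingerMs wraps to a large negative value, so
    // now - (createdMs + lingerMs) evaluates to Long.MAX_VALUE and the batch
    // is wrongly reported as expired.
    boolean oldExpired = 10240 < (now - (createdMs + lingerMs));   // true (bug)

    // Post-fix check: clamp the elapsed time at zero, then subtract the linger,
    // which cannot overflow even for lingerMs = Long.MAX_VALUE.
    long createdTimeMs = Math.max(0, now - createdMs);             // 0 here
    boolean newExpired = 10240 < (createdTimeMs - lingerMs);       // false (fixed)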
Author: Armin Braun <me...@obrown.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2639 from original-brownbear/KAFKA-3155
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5cf491c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5cf491c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5cf491c2
Branch: refs/heads/trunk
Commit: 5cf491c2765b4912ff5f7f69f0b992a67005b983
Parents: 2064a2a
Author: Armin Braun <me...@obrown.io>
Authored: Mon Mar 6 20:10:45 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Mar 6 20:29:37 2017 +0000
----------------------------------------------------------------------
.../producer/internals/RecordAccumulator.java | 33 ++++-----
.../clients/producer/internals/RecordBatch.java | 57 ++++++++++------
.../clients/producer/internals/Sender.java | 6 +-
.../producer/internals/RecordBatchTest.java | 72 ++++++++++++++++++++
4 files changed, 128 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf491c2/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 7ef28e5..3306820 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -81,7 +81,7 @@ public final class RecordAccumulator {
/**
* Create a new record accumulator
- *
+ *
* @param batchSize The size to use when allocating {@link MemoryRecords} instances
* @param totalSize The maximum memory the record accumulator can use.
* @param compression The compression codec for the records
@@ -205,7 +205,7 @@ public final class RecordAccumulator {
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
-
+
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
@@ -284,10 +284,7 @@ public final class RecordAccumulator {
* Re-enqueue the given record batch in the accumulator to retry
*/
public void reenqueue(RecordBatch batch, long now) {
- batch.attempts++;
- batch.lastAttemptMs = now;
- batch.lastAppendTime = now;
- batch.setRetry();
+ batch.reenqueued(now);
Deque<RecordBatch> deque = getOrCreateDeque(batch.topicPartition);
synchronized (deque) {
deque.addFirst(batch);
@@ -334,16 +331,16 @@ public final class RecordAccumulator {
} else if (!readyNodes.contains(leader) && !muted.contains(part)) {
RecordBatch batch = deque.peekFirst();
if (batch != null) {
- boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
- long waitedTimeMs = nowMs - batch.lastAttemptMs;
+ long waitedTimeMs = batch.waitedTimeMs(nowMs);
+ boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
- long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
boolean full = deque.size() > 1 || batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
+ long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
@@ -374,7 +371,7 @@ public final class RecordAccumulator {
/**
* Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
* size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
- *
+ *
* @param cluster The current cluster metadata
* @param nodes The list of node to drain
* @param maxSize The maximum number of bytes to drain
@@ -405,7 +402,7 @@ public final class RecordAccumulator {
synchronized (deque) {
RecordBatch first = deque.peekFirst();
if (first != null) {
- boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
+ boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
// Only drain the batch if it is not during backoff period.
if (!backoff) {
if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
@@ -418,7 +415,7 @@ public final class RecordAccumulator {
batch.close();
size += batch.sizeInBytes();
ready.add(batch);
- batch.drainedMs = now;
+ batch.drained(now);
}
}
}
@@ -458,7 +455,7 @@ public final class RecordAccumulator {
incomplete.remove(batch);
free.deallocate(batch.buffer(), batch.initialCapacity());
}
-
+
/**
* Are there any threads currently waiting on a flush?
*
@@ -472,7 +469,7 @@ public final class RecordAccumulator {
Map<TopicPartition, Deque<RecordBatch>> batches() {
return Collections.unmodifiableMap(batches);
}
-
+
/**
* Initiate the flushing of data from the accumulator...this makes all requests immediately ready
*/
@@ -578,7 +575,7 @@ public final class RecordAccumulator {
this.unknownLeaderTopics = unknownLeaderTopics;
}
}
-
+
/*
* A threadsafe helper class to hold RecordBatches that haven't been ack'd yet
*/
@@ -588,13 +585,13 @@ public final class RecordAccumulator {
public IncompleteRecordBatches() {
this.incomplete = new HashSet<RecordBatch>();
}
-
+
public void add(RecordBatch batch) {
synchronized (incomplete) {
this.incomplete.add(batch);
}
}
-
+
public void remove(RecordBatch batch) {
synchronized (incomplete) {
boolean removed = this.incomplete.remove(batch);
@@ -602,7 +599,7 @@ public final class RecordAccumulator {
throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
}
}
-
+
public Iterable<RecordBatch> all() {
synchronized (incomplete) {
return new ArrayList<>(this.incomplete);
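
The readiness and drain checks above apply the same rearrangement as the `maybeExpire`
fix: rather than computing `lastAttemptMs + retryBackoffMs`, which can overflow for a
very large backoff, they compare the elapsed wait time against the backoff. A minimal
sketch with assumed values, not code from this commit:

    long lastAttemptMs  = 1488748346917L;
    long retryBackoffMs = Long.MAX_VALUE;
    long nowMs          = lastAttemptMs + 5;

    // Pre-fix form: the addition wraps negative, so backingOff is wrongly false
    // and the batch is retried without waiting out the backoff.
    boolean oldBackingOff = lastAttemptMs + retryBackoffMs > nowMs;   // false (bug)

    // Post-fix form: compare the time already waited against the backoff.
    long waitedTimeMs = Math.max(0, nowMs - lastAttemptMs);           // 5
    boolean newBackingOff = waitedTimeMs < retryBackoffMs;            // true (fixed)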
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf491c2/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index af6262e..8dacaf5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* A batch of records that is or will be sent.
- *
+ *
* This class is not thread safe and external synchronization must be used when modifying it
*/
public final class RecordBatch {
@@ -47,12 +47,12 @@ public final class RecordBatch {
private final List<Thunk> thunks = new ArrayList<>();
private final MemoryRecordsBuilder recordsBuilder;
- volatile int attempts;
+ private volatile int attempts;
int recordCount;
int maxRecordSize;
- long drainedMs;
- long lastAttemptMs;
- long lastAppendTime;
+ private long lastAttemptMs;
+ private long lastAppendTime;
+ private long drainedMs;
private String expiryErrorMessage;
private AtomicBoolean completed;
private boolean retry;
@@ -69,7 +69,7 @@ public final class RecordBatch {
/**
* Append the record to the current record set and return the relative offset within that record set
- *
+ *
* @return The RecordSend corresponding to this record or null if there isn't sufficient room.
*/
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
@@ -92,7 +92,7 @@ public final class RecordBatch {
/**
* Complete the request.
- *
+ *
* @param baseOffset The base offset of the messages assigned by the server
* @param logAppendTime The log append time or -1 if CreateTime is being used
* @param exception The exception that occurred (or null if the request was successful)
@@ -152,13 +152,12 @@ public final class RecordBatch {
* {@link #expirationDone()} must be invoked to complete the produce future and invoke callbacks.
*/
public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
-
if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
- else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs)))
- expiryErrorMessage = (now - (this.createdMs + lingerMs)) + " ms has passed since batch creation plus linger time";
- else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs)))
- expiryErrorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time";
+ else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
+ expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
+ else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs))
+ expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";
boolean expired = expiryErrorMessage != null;
if (expired)
@@ -178,6 +177,33 @@ public final class RecordBatch {
new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
}
+ int attempts() {
+ return attempts;
+ }
+
+ void reenqueued(long now) {
+ attempts++;
+ lastAttemptMs = Math.max(lastAppendTime, now);
+ lastAppendTime = Math.max(lastAppendTime, now);
+ retry = true;
+ }
+
+ long queueTimeMs() {
+ return drainedMs - createdMs;
+ }
+
+ long createdTimeMs(long nowMs) {
+ return Math.max(0, nowMs - createdMs);
+ }
+
+ long waitedTimeMs(long nowMs) {
+ return Math.max(0, nowMs - lastAttemptMs);
+ }
+
+ void drained(long nowMs) {
+ this.drainedMs = Math.max(drainedMs, nowMs);
+ }
+
/**
* Returns whether the batch has been retried for sending to kafka
*/
@@ -185,13 +211,6 @@ public final class RecordBatch {
return this.retry;
}
- /**
- * Set retry to true if the batch is being retried (for send)
- */
- public void setRetry() {
- this.retry = true;
- }
-
public MemoryRecords records() {
return recordsBuilder.build();
}
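
The new package-private helpers (`createdTimeMs`, `waitedTimeMs`, `queueTimeMs`,
`drained`) centralize the elapsed-time arithmetic and clamp it at zero, so callers
never see a negative duration even when the clock passed in by `Sender` lags the
batch's creation time. A sketch of the resulting behavior (values assumed for
illustration):

    long createdMs = 1488748346917L;
    long nowMs     = createdMs - 2;   // clock slightly behind, as in the tests

    // Clamped at zero instead of going negative:
    long createdTimeMs = Math.max(0, nowMs - createdMs);   // 0, not -2

    // maybeExpire then tests requestTimeoutMs < createdTimeMs - lingerMs;
    // for lingerMs = Long.MAX_VALUE the right-hand side is -Long.MAX_VALUE,
    // which is representable, so the batch is (correctly) not expired.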
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf491c2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index b3553af..7f27d36 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -300,7 +300,7 @@ public class Sender implements Runnable {
log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
correlationId,
batch.topicPartition,
- this.retries - batch.attempts - 1,
+ this.retries - batch.attempts() - 1,
error);
this.accumulator.reenqueue(batch, now);
this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
@@ -332,7 +332,7 @@ public class Sender implements Runnable {
* We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed
*/
private boolean canRetry(RecordBatch batch, Errors error) {
- return batch.attempts < this.retries && error.exception() instanceof RetriableException;
+ return batch.attempts() < this.retries && error.exception() instanceof RetriableException;
}
/**
@@ -519,7 +519,7 @@ public class Sender implements Runnable {
// global metrics
this.batchSizeSensor.record(batch.sizeInBytes(), now);
- this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now);
+ this.queueTimeSensor.record(batch.queueTimeMs(), now);
this.compressionRateSensor.record(batch.compressionRate());
this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
records += batch.recordCount;
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf491c2/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordBatchTest.java
new file mode 100644
index 0000000..6404451
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordBatchTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+
+public class RecordBatchTest {
+
+ private final long now = 1488748346917L;
+
+ private final MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(ByteBuffer.allocate(0),
+ Record.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, Record.NO_TIMESTAMP, 0);
+
+ /**
+ * A RecordBatch configured with a very large linger value and a now-timestamp preceding its
+ * create time must not be reported as expired by RecordBatch#maybeExpire, since the linger
+ * time exceeds the time elapsed since creation.
+ */
+ @Test
+ public void testLargeLingerOldNowExpire() {
+ RecordBatch batch = new RecordBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+ // Set `now` to 2ms before the create time.
+ assertFalse(batch.maybeExpire(10240, 100L, now - 2L, Long.MAX_VALUE, false));
+ }
+
+ /**
+ * A RecordBatch in retry mode configured with a very large retryBackoff value and a now-timestamp
+ * preceding its create time must not be reported as expired by RecordBatch#maybeExpire, since the
+ * backoff time exceeds the time elapsed since the last attempt.
+ */
+ @Test
+ public void testLargeRetryBackoffOldNowExpire() {
+ RecordBatch batch = new RecordBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+ // Set batch.retry = true
+ batch.reenqueued(now);
+ // Set `now` to 2ms before the create time.
+ assertFalse(batch.maybeExpire(10240, Long.MAX_VALUE, now - 2L, 10240L, false));
+ }
+
+ /**
+ * A RecordBatch#maybeExpire call with a now value before the batch's create time must not report
+ * the batch as expired, even when invoked with isFull = true.
+ */
+ @Test
+ public void testLargeFullOldNowExpire() {
+ RecordBatch batch = new RecordBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+ // Set `now` to 2ms before the create time.
+ assertFalse(batch.maybeExpire(10240, 10240L, now - 2L, 10240L, true));
+ }
+}