You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/12/05 22:34:22 UTC
kafka git commit: HOTFIX: Fix bug in readToLogEnd in KafkaBasedLog.
Repository: kafka
Updated Branches:
refs/heads/trunk a55e29631 -> 34aa538bf
HOTFIX: Fix bug in readToLogEnd in KafkaBasedLog.
Author: Konstantine Karantasis <ko...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #2211 from kkonstantine/HOTFIX-Correctly-read-to-end-of-offsets-log-in-Connect-KafkaBasedLog
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/34aa538b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/34aa538b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/34aa538b
Branch: refs/heads/trunk
Commit: 34aa538bf3e0a38fe61de66a4536dfe48a6cdc57
Parents: a55e296
Author: Konstantine Karantasis <ko...@confluent.io>
Authored: Mon Dec 5 14:34:14 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Dec 5 14:34:14 2016 -0800
----------------------------------------------------------------------
.../kafka/connect/util/KafkaBasedLog.java | 9 ++---
.../kafka/connect/util/KafkaBasedLogTest.java | 38 ++++++++++++++++++++
2 files changed, 43 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/34aa538b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 96141a5..dd591cb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -265,15 +265,15 @@ public class KafkaBasedLog<K, V> {
log.trace("Reading to end of log offsets {}", endOffsets);
while (!endOffsets.isEmpty()) {
- poll(Integer.MAX_VALUE);
-
Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TopicPartition, Long> entry = it.next();
if (consumer.position(entry.getKey()) >= entry.getValue())
it.remove();
- else
+ else {
+ poll(Integer.MAX_VALUE);
break;
+ }
}
}
}
@@ -287,6 +287,7 @@ public class KafkaBasedLog<K, V> {
@Override
public void run() {
try {
+ log.trace("{} started execution", this);
while (true) {
int numCallbacks;
synchronized (KafkaBasedLog.this) {
@@ -323,7 +324,7 @@ public class KafkaBasedLog<K, V> {
}
}
} catch (Throwable t) {
- log.error("Unexpected exception in KafkaBasedLog's work thread", t);
+ log.error("Unexpected exception in {}", this, t);
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/34aa538b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index d97e56e..4c9d920 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
@@ -219,6 +220,43 @@ public class KafkaBasedLogTest {
}
@Test
+ public void testReloadOnStartWithNoNewRecordsPresent() throws Exception {
+ expectStart();
+ expectStop();
+
+ PowerMock.replayAll();
+
+ Map<TopicPartition, Long> endOffsets = new HashMap<>();
+ endOffsets.put(TP0, 7L);
+ endOffsets.put(TP1, 7L);
+ consumer.updateEndOffsets(endOffsets);
+ // Use a non-zero beginning offset so the test covers more than the trivial 0L case
+ consumer.updateBeginningOffsets(endOffsets);
+
+ consumer.schedulePollTask(new Runnable() {
+ @Override
+ public void run() {
+ // Throw an exception that will not be ignored or handled by the Connect framework. In
+ // reality a misplaced call to poll blocks indefinitely and Connect aborts due to
+ // timeouts (for instance via ConnectRestException)
+ throw new WakeupException();
+ }
+ });
+
+ store.start();
+
+ assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+ assertEquals(7L, consumer.position(TP0));
+ assertEquals(7L, consumer.position(TP1));
+
+ store.stop();
+
+ assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+ assertTrue(consumer.closed());
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testSendAndReadToEnd() throws Exception {
expectStart();
TestFuture<RecordMetadata> tp0Future = new TestFuture<>();