You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/12/05 22:34:22 UTC

kafka git commit: HOTFIX: Fix bug in readToLogEnd in KafkaBasedLog.

Repository: kafka
Updated Branches:
  refs/heads/trunk a55e29631 -> 34aa538bf


HOTFIX: Fix bug in readToLogEnd in KafkaBasedLog.

Author: Konstantine Karantasis <ko...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #2211 from kkonstantine/HOTFIX-Correctly-read-to-end-of-offsets-log-in-Connect-KafkaBasedLog


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/34aa538b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/34aa538b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/34aa538b

Branch: refs/heads/trunk
Commit: 34aa538bf3e0a38fe61de66a4536dfe48a6cdc57
Parents: a55e296
Author: Konstantine Karantasis <ko...@confluent.io>
Authored: Mon Dec 5 14:34:14 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Dec 5 14:34:14 2016 -0800

----------------------------------------------------------------------
 .../kafka/connect/util/KafkaBasedLog.java       |  9 ++---
 .../kafka/connect/util/KafkaBasedLogTest.java   | 38 ++++++++++++++++++++
 2 files changed, 43 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/34aa538b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 96141a5..dd591cb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -265,15 +265,15 @@ public class KafkaBasedLog<K, V> {
         log.trace("Reading to end of log offsets {}", endOffsets);
 
         while (!endOffsets.isEmpty()) {
-            poll(Integer.MAX_VALUE);
-
             Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
             while (it.hasNext()) {
                 Map.Entry<TopicPartition, Long> entry = it.next();
                 if (consumer.position(entry.getKey()) >= entry.getValue())
                     it.remove();
-                else
+                else {
+                    poll(Integer.MAX_VALUE);
                     break;
+                }
             }
         }
     }
@@ -287,6 +287,7 @@ public class KafkaBasedLog<K, V> {
         @Override
         public void run() {
             try {
+                log.trace("{} started execution", this);
                 while (true) {
                     int numCallbacks;
                     synchronized (KafkaBasedLog.this) {
@@ -323,7 +324,7 @@ public class KafkaBasedLog<K, V> {
                     }
                 }
             } catch (Throwable t) {
-                log.error("Unexpected exception in KafkaBasedLog's work thread", t);
+                log.error("Unexpected exception in {}", this, t);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/34aa538b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index d97e56e..4c9d920 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Time;
@@ -219,6 +220,43 @@ public class KafkaBasedLogTest {
     }
 
     @Test
+    public void testReloadOnStartWithNoNewRecordsPresent() throws Exception {
+        expectStart();
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 7L);
+        endOffsets.put(TP1, 7L);
+        consumer.updateEndOffsets(endOffsets);
+        // Use an advanced (non-zero) offset; this is a better test than starting from just 0L
+        consumer.updateBeginningOffsets(endOffsets);
+
+        consumer.schedulePollTask(new Runnable() {
+            @Override
+            public void run() {
+                // Throw an exception that will not be ignored or handled by the Connect framework.
+                // In reality, a misplaced call to poll blocks indefinitely and Connect aborts due
+                // to timeouts (for instance via ConnectRestException).
+                throw new WakeupException();
+            }
+        });
+
+        store.start();
+
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(7L, consumer.position(TP0));
+        assertEquals(7L, consumer.position(TP1));
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testSendAndReadToEnd() throws Exception {
         expectStart();
         TestFuture<RecordMetadata> tp0Future = new TestFuture<>();