You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/21 20:15:32 UTC

kafka git commit: KAFKA-2667: Fix transient error in KafkaBasedLogTest.

Repository: kafka
Updated Branches:
  refs/heads/trunk 0785feeb0 -> 1d4a0b881


KAFKA-2667: Fix transient error in KafkaBasedLogTest.

The test required a specific sequence of events for each Consumer.poll() call,
but the MockConsumer.waitForPollThen() method could not guarantee that,
resulting in race conditions. Add support for scheduling sequences of events
even when running in multi-threaded environments.

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Guozhang Wang

Closes #333 from ewencp/kafka-2667-kafka-based-log-transient-error


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d4a0b88
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d4a0b88
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d4a0b88

Branch: refs/heads/trunk
Commit: 1d4a0b8811d82a6465015a71194a712679d63efe
Parents: 0785fee
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Oct 21 11:20:29 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 21 11:20:29 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/MockConsumer.java    |  58 ++++---
 .../kafka/copycat/util/KafkaBasedLogTest.java   | 164 ++++++++-----------
 2 files changed, 97 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d4a0b88/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 3c0f261..0242d7b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -19,27 +19,25 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.regex.Pattern;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
- * threadsafe </i>. However, you can use the {@link #waitForPollThen(Runnable,long)} method to write multithreaded tests
- * where a driver thread waits for {@link #poll(long)} to be called and then can safely perform operations during a
- * callback.
+ * threadsafe </i>. However, you can use the {@link #schedulePollTask(Runnable)} method to write multithreaded tests
+ * where a driver thread waits for {@link #poll(long)} to be called by a background thread and then can safely perform
+ * operations during a callback.
  */
 public class MockConsumer<K, V> implements Consumer<K, V> {
 
@@ -51,7 +49,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private final Map<TopicPartition, Long> beginningOffsets;
     private final Map<TopicPartition, Long> endOffsets;
 
-    private AtomicReference<CountDownLatch> pollLatch;
+    private Queue<Runnable> pollTasks;
     private KafkaException exception;
 
     private AtomicBoolean wakeup;
@@ -64,7 +62,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         this.closed = false;
         this.beginningOffsets = new HashMap<>();
         this.endOffsets = new HashMap<>();
-        this.pollLatch = new AtomicReference<>();
+        this.pollTasks = new LinkedList<>();
         this.exception = null;
         this.wakeup = new AtomicBoolean(false);
     }
@@ -127,13 +125,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     public ConsumerRecords<K, V> poll(long timeout) {
         ensureNotClosed();
 
-        CountDownLatch pollLatchCopy = pollLatch.get();
-        if (pollLatchCopy != null) {
-            pollLatch.set(null);
-            pollLatchCopy.countDown();
-            synchronized (pollLatchCopy) {
-                // Will block until caller of waitUntilPollThen() finishes their callback.
-            }
+        // Synchronize around the entire execution so new tasks to be triggered on subsequent poll calls can be added in
+        // the callback
+        synchronized (pollTasks) {
+            Runnable task = pollTasks.poll();
+            if (task != null)
+                task.run();
         }
 
         if (wakeup.get()) {
@@ -319,23 +316,24 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         wakeup.set(true);
     }
 
-    public void waitForPoll(long timeoutMs) {
-        waitForPollThen(null, timeoutMs);
+    /**
+     * Schedule a task to be executed during a poll(). One enqueued task will be executed per {@link #poll(long)}
+     * invocation. You can use this repeatedly to mock out multiple responses to poll invocations.
+     * @param task the task to be executed
+     */
+    public void schedulePollTask(Runnable task) {
+        synchronized (pollTasks) {
+            pollTasks.add(task);
+        }
     }
 
-    public void waitForPollThen(Runnable task, long timeoutMs) {
-        CountDownLatch latch = new CountDownLatch(1);
-        synchronized (latch) {
-            pollLatch.set(latch);
-            try {
-                if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS))
-                    throw new TimeoutException("Timed out waiting for consumer thread to call poll().");
-            } catch (InterruptedException e) {
-                throw new IllegalStateException("MockConsumer waiting thread was interrupted.", e);
+    public void scheduleNopPollTask() {
+        schedulePollTask(new Runnable() {
+            @Override
+            public void run() {
+                // noop
             }
-            if (task != null)
-                task.run();
-        }
+        });
     }
 
     public Set<TopicPartition> paused() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d4a0b88/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
index 96c4bcc..1ff5e73 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
@@ -165,32 +165,39 @@ public class KafkaBasedLogTest {
         endOffsets.put(TP0, 1L);
         endOffsets.put(TP1, 1L);
         consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+        final CountDownLatch finishedLatch = new CountDownLatch(1);
+        consumer.schedulePollTask(new Runnable() { // Use first poll task to setup sequence of remaining responses to polls
             @Override
             public void run() {
-                // Needs to seek to end to find end offsets
-                consumer.waitForPoll(10000);
-
-                // Should keep polling until it reaches current log end offset for all partitions
-                consumer.waitForPollThen(new Runnable() {
+                // Should keep polling until it reaches current log end offset for all partitions. Should handle
+                // as many empty polls as needed
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE));
                     }
-                }, 10000);
-
-                consumer.waitForPollThen(new Runnable() {
+                });
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE));
                     }
-                }, 10000);
+                });
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        finishedLatch.countDown();
+                    }
+                });
             }
-        };
-        startConsumerOpsThread.start();
+        });
         store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
+        assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
+
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
         assertEquals(2, consumedRecords.size());
         assertEquals(TP0_VALUE, consumedRecords.get(0).value());
@@ -227,24 +234,10 @@ public class KafkaBasedLogTest {
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
         consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
-            @Override
-            public void run() {
-                // Should keep polling until it has partition info
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.seek(TP0, 0);
-                        consumer.seek(TP1, 0);
-                    }
-                }, 10000);
-            }
-        };
-        startConsumerOpsThread.start();
         store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(0L, consumer.position(TP0));
+        assertEquals(0L, consumer.position(TP1));
 
         // Set some keys
         final AtomicInteger invoked = new AtomicInteger(0);
@@ -265,72 +258,58 @@ public class KafkaBasedLogTest {
         assertEquals(2, invoked.get());
 
         // Now we should have to wait for the records to be read back when we call readToEnd()
-        final CountDownLatch startOffsetUpdateLatch = new CountDownLatch(1);
-        Thread readNewDataThread = new Thread("read-new-data-thread") {
+        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
+        final FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                assertEquals(4, consumedRecords.size());
+                assertEquals(TP0_VALUE_NEW, consumedRecords.get(2).value());
+                assertEquals(TP1_VALUE_NEW, consumedRecords.get(3).value());
+                getInvokedAndPassed.set(true);
+            }
+        });
+        consumer.schedulePollTask(new Runnable() {
             @Override
             public void run() {
-                // Needs to be woken up after calling readToEnd()
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            startOffsetUpdateLatch.await();
-                        } catch (InterruptedException e) {
-                            throw new RuntimeException("Interrupted");
-                        }
-                    }
-                }, 10000);
+                // Once we're synchronized in a poll, start the read to end and schedule the exact set of poll events
+                // that should follow. This readToEnd call will immediately wake up this consumer.poll() call without
+                // returning any data.
+                store.readToEnd(readEndFutureCallback);
 
                 // Needs to seek to end to find end offsets
-                consumer.waitForPollThen(new Runnable() {
+                consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        try {
-                            startOffsetUpdateLatch.await();
-                        } catch (InterruptedException e) {
-                            throw new RuntimeException("Interrupted");
-                        }
-
                         Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
                         newEndOffsets.put(TP0, 2L);
                         newEndOffsets.put(TP1, 2L);
                         consumer.updateEndOffsets(newEndOffsets);
                     }
-                }, 10000);
+                });
 
                 // Should keep polling until it reaches current log end offset for all partitions
-                consumer.waitForPollThen(new Runnable() {
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE));
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY, TP0_VALUE_NEW));
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE));
                     }
-                }, 10000);
+                });
 
-                consumer.waitForPollThen(new Runnable() {
+                consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY, TP1_VALUE_NEW));
                     }
-                }, 10000);
-            }
-        };
-        readNewDataThread.start();
-        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
-        FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new Callback<Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void result) {
-                assertEquals(4, consumedRecords.size());
-                assertEquals(TP0_VALUE_NEW, consumedRecords.get(2).value());
-                assertEquals(TP1_VALUE_NEW, consumedRecords.get(3).value());
-                getInvokedAndPassed.set(true);
+                });
+
+                // Already have FutureCallback that should be invoked/awaited, so no need for a follow-up finishedLatch
             }
         });
-        store.readToEnd(readEndFutureCallback);
-        startOffsetUpdateLatch.countDown();
-        readNewDataThread.join(10000);
-        assertFalse(readNewDataThread.isAlive());
         readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
         assertTrue(getInvokedAndPassed.get());
 
@@ -349,36 +328,45 @@ public class KafkaBasedLogTest {
 
         PowerMock.replayAll();
 
+        final CountDownLatch finishedLatch = new CountDownLatch(1);
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 1L);
         endOffsets.put(TP1, 1L);
         consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+        consumer.schedulePollTask(new Runnable() {
             @Override
             public void run() {
                 // Trigger exception
-                consumer.waitForPollThen(new Runnable() {
+                consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
                         consumer.setException(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception());
                     }
-                }, 10000);
+                });
 
                 // Should keep polling until it reaches current log end offset for all partitions
-                consumer.waitForPollThen(new Runnable() {
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE_NEW));
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP0_KEY, TP0_VALUE_NEW));
                     }
-                }, 10000);
+                });
+
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        finishedLatch.countDown();
+                    }
+                });
             }
-        };
-        startConsumerOpsThread.start();
+        });
         store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
+        assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(1L, consumer.position(TP0));
 
         store.stop();
 
@@ -403,24 +391,10 @@ public class KafkaBasedLogTest {
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
         consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
-            @Override
-            public void run() {
-                // Should keep polling until it has partition info
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.seek(TP0, 0);
-                        consumer.seek(TP1, 0);
-                    }
-                }, 10000);
-            }
-        };
-        startConsumerOpsThread.start();
         store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(0L, consumer.position(TP0));
+        assertEquals(0L, consumer.position(TP1));
 
         final AtomicReference<Throwable> setException = new AtomicReference<>();
         store.send(TP0_KEY, TP0_VALUE, new org.apache.kafka.clients.producer.Callback() {