You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/03/22 04:47:49 UTC

kafka git commit: KAFKA-3412: multiple asynchronous commits cause send failures

Repository: kafka
Updated Branches:
  refs/heads/trunk 4f0417931 -> 8d8e3aaa6


KAFKA-3412: multiple asynchronous commits cause send failures

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1108 from hachikuji/KAFKA-3412


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d8e3aaa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d8e3aaa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d8e3aaa

Branch: refs/heads/trunk
Commit: 8d8e3aaa6172d314230a8d61e6892e9c09dc45b6
Parents: 4f04179
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Mar 21 20:47:25 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Mar 21 20:47:25 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  2 +-
 .../consumer/internals/ConsumerCoordinator.java |  4 ++++
 .../internals/ConsumerNetworkClient.java        |  5 ++--
 .../internals/ConsumerCoordinatorTest.java      |  8 -------
 .../kafka/api/BaseConsumerTest.scala            | 24 +++++++++++++-------
 .../kafka/api/PlaintextConsumerTest.scala       | 15 ++++++++++++
 6 files changed, 39 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8e3aaa/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b7eafbe..c36b7f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -870,7 +870,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     // must return these records to users to process before being interrupted or
                     // auto-committing offsets
                     fetcher.sendFetches(metadata.fetch());
-                    client.quickPoll();
+                    client.quickPoll(false);
                     return this.interceptors == null
                         ? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records));
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8e3aaa/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index cf93530..e582ce3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -345,6 +345,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 cb.onComplete(offsets, e);
             }
         });
+
+        // ensure commit has a chance to be transmitted (without blocking on its completion)
+        // note that we allow delayed tasks to be executed in case heartbeats need to be sent
+        client.quickPoll(true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8e3aaa/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4492306..b70994d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -196,10 +196,11 @@ public class ConsumerNetworkClient implements Closeable {
     /**
      * Poll for network IO and return immediately. This will not trigger wakeups,
      * nor will it execute any delayed tasks.
+     * @param executeDelayedTasks Whether to allow delayed task execution (true allows)
      */
-    public void quickPoll() {
+    public void quickPoll(boolean executeDelayedTasks) {
         disableWakeups();
-        poll(0, time.milliseconds(), false);
+        poll(0, time.milliseconds(), executeDelayedTasks);
         enableWakeups();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8e3aaa/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 260ee7a..8844adc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -627,7 +627,6 @@ public class ConsumerCoordinatorTest {
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
-        consumerClient.poll(0);
         assertTrue(success.get());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
@@ -644,7 +643,6 @@ public class ConsumerCoordinatorTest {
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "hello")), callback(success));
-        consumerClient.poll(0);
         assertTrue(success.get());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
@@ -658,7 +656,6 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorKnown();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
-        consumerClient.poll(0);
         assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
         assertNull(defaultOffsetCommitCallback.exception);
     }
@@ -693,7 +690,6 @@ public class ConsumerCoordinatorTest {
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
-        consumerClient.poll(0);
         assertTrue(success.get());
     }
 
@@ -704,7 +700,6 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorKnown();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
-        consumerClient.poll(0);
         assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
         assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception);
     }
@@ -718,7 +713,6 @@ public class ConsumerCoordinatorTest {
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
-        consumerClient.poll(0);
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);
@@ -734,7 +728,6 @@ public class ConsumerCoordinatorTest {
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
-        consumerClient.poll(0);
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);
@@ -750,7 +743,6 @@ public class ConsumerCoordinatorTest {
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
-        consumerClient.poll(0);
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8e3aaa/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 9939309..1408cd9 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -81,7 +81,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
 
     // shouldn't make progress until poll is invoked
     Thread.sleep(10)
-    assertEquals(0, commitCallback.count)
+    assertEquals(0, commitCallback.successCount)
     awaitCommitCallback(this.consumers(0), commitCallback)
   }
 
@@ -330,18 +330,26 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     records
   }
 
-  protected def awaitCommitCallback[K, V](consumer: Consumer[K, V], commitCallback: CountConsumerCommitCallback): Unit = {
-    val startCount = commitCallback.count
+  protected def awaitCommitCallback[K, V](consumer: Consumer[K, V],
+                                          commitCallback: CountConsumerCommitCallback,
+                                          count: Int = 1): Unit = {
+    val startCount = commitCallback.successCount
     val started = System.currentTimeMillis()
-    while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000)
+    while (commitCallback.successCount < startCount + count && System.currentTimeMillis() - started < 10000)
       consumer.poll(50)
-    assertEquals(startCount + 1, commitCallback.count)
+    assertEquals(startCount + count, commitCallback.successCount)
   }
 
   protected class CountConsumerCommitCallback extends OffsetCommitCallback {
-    var count = 0
-
-    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1
+    var successCount = 0
+    var failCount = 0
+
+    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
+      if (exception == null)
+        successCount += 1
+      else
+        failCount += 1
+    }
   }
 
   protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]],

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8e3aaa/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 9c56010..ff2e63d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -233,6 +233,21 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
+  def testAsyncCommit() {
+    val consumer = this.consumers(0)
+    consumer.assign(List(tp).asJava)
+    consumer.poll(0)
+
+    val callback = new CountConsumerCommitCallback
+    val count = 5
+    for (i <- 1 to count)
+      consumer.commitAsync(Map(tp -> new OffsetAndMetadata(i)).asJava, callback)
+
+    awaitCommitCallback(consumer, callback, count=count)
+    assertEquals(new OffsetAndMetadata(count), consumer.committed(tp))
+  }
+
+  @Test
   def testExpandingTopicSubscriptions() {
     val otherTopic = "other"
     val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))