You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/03/22 04:47:49 UTC
kafka git commit: KAFKA-3412: multiple asynchronous commits causes
send failures
Repository: kafka
Updated Branches:
refs/heads/trunk 4f0417931 -> 8d8e3aaa6
KAFKA-3412: multiple asynchronous commits causes send failures
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1108 from hachikuji/KAFKA-3412
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d8e3aaa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d8e3aaa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d8e3aaa
Branch: refs/heads/trunk
Commit: 8d8e3aaa6172d314230a8d61e6892e9c09dc45b6
Parents: 4f04179
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Mar 21 20:47:25 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Mar 21 20:47:25 2016 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../consumer/internals/ConsumerCoordinator.java | 4 ++++
.../internals/ConsumerNetworkClient.java | 5 ++--
.../internals/ConsumerCoordinatorTest.java | 8 -------
.../kafka/api/BaseConsumerTest.scala | 24 +++++++++++++-------
.../kafka/api/PlaintextConsumerTest.scala | 15 ++++++++++++
6 files changed, 39 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8e3aaa/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b7eafbe..c36b7f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -870,7 +870,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// must return these records to users to process before being interrupted or
// auto-committing offsets
fetcher.sendFetches(metadata.fetch());
- client.quickPoll();
+ client.quickPoll(false);
return this.interceptors == null
? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8e3aaa/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index cf93530..e582ce3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -345,6 +345,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
cb.onComplete(offsets, e);
}
});
+
+ // ensure commit has a chance to be transmitted (without blocking on its completion)
+ // note that we allow delayed tasks to be executed in case heartbeats need to be sent
+ client.quickPoll(true);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8e3aaa/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4492306..b70994d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -196,10 +196,11 @@ public class ConsumerNetworkClient implements Closeable {
/**
* Poll for network IO and return immediately. This will not trigger wakeups,
* nor will it execute any delayed tasks.
+ * @param executeDelayedTasks Whether to allow delayed task execution (true allows)
*/
- public void quickPoll() {
+ public void quickPoll(boolean executeDelayedTasks) {
disableWakeups();
- poll(0, time.milliseconds(), false);
+ poll(0, time.milliseconds(), executeDelayedTasks);
enableWakeups();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8e3aaa/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 260ee7a..8844adc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -627,7 +627,6 @@ public class ConsumerCoordinatorTest {
AtomicBoolean success = new AtomicBoolean(false);
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
- consumerClient.poll(0);
assertTrue(success.get());
assertEquals(100L, subscriptions.committed(tp).offset());
@@ -644,7 +643,6 @@ public class ConsumerCoordinatorTest {
AtomicBoolean success = new AtomicBoolean(false);
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "hello")), callback(success));
- consumerClient.poll(0);
assertTrue(success.get());
assertEquals(100L, subscriptions.committed(tp).offset());
@@ -658,7 +656,6 @@ public class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorKnown();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
- consumerClient.poll(0);
assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
assertNull(defaultOffsetCommitCallback.exception);
}
@@ -693,7 +690,6 @@ public class ConsumerCoordinatorTest {
AtomicBoolean success = new AtomicBoolean(false);
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
- consumerClient.poll(0);
assertTrue(success.get());
}
@@ -704,7 +700,6 @@ public class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorKnown();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
- consumerClient.poll(0);
assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception);
}
@@ -718,7 +713,6 @@ public class ConsumerCoordinatorTest {
MockCommitCallback cb = new MockCommitCallback();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
- consumerClient.poll(0);
assertTrue(coordinator.coordinatorUnknown());
assertEquals(1, cb.invoked);
@@ -734,7 +728,6 @@ public class ConsumerCoordinatorTest {
MockCommitCallback cb = new MockCommitCallback();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
- consumerClient.poll(0);
assertTrue(coordinator.coordinatorUnknown());
assertEquals(1, cb.invoked);
@@ -750,7 +743,6 @@ public class ConsumerCoordinatorTest {
MockCommitCallback cb = new MockCommitCallback();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
- consumerClient.poll(0);
assertTrue(coordinator.coordinatorUnknown());
assertEquals(1, cb.invoked);
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8e3aaa/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 9939309..1408cd9 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -81,7 +81,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
// shouldn't make progress until poll is invoked
Thread.sleep(10)
- assertEquals(0, commitCallback.count)
+ assertEquals(0, commitCallback.successCount)
awaitCommitCallback(this.consumers(0), commitCallback)
}
@@ -330,18 +330,26 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
records
}
- protected def awaitCommitCallback[K, V](consumer: Consumer[K, V], commitCallback: CountConsumerCommitCallback): Unit = {
- val startCount = commitCallback.count
+ protected def awaitCommitCallback[K, V](consumer: Consumer[K, V],
+ commitCallback: CountConsumerCommitCallback,
+ count: Int = 1): Unit = {
+ val startCount = commitCallback.successCount
val started = System.currentTimeMillis()
- while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000)
+ while (commitCallback.successCount < startCount + count && System.currentTimeMillis() - started < 10000)
consumer.poll(50)
- assertEquals(startCount + 1, commitCallback.count)
+ assertEquals(startCount + count, commitCallback.successCount)
}
protected class CountConsumerCommitCallback extends OffsetCommitCallback {
- var count = 0
-
- override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1
+ var successCount = 0
+ var failCount = 0
+
+ override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
+ if (exception == null)
+ successCount += 1
+ else
+ failCount += 1
+ }
}
protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]],
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8e3aaa/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 9c56010..ff2e63d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -233,6 +233,21 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
@Test
+ def testAsyncCommit() {
+ val consumer = this.consumers(0)
+ consumer.assign(List(tp).asJava)
+ consumer.poll(0)
+
+ val callback = new CountConsumerCommitCallback
+ val count = 5
+ for (i <- 1 to count)
+ consumer.commitAsync(Map(tp -> new OffsetAndMetadata(i)).asJava, callback)
+
+ awaitCommitCallback(consumer, callback, count=count)
+ assertEquals(new OffsetAndMetadata(count), consumer.committed(tp))
+ }
+
+ @Test
def testExpandingTopicSubscriptions() {
val otherTopic = "other"
val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))