You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/15 21:40:44 UTC
[1/3] kafka git commit: KAFKA-2123: add callback in commit api and
use a delayed queue for async requests;
reviewed by Ewen Cheslack-Postava and Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk a7e0ac365 -> 99c0686be
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6837453..4d9a425 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -138,7 +138,6 @@ public class SubscriptionState {
public void committed(TopicPartition tp, long offset) {
this.committed.put(tp, offset);
- this.needsFetchCommittedOffsets = false;
}
public Long committed(TopicPartition tp) {
@@ -152,6 +151,10 @@ public class SubscriptionState {
public boolean refreshCommitsNeeded() {
return this.needsFetchCommittedOffsets;
}
+
+ public void commitsRefreshed() {
+ this.needsFetchCommittedOffsets = false;
+ }
public void seek(TopicPartition tp, long offset) {
fetched(tp, offset);
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
new file mode 100644
index 0000000..ba9ce82
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has
+ * not yet been created.
+ */
+public class ConsumerCoordinatorNotAvailableException extends RetriableException {
+
+ private static final long serialVersionUID = 1L;
+
+ public ConsumerCoordinatorNotAvailableException() {
+ super();
+ }
+
+ public ConsumerCoordinatorNotAvailableException(String message) {
+ super(message);
+ }
+
+ public ConsumerCoordinatorNotAvailableException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ConsumerCoordinatorNotAvailableException(Throwable cause) {
+ super(cause);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java b/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java
new file mode 100644
index 0000000..18d61a2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+
+/**
+ * Server disconnected before a request could be completed.
+ */
+public class DisconnectException extends RetriableException {
+
+ private static final long serialVersionUID = 1L;
+
+ public DisconnectException() {
+ super();
+ }
+
+ public DisconnectException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DisconnectException(String message) {
+ super(message);
+ }
+
+ public DisconnectException(Throwable cause) {
+ super(cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java b/clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
new file mode 100644
index 0000000..d20b74a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class IllegalGenerationException extends RetriableException {
+ private static final long serialVersionUID = 1L;
+
+ public IllegalGenerationException() {
+ super();
+ }
+
+ public IllegalGenerationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public IllegalGenerationException(String message) {
+ super(message);
+ }
+
+ public IllegalGenerationException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
new file mode 100644
index 0000000..b6c83b4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is
+ * not a coordinator for.
+ */
+public class NotCoordinatorForConsumerException extends RetriableException {
+
+ private static final long serialVersionUID = 1L;
+
+ public NotCoordinatorForConsumerException() {
+ super();
+ }
+
+ public NotCoordinatorForConsumerException(String message) {
+ super(message);
+ }
+
+ public NotCoordinatorForConsumerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public NotCoordinatorForConsumerException(Throwable cause) {
+ super(cause);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
new file mode 100644
index 0000000..016506e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change
+ * for that offsets topic partition).
+ */
+public class OffsetLoadInProgressException extends RetriableException {
+
+ private static final long serialVersionUID = 1L;
+
+ public OffsetLoadInProgressException() {
+ super();
+ }
+
+ public OffsetLoadInProgressException(String message) {
+ super(message);
+ }
+
+ public OffsetLoadInProgressException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public OffsetLoadInProgressException(Throwable cause) {
+ super(cause);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
new file mode 100644
index 0000000..9bcbd11
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class UnknownConsumerIdException extends RetriableException {
+ private static final long serialVersionUID = 1L;
+
+ public UnknownConsumerIdException() {
+ super();
+ }
+
+ public UnknownConsumerIdException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public UnknownConsumerIdException(String message) {
+ super(message);
+ }
+
+ public UnknownConsumerIdException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 4c0ecc3..d6c41c1 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -53,11 +53,11 @@ public enum Errors {
NETWORK_EXCEPTION(13,
new NetworkException("The server disconnected before a response was received.")),
OFFSET_LOAD_IN_PROGRESS(14,
- new ApiException("The coordinator is loading offsets and can't process requests.")),
+ new OffsetLoadInProgressException("The coordinator is loading offsets and can't process requests.")),
CONSUMER_COORDINATOR_NOT_AVAILABLE(15,
- new ApiException("The coordinator is not available.")),
+ new ConsumerCoordinatorNotAvailableException("The coordinator is not available.")),
NOT_COORDINATOR_FOR_CONSUMER(16,
- new ApiException("This is not the correct co-ordinator for this consumer.")),
+ new NotCoordinatorForConsumerException("This is not the correct coordinator for this consumer.")),
INVALID_TOPIC_EXCEPTION(17,
new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
RECORD_LIST_TOO_LARGE(18,
@@ -69,13 +69,13 @@ public enum Errors {
INVALID_REQUIRED_ACKS(21,
new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
ILLEGAL_GENERATION(22,
- new ApiException("Specified consumer generation id is not valid.")),
+ new IllegalGenerationException("Specified consumer generation id is not valid.")),
INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY(23,
new ApiException("The request partition assignment strategy does not match that of the group.")),
UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY(24,
new ApiException("The request partition assignment strategy is unknown to the broker.")),
UNKNOWN_CONSUMER_ID(25,
- new ApiException("The coordinator is not aware of this consumer.")),
+ new UnknownConsumerIdException("The coordinator is not aware of this consumer.")),
INVALID_SESSION_TIMEOUT(26,
new ApiException("The session timeout is not within an acceptable range.")),
COMMITTING_PARTITIONS_NOT_ASSIGNED(27,
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
new file mode 100644
index 0000000..9de1cee
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.HeartbeatRequest;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ConsumerNetworkClientTest {
+
+ private String topicName = "test";
+ private MockTime time = new MockTime();
+ private MockClient client = new MockClient(time);
+ private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
+ private Node node = cluster.nodes().get(0);
+ private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+ private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+
+ @Test
+ public void send() {
+ client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+ RequestFuture<ClientResponse> future = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertEquals(1, consumerClient.pendingRequestCount(node));
+ assertFalse(future.isDone());
+
+ consumerClient.poll(future);
+ assertTrue(future.isDone());
+ assertTrue(future.succeeded());
+
+ ClientResponse clientResponse = future.value();
+ HeartbeatResponse response = new HeartbeatResponse(clientResponse.responseBody());
+ assertEquals(Errors.NONE.code(), response.errorCode());
+ }
+
+ @Test
+ public void multiSend() {
+ client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+ client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+ RequestFuture<ClientResponse> future1 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+ RequestFuture<ClientResponse> future2 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+ assertEquals(2, consumerClient.pendingRequestCount());
+ assertEquals(2, consumerClient.pendingRequestCount(node));
+
+ consumerClient.awaitPendingRequests(node);
+ assertTrue(future1.succeeded());
+ assertTrue(future2.succeeded());
+ }
+
+ @Test
+ public void schedule() {
+ TestDelayedTask task = new TestDelayedTask();
+ consumerClient.schedule(task, time.milliseconds());
+ consumerClient.poll(0);
+ assertEquals(1, task.executions);
+
+ consumerClient.schedule(task, time.milliseconds() + 100);
+ consumerClient.poll(0);
+ assertEquals(1, task.executions);
+
+ time.sleep(100);
+ consumerClient.poll(0);
+ assertEquals(2, task.executions);
+ }
+
+ @Test
+ public void wakeup() {
+ RequestFuture<ClientResponse> future = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+ consumerClient.wakeup();
+ try {
+ consumerClient.poll(0);
+ fail();
+ } catch (ConsumerWakeupException e) {
+ }
+
+ client.respond(heartbeatResponse(Errors.NONE.code()));
+ consumerClient.poll(future);
+ assertTrue(future.isDone());
+ }
+
+
+ private HeartbeatRequest heartbeatRequest() {
+ return new HeartbeatRequest("group", 1, "consumerId");
+ }
+
+ private Struct heartbeatResponse(short error) {
+ HeartbeatResponse response = new HeartbeatResponse(error);
+ return response.toStruct();
+ }
+
+ private static class TestDelayedTask implements DelayedTask {
+ int executions = 0;
+ @Override
+ public void run(long now) {
+ executions++;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index d085fe5..ca832be 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -18,13 +18,19 @@ package org.apache.kafka.clients.consumer.internals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.CommitType;
+import org.apache.kafka.clients.consumer.ConsumerCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
@@ -36,10 +42,12 @@ import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Test;
@@ -51,108 +59,173 @@ public class CoordinatorTest {
private String groupId = "test-group";
private TopicPartition tp = new TopicPartition(topicName, 0);
private int sessionTimeoutMs = 10;
+ private long retryBackoffMs = 100;
+ private long requestTimeoutMs = 5000;
private String rebalanceStrategy = "not-matter";
- private MockTime time = new MockTime();
- private MockClient client = new MockClient(time);
+ private MockTime time;
+ private MockClient client;
private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
private Node node = cluster.nodes().get(0);
- private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
- private Metrics metrics = new Metrics(time);
+ private SubscriptionState subscriptions;
+ private Metadata metadata;
+ private Metrics metrics;
private Map<String, String> metricTags = new LinkedHashMap<String, String>();
-
- private Coordinator coordinator = new Coordinator(client,
- groupId,
- sessionTimeoutMs,
- rebalanceStrategy,
- subscriptions,
- metrics,
- "consumer" + groupId,
- metricTags,
- time);
+ private ConsumerNetworkClient consumerClient;
+ private MockRebalanceCallback rebalanceCallback;
+ private Coordinator coordinator;
@Before
public void setup() {
+ this.time = new MockTime();
+ this.client = new MockClient(time);
+ this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
+ this.metadata = new Metadata(0, Long.MAX_VALUE);
+ this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+ this.metrics = new Metrics(time);
+ this.rebalanceCallback = new MockRebalanceCallback();
+
client.setNode(node);
+
+ this.coordinator = new Coordinator(consumerClient,
+ groupId,
+ sessionTimeoutMs,
+ rebalanceStrategy,
+ subscriptions,
+ metrics,
+ "consumer" + groupId,
+ metricTags,
+ time,
+ requestTimeoutMs,
+ retryBackoffMs,
+ rebalanceCallback);
}
@Test
public void testNormalHeartbeat() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
+ coordinator.ensureCoordinatorKnown();
// normal heartbeat
time.sleep(sessionTimeoutMs);
- coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
- assertEquals(1, client.inFlightRequestCount());
- client.respond(heartbeatResponse(Errors.NONE.code()));
- assertEquals(1, client.poll(0, time.milliseconds()).size());
+ RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertFalse(future.isDone());
+
+ client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+ consumerClient.poll(0);
+
+ assertTrue(future.isDone());
+ assertTrue(future.succeeded());
}
@Test
public void testCoordinatorNotAvailable() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
+ coordinator.ensureCoordinatorKnown();
// consumer_coordinator_not_available will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
- coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
- assertEquals(1, client.inFlightRequestCount());
- client.respond(heartbeatResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()));
+ RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertFalse(future.isDone());
+
+ client.prepareResponse(heartbeatResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()));
time.sleep(sessionTimeoutMs);
- assertEquals(1, client.poll(0, time.milliseconds()).size());
+ consumerClient.poll(0);
+
+ assertTrue(future.isDone());
+ assertTrue(future.failed());
+ assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), future.exception());
assertTrue(coordinator.coordinatorUnknown());
}
@Test
public void testNotCoordinator() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
+ coordinator.ensureCoordinatorKnown();
// not_coordinator will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
- coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
- assertEquals(1, client.inFlightRequestCount());
- client.respond(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_CONSUMER.code()));
+ RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertFalse(future.isDone());
+
+ client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_CONSUMER.code()));
time.sleep(sessionTimeoutMs);
- assertEquals(1, client.poll(0, time.milliseconds()).size());
+ consumerClient.poll(0);
+
+ assertTrue(future.isDone());
+ assertTrue(future.failed());
+ assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.exception(), future.exception());
assertTrue(coordinator.coordinatorUnknown());
}
@Test
public void testIllegalGeneration() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
+ coordinator.ensureCoordinatorKnown();
// illegal_generation will cause re-partition
subscriptions.subscribe(topicName);
subscriptions.changePartitionAssignment(Collections.singletonList(tp));
time.sleep(sessionTimeoutMs);
- coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
- assertEquals(1, client.inFlightRequestCount());
- client.respond(heartbeatResponse(Errors.ILLEGAL_GENERATION.code()));
+ RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertFalse(future.isDone());
+
+ client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION.code()));
time.sleep(sessionTimeoutMs);
- assertEquals(1, client.poll(0, time.milliseconds()).size());
+ consumerClient.poll(0);
+
+ assertTrue(future.isDone());
+ assertTrue(future.failed());
+ assertEquals(Errors.ILLEGAL_GENERATION.exception(), future.exception());
+ assertTrue(subscriptions.partitionAssignmentNeeded());
+ }
+
+ @Test
+ public void testUnknownConsumerId() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // unknown_consumer_id will cause re-partition
+ subscriptions.subscribe(topicName);
+ subscriptions.changePartitionAssignment(Collections.singletonList(tp));
+
+ time.sleep(sessionTimeoutMs);
+ RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertFalse(future.isDone());
+
+ client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_CONSUMER_ID.code()));
+ time.sleep(sessionTimeoutMs);
+ consumerClient.poll(0);
+
+ assertTrue(future.isDone());
+ assertTrue(future.failed());
+ assertEquals(Errors.UNKNOWN_CONSUMER_ID.exception(), future.exception());
assertTrue(subscriptions.partitionAssignmentNeeded());
}
@Test
public void testCoordinatorDisconnect() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
+ coordinator.ensureCoordinatorKnown();
// coordinator disconnect will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
- coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
- assertEquals(1, client.inFlightRequestCount());
- client.respond(heartbeatResponse(Errors.NONE.code()), true); // return disconnected
+ RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertFalse(future.isDone());
+
+ client.prepareResponse(heartbeatResponse(Errors.NONE.code()), true); // return disconnected
time.sleep(sessionTimeoutMs);
- assertEquals(1, client.poll(0, time.milliseconds()).size());
+ consumerClient.poll(0);
+
+ assertTrue(future.isDone());
+ assertTrue(future.failed());
+ assertTrue(future.exception() instanceof DisconnectException);
assertTrue(coordinator.coordinatorUnknown());
}
@@ -162,16 +235,18 @@ public class CoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
+ coordinator.ensureCoordinatorKnown();
// normal join group
client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
- coordinator.assignPartitions(time.milliseconds());
- client.poll(0, time.milliseconds());
+ coordinator.ensurePartitionAssignment();
assertFalse(subscriptions.partitionAssignmentNeeded());
assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+ assertEquals(1, rebalanceCallback.revokedCount);
+ assertEquals(Collections.emptySet(), rebalanceCallback.revoked);
+ assertEquals(1, rebalanceCallback.assignedCount);
+ assertEquals(Collections.singleton(tp), rebalanceCallback.assigned);
}
@Test
@@ -180,165 +255,228 @@ public class CoordinatorTest {
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
- assertTrue(subscriptions.partitionAssignmentNeeded());
+ coordinator.ensureCoordinatorKnown();
- // diconnected from original coordinator will cause re-discover and join again
+ // disconnected from original coordinator will cause re-discover and join again
client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()), true);
- coordinator.assignPartitions(time.milliseconds());
- client.poll(0, time.milliseconds());
- assertTrue(subscriptions.partitionAssignmentNeeded());
-
- // rediscover the coordinator
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
-
- // try assigning partitions again
client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
- coordinator.assignPartitions(time.milliseconds());
- client.poll(0, time.milliseconds());
+ coordinator.ensurePartitionAssignment();
assertFalse(subscriptions.partitionAssignmentNeeded());
assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+ assertEquals(1, rebalanceCallback.revokedCount);
+ assertEquals(Collections.emptySet(), rebalanceCallback.revoked);
+ assertEquals(1, rebalanceCallback.assignedCount);
+ assertEquals(Collections.singleton(tp), rebalanceCallback.assigned);
}
+ @Test(expected = ApiException.class)
+ public void testUnknownPartitionAssignmentStrategy() {
+ subscriptions.subscribe(topicName);
+ subscriptions.needReassignment();
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // coordinator doesn't like our assignment strategy
+ client.prepareResponse(joinGroupResponse(0, "consumer", Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()));
+ coordinator.ensurePartitionAssignment();
+ }
+
+ @Test(expected = ApiException.class)
+ public void testInvalidSessionTimeout() {
+ subscriptions.subscribe(topicName);
+ subscriptions.needReassignment();
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // coordinator doesn't like our session timeout
+ client.prepareResponse(joinGroupResponse(0, "consumer", Collections.<TopicPartition>emptyList(), Errors.INVALID_SESSION_TIMEOUT.code()));
+ coordinator.ensurePartitionAssignment();
+ }
@Test
public void testCommitOffsetNormal() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
+ coordinator.ensureCoordinatorKnown();
- // With success flag
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
- RequestFuture<Void> result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
- assertEquals(1, client.poll(0, time.milliseconds()).size());
- assertTrue(result.isDone());
- assertTrue(result.succeeded());
- // Without success flag
- coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
- client.respond(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
- assertEquals(1, client.poll(0, time.milliseconds()).size());
+ AtomicBoolean success = new AtomicBoolean(false);
+ coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.ASYNC, callback(success));
+ consumerClient.poll(0);
+ assertTrue(success.get());
}
@Test
- public void testCommitOffsetError() {
+ public void testCommitOffsetAsyncCoordinatorNotAvailable() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
+ coordinator.ensureCoordinatorKnown();
// async commit with coordinator not available
+ MockCommitCallback cb = new MockCommitCallback();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
- coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
- assertEquals(1, client.poll(0, time.milliseconds()).size());
+ coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.ASYNC, cb);
+ consumerClient.poll(0);
+
assertTrue(coordinator.coordinatorUnknown());
- // resume
+ assertEquals(1, cb.invoked);
+ assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), cb.exception);
+ }
+
+ @Test
+ public void testCommitOffsetAsyncNotCoordinator() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
+ coordinator.ensureCoordinatorKnown();
// async commit with not coordinator
+ MockCommitCallback cb = new MockCommitCallback();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
- coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
- assertEquals(1, client.poll(0, time.milliseconds()).size());
+ coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.ASYNC, cb);
+ consumerClient.poll(0);
+
assertTrue(coordinator.coordinatorUnknown());
- // resume
+ assertEquals(1, cb.invoked);
+ assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.exception(), cb.exception);
+ }
+
+ @Test
+ public void testCommitOffsetAsyncDisconnected() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
+ coordinator.ensureCoordinatorKnown();
- // sync commit with not_coordinator
+ // async commit with coordinator disconnected
+ MockCommitCallback cb = new MockCommitCallback();
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
+ coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.ASYNC, cb);
+ consumerClient.poll(0);
+
+ assertTrue(coordinator.coordinatorUnknown());
+ assertEquals(1, cb.invoked);
+ assertTrue(cb.exception instanceof DisconnectException);
+ }
+
+ @Test
+ public void testCommitOffsetSyncNotCoordinator() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // sync commit with not coordinator (should rediscover the coordinator and then resubmit the commit request)
+ MockCommitCallback cb = new MockCommitCallback();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
- RequestFuture<Void> result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
- assertEquals(1, client.poll(0, time.milliseconds()).size());
- assertTrue(result.isDone());
- assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
+ coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, cb);
+ assertEquals(1, cb.invoked);
+ assertNull(cb.exception);
+ }
- // sync commit with coordinator disconnected
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
- result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
+ @Test
+ public void testCommitOffsetSyncCoordinatorNotAvailable() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
- assertEquals(0, client.poll(0, time.milliseconds()).size());
- assertTrue(result.isDone());
- assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
+ // sync commit with coordinator not available (should rediscover the coordinator and then resubmit the commit request)
+ MockCommitCallback cb = new MockCommitCallback();
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+ coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, cb);
+ assertEquals(1, cb.invoked);
+ assertNull(cb.exception);
+ }
+ @Test
+ public void testCommitOffsetSyncCoordinatorDisconnected() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
+ coordinator.ensureCoordinatorKnown();
- result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
- assertEquals(1, client.poll(0, time.milliseconds()).size());
- assertTrue(result.isDone());
- assertTrue(result.succeeded());
+ // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
+ MockCommitCallback cb = new MockCommitCallback();
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+ coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, cb);
+ assertEquals(1, cb.invoked);
+ assertNull(cb.exception);
}
+ @Test(expected = ApiException.class)
+ public void testCommitOffsetSyncThrowsNonRetriableException() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // sync commit with invalid partitions should throw if we have no callback
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code())), false);
+ coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, null);
+ }
@Test
- public void testFetchOffset() {
+ public void testCommitOffsetSyncCallbackHandlesNonRetriableException() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // sync commit with invalid partitions should pass the exception to the callback instead of throwing
+ MockCommitCallback cb = new MockCommitCallback();
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code())), false);
+ coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, cb);
+ assertTrue(cb.exception instanceof ApiException);
+ }
+ @Test
+ public void testRefreshOffset() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
+ coordinator.ensureCoordinatorKnown();
- // normal fetch
+ subscriptions.subscribe(tp);
+ subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
- RequestFuture<Map<TopicPartition, Long>> result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
- client.poll(0, time.milliseconds());
- assertTrue(result.isDone());
- assertEquals(100L, (long) result.value().get(tp));
+ coordinator.refreshCommittedOffsetsIfNeeded();
+ assertFalse(subscriptions.refreshCommitsNeeded());
+ assertEquals(100L, (long) subscriptions.committed(tp));
+ }
- // fetch with loading in progress
+ @Test
+ public void testRefreshOffsetLoadInProgress() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ subscriptions.subscribe(tp);
+ subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+ coordinator.refreshCommittedOffsetsIfNeeded();
+ assertFalse(subscriptions.refreshCommitsNeeded());
+ assertEquals(100L, (long) subscriptions.committed(tp));
+ }
- result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
- client.poll(0, time.milliseconds());
- assertTrue(result.isDone());
- assertTrue(result.failed());
- assertEquals(RequestFuture.RetryAction.BACKOFF, result.retryAction());
-
- result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
- client.poll(0, time.milliseconds());
- assertTrue(result.isDone());
- assertEquals(100L, (long) result.value().get(tp));
+ @Test
+ public void testRefreshOffsetNotCoordinatorForConsumer() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
- // fetch with not coordinator
+ subscriptions.subscribe(tp);
+ subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code(), "", 100L));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+ coordinator.refreshCommittedOffsetsIfNeeded();
+ assertFalse(subscriptions.refreshCommitsNeeded());
+ assertEquals(100L, (long) subscriptions.committed(tp));
+ }
- result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
- client.poll(0, time.milliseconds());
- assertTrue(result.isDone());
- assertTrue(result.failed());
- assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
-
- coordinator.discoverConsumerCoordinator();
- client.poll(0, time.milliseconds());
-
- result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
- client.poll(0, time.milliseconds());
- assertTrue(result.isDone());
- assertEquals(100L, (long) result.value().get(tp));
-
- // fetch with no fetchable offsets
- client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
- result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
- client.poll(0, time.milliseconds());
- assertTrue(result.isDone());
- assertTrue(result.value().isEmpty());
+ @Test
+ public void testRefreshOffsetWithNoFetchableOffsets() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
- // fetch with offset -1
+ subscriptions.subscribe(tp);
+ subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
- result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
- client.poll(0, time.milliseconds());
- assertTrue(result.isDone());
- assertTrue(result.value().isEmpty());
+ coordinator.refreshCommittedOffsetsIfNeeded();
+ assertFalse(subscriptions.refreshCommitsNeeded());
+ assertEquals(null, subscriptions.committed(tp));
}
private Struct consumerMetadataResponse(Node node, short error) {
@@ -366,4 +504,45 @@ public class CoordinatorTest {
OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data));
return response.toStruct();
}
+
+ private ConsumerCommitCallback callback(final AtomicBoolean success) {
+ return new ConsumerCommitCallback() {
+ @Override
+ public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
+ if (exception == null)
+ success.set(true);
+ }
+ };
+ }
+
+ private static class MockCommitCallback implements ConsumerCommitCallback {
+ public int invoked = 0;
+ public Exception exception = null;
+
+ @Override
+ public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
+ invoked++;
+ this.exception = exception;
+ }
+ }
+
+ private static class MockRebalanceCallback implements Coordinator.RebalanceCallback {
+ public Collection<TopicPartition> revoked;
+ public Collection<TopicPartition> assigned;
+ public int revokedCount = 0;
+ public int assignedCount = 0;
+
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ this.assigned = partitions;
+ assignedCount++;
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ this.revoked = partitions;
+ revokedCount++;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
new file mode 100644
index 0000000..db87b66
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class DelayedTaskQueueTest {
+ private DelayedTaskQueue scheduler = new DelayedTaskQueue();
+ private ArrayList<DelayedTask> executed = new ArrayList<DelayedTask>();
+
+ @Test
+ public void testScheduling() {
+ // Empty scheduler
+ assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(0));
+ scheduler.poll(0);
+ assertEquals(Collections.emptyList(), executed);
+
+ TestTask task1 = new TestTask();
+ TestTask task2 = new TestTask();
+ TestTask task3 = new TestTask();
+ scheduler.add(task1, 20);
+ assertEquals(20, scheduler.nextTimeout(0));
+ scheduler.add(task2, 10);
+ assertEquals(10, scheduler.nextTimeout(0));
+ scheduler.add(task3, 30);
+ assertEquals(10, scheduler.nextTimeout(0));
+
+ scheduler.poll(5);
+ assertEquals(Collections.emptyList(), executed);
+ assertEquals(5, scheduler.nextTimeout(5));
+
+ scheduler.poll(10);
+ assertEquals(Arrays.asList(task2), executed);
+ assertEquals(10, scheduler.nextTimeout(10));
+
+ scheduler.poll(20);
+ assertEquals(Arrays.asList(task2, task1), executed);
+ assertEquals(20, scheduler.nextTimeout(10));
+
+ scheduler.poll(30);
+ assertEquals(Arrays.asList(task2, task1, task3), executed);
+ assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(30));
+ }
+
+ @Test
+ public void testRemove() {
+ TestTask task1 = new TestTask();
+ TestTask task2 = new TestTask();
+ TestTask task3 = new TestTask();
+ scheduler.add(task1, 20);
+ scheduler.add(task2, 10);
+ scheduler.add(task3, 30);
+ scheduler.add(task1, 40);
+ assertEquals(10, scheduler.nextTimeout(0));
+
+ scheduler.remove(task2);
+ assertEquals(20, scheduler.nextTimeout(0));
+
+ scheduler.remove(task1);
+ assertEquals(30, scheduler.nextTimeout(0));
+
+ scheduler.remove(task3);
+ assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(0));
+ }
+
+ private class TestTask implements DelayedTask {
+ @Override
+ public void run(long now) {
+ executed.add(this);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 405efdc..7a4e586 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -52,6 +52,7 @@ public class FetcherTest {
private int minBytes = 1;
private int maxWaitMs = 0;
private int fetchSize = 1000;
+ private long retryBackoffMs = 100;
private MockTime time = new MockTime();
private MockClient client = new MockClient(time);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
@@ -60,10 +61,11 @@ public class FetcherTest {
private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
private Metrics metrics = new Metrics(time);
private Map<String, String> metricTags = new LinkedHashMap<String, String>();
+ private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
- private Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(client,
+ private Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(consumerClient,
minBytes,
maxWaitMs,
fetchSize,
@@ -75,7 +77,8 @@ public class FetcherTest {
metrics,
"consumer" + groupId,
metricTags,
- time);
+ time,
+ retryBackoffMs);
@Before
public void setup() throws Exception {
@@ -97,9 +100,9 @@ public class FetcherTest {
subscriptions.consumed(tp, 0);
// normal fetch
- fetcher.initFetches(cluster, time.milliseconds());
- client.respond(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
- client.poll(0, time.milliseconds());
+ fetcher.initFetches(cluster);
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
+ consumerClient.poll(0);
records = fetcher.fetchedRecords().get(tp);
assertEquals(3, records.size());
assertEquals(4L, (long) subscriptions.fetched(tp)); // this is the next fetching position
@@ -119,24 +122,24 @@ public class FetcherTest {
subscriptions.consumed(tp, 0);
// fetch with not leader
- fetcher.initFetches(cluster, time.milliseconds());
- client.respond(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
- client.poll(0, time.milliseconds());
+ fetcher.initFetches(cluster);
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
+ consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
// fetch with unknown topic partition
- fetcher.initFetches(cluster, time.milliseconds());
- client.respond(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
- client.poll(0, time.milliseconds());
+ fetcher.initFetches(cluster);
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
+ consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
// fetch with out of range
subscriptions.fetched(tp, 5);
- fetcher.initFetches(cluster, time.milliseconds());
- client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
- client.poll(0, time.milliseconds());
+ fetcher.initFetches(cluster);
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
+ consumerClient.poll(0);
assertTrue(subscriptions.isOffsetResetNeeded(tp));
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(null, subscriptions.fetched(tp));
@@ -151,9 +154,9 @@ public class FetcherTest {
subscriptions.consumed(tp, 5);
// fetch with out of range
- fetcher.initFetches(cluster, time.milliseconds());
- client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
- client.poll(0, time.milliseconds());
+ fetcher.initFetches(cluster);
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
+ consumerClient.poll(0);
assertTrue(subscriptions.isOffsetResetNeeded(tp));
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(null, subscriptions.fetched(tp));
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
index ee1ede0..b587e14 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -51,4 +51,19 @@ public class HeartbeatTest {
assertEquals(0, heartbeat.timeToNextHeartbeat(100));
assertEquals(0, heartbeat.timeToNextHeartbeat(200));
}
+
+ @Test
+ public void testSessionTimeoutExpired() {
+ heartbeat.sentHeartbeat(time.milliseconds());
+ time.sleep(305);
+ assertTrue(heartbeat.sessionTimeoutExpired(time.milliseconds()));
+ }
+
+ @Test
+ public void testResetSession() {
+ heartbeat.sentHeartbeat(time.milliseconds());
+ time.sleep(305);
+ heartbeat.resetSessionTimeout(time.milliseconds());
+ assertFalse(heartbeat.sessionTimeoutExpired(time.milliseconds()));
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
new file mode 100644
index 0000000..7372754
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RequestFutureTest {
+
+ @Test
+ public void testComposeSuccessCase() {
+ RequestFuture<String> future = new RequestFuture<String>();
+ RequestFuture<Integer> composed = future.compose(new RequestFutureAdapter<String, Integer>() {
+ @Override
+ public void onSuccess(String value, RequestFuture<Integer> future) {
+ future.complete(value.length());
+ }
+ });
+
+ future.complete("hello");
+
+ assertTrue(composed.isDone());
+ assertTrue(composed.succeeded());
+ assertEquals(5, (int) composed.value());
+ }
+
+ @Test
+ public void testComposeFailureCase() {
+ RequestFuture<String> future = new RequestFuture<String>();
+ RequestFuture<Integer> composed = future.compose(new RequestFutureAdapter<String, Integer>() {
+ @Override
+ public void onSuccess(String value, RequestFuture<Integer> future) {
+ future.complete(value.length());
+ }
+ });
+
+ RuntimeException e = new RuntimeException();
+ future.raise(e);
+
+ assertTrue(composed.isDone());
+ assertTrue(composed.failed());
+ assertEquals(e, composed.exception());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 92ffb91..3eb5f95 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -12,17 +12,13 @@
*/
package kafka.api
+import java.{lang, util}
+
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.clients.consumer.Consumer
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.consumer.CommitType
+import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
import kafka.utils.{TestUtils, Logging}
import kafka.server.KafkaConfig
@@ -46,6 +42,8 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
val topic = "topic"
val part = 0
val tp = new TopicPartition(topic, part)
+ val part2 = 1
+ val tp2 = new TopicPartition(topic, part2)
// configure the servers and clients
this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
@@ -56,12 +54,13 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-
+ this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+
override def setUp() {
super.setUp()
// create the test topic with all the brokers as replicas
- TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
+ TestUtils.createTopic(this.zkClient, topic, 2, serverCount, this.servers)
}
def testSimpleConsumption() {
@@ -74,6 +73,45 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
this.consumers(0).seek(tp, 0)
consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
+
+ // check async commit callbacks
+ val commitCallback = new CountConsumerCommitCallback()
+ this.consumers(0).commit(CommitType.ASYNC, commitCallback)
+
+ // shouldn't make progress until poll is invoked
+ Thread.sleep(10)
+ assertEquals(0, commitCallback.count)
+ awaitCommitCallback(this.consumers(0), commitCallback)
+ }
+
+ def testCommitSpecifiedOffsets() {
+ sendRecords(5, tp)
+ sendRecords(7, tp2)
+
+ this.consumers(0).subscribe(tp)
+ this.consumers(0).subscribe(tp2)
+
+ // Need to poll to join the group
+ this.consumers(0).poll(50)
+ val pos1 = this.consumers(0).position(tp)
+ val pos2 = this.consumers(0).position(tp2)
+ this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp, 3L)), CommitType.SYNC)
+ assertEquals(3, this.consumers(0).committed(tp))
+ intercept[NoOffsetForPartitionException] {
+ this.consumers(0).committed(tp2)
+ }
+ // positions should not change
+ assertEquals(pos1, this.consumers(0).position(tp))
+ assertEquals(pos2, this.consumers(0).position(tp2))
+ this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 5L)), CommitType.SYNC)
+ assertEquals(3, this.consumers(0).committed(tp))
+ assertEquals(5, this.consumers(0).committed(tp2))
+
+ // Using async should pick up the committed changes after commit completes
+ val commitCallback = new CountConsumerCommitCallback()
+ this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 7L)), CommitType.ASYNC, commitCallback)
+ awaitCommitCallback(this.consumers(0), commitCallback)
+ assertEquals(7, this.consumers(0).committed(tp2))
}
def testAutoOffsetReset() {
@@ -150,7 +188,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
def testPartitionReassignmentCallback() {
val callback = new TestConsumerReassignmentCallback()
- this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "200"); // timeout quickly to avoid slow test
+ this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer())
consumer0.subscribe(topic)
@@ -172,6 +210,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
// this should cause another callback execution
while(callback.callsToAssigned < 2)
consumer0.poll(50)
+
assertEquals(2, callback.callsToAssigned)
assertEquals(2, callback.callsToRevoked)
@@ -191,9 +230,13 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
}
}
- private def sendRecords(numRecords: Int) {
+ private def sendRecords(numRecords: Int): Unit = {
+ sendRecords(numRecords, tp)
+ }
+
+ private def sendRecords(numRecords: Int, tp: TopicPartition) {
val futures = (0 until numRecords).map { i =>
- this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
+ this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), i.toString.getBytes, i.toString.getBytes))
}
futures.map(_.get)
}
@@ -218,4 +261,18 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
}
}
+ private def awaitCommitCallback(consumer: Consumer[Array[Byte], Array[Byte]], commitCallback: CountConsumerCommitCallback): Unit = {
+ val startCount = commitCallback.count
+ val started = System.currentTimeMillis()
+ while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000)
+ this.consumers(0).poll(10000)
+ assertEquals(startCount + 1, commitCallback.count)
+ }
+
+ private class CountConsumerCommitCallback extends ConsumerCommitCallback {
+ var count = 0
+
+ override def onComplete(offsets: util.Map[TopicPartition, lang.Long], exception: Exception): Unit = count += 1
+ }
+
}
\ No newline at end of file
[3/3] kafka git commit: KAFKA-2123: add callback in commit api and
use a delayed queue for async requests;
reviewed by Ewen Cheslack-Postava and Guozhang Wang
Posted by gu...@apache.org.
KAFKA-2123: add callback in commit api and use a delayed queue for async requests; reviewed by Ewen Cheslack-Postava and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/99c0686b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/99c0686b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/99c0686b
Branch: refs/heads/trunk
Commit: 99c0686be2141a0fffe1c55e279370a87ef8c1ea
Parents: a7e0ac3
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Jul 15 12:38:45 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jul 15 12:38:45 2015 -0700
----------------------------------------------------------------------
.../apache/kafka/clients/consumer/Consumer.java | 10 +
.../consumer/ConsumerCommitCallback.java | 33 +
.../consumer/ConsumerRebalanceCallback.java | 6 +-
.../kafka/clients/consumer/ConsumerRecords.java | 4 +
.../kafka/clients/consumer/KafkaConsumer.java | 512 ++++----------
.../kafka/clients/consumer/MockConsumer.java | 19 +-
.../internals/ConsumerNetworkClient.java | 296 +++++++++
.../clients/consumer/internals/Coordinator.java | 660 ++++++++++++-------
.../clients/consumer/internals/DelayedTask.java | 24 +
.../consumer/internals/DelayedTaskQueue.java | 96 +++
.../clients/consumer/internals/Fetcher.java | 182 +++--
.../clients/consumer/internals/Heartbeat.java | 27 +-
.../internals/NoAvailableBrokersException.java | 23 +
.../consumer/internals/RequestFuture.java | 219 +++---
.../internals/RequestFutureAdapter.java | 28 +
.../internals/RequestFutureListener.java | 23 +
.../consumer/internals/SendFailedException.java | 27 +
.../internals/StaleMetadataException.java | 22 +
.../consumer/internals/SubscriptionState.java | 5 +-
...onsumerCoordinatorNotAvailableException.java | 40 ++
.../common/errors/DisconnectException.java | 39 ++
.../errors/IllegalGenerationException.java | 33 +
.../NotCoordinatorForConsumerException.java | 40 ++
.../errors/OffsetLoadInProgressException.java | 40 ++
.../errors/UnknownConsumerIdException.java | 33 +
.../apache/kafka/common/protocol/Errors.java | 10 +-
.../internals/ConsumerNetworkClientTest.java | 125 ++++
.../consumer/internals/CoordinatorTest.java | 479 +++++++++-----
.../internals/DelayedTaskQueueTest.java | 89 +++
.../clients/consumer/internals/FetcherTest.java | 37 +-
.../consumer/internals/HeartbeatTest.java | 15 +
.../consumer/internals/RequestFutureTest.java | 57 ++
.../integration/kafka/api/ConsumerTest.scala | 81 ++-
33 files changed, 2350 insertions(+), 984 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index fd98740..252b759 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -64,11 +64,21 @@ public interface Consumer<K, V> extends Closeable {
public void commit(CommitType commitType);
/**
+ * @see KafkaConsumer#commit(CommitType, ConsumerCommitCallback)
+ */
+ public void commit(CommitType commitType, ConsumerCommitCallback callback);
+
+ /**
* @see KafkaConsumer#commit(Map, CommitType)
*/
public void commit(Map<TopicPartition, Long> offsets, CommitType commitType);
/**
+ * @see KafkaConsumer#commit(Map, CommitType, ConsumerCommitCallback)
+ */
+ public void commit(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback);
+
+ /**
* @see KafkaConsumer#seek(TopicPartition, long)
*/
public void seek(TopicPartition partition, long offset);
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
new file mode 100644
index 0000000..f084385
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
+/**
+ * A callback interface that the user can implement to trigger custom actions when a commit request completes. The callback
+ * may be executed in any thread calling {@link Consumer#poll(long) poll()}.
+ */
+public interface ConsumerCommitCallback {
+
+ /**
+ * A callback method the user can implement to provide asynchronous handling of commit request completion.
+ * This method will be called when the commit request sent to the server has been acknowledged.
+ *
+ * @param offsets A map of the offsets that this callback applies to
+ * @param exception The exception thrown during processing of the request, or null if the commit completed successfully
+ */
+ void onComplete(Map<TopicPartition, Long> offsets, Exception exception);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
index 74dfdba..ff3f50f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
@@ -73,7 +73,8 @@ public interface ConsumerRebalanceCallback {
* It is guaranteed that all the processes in a consumer group will execute their
* {@link #onPartitionsRevoked(Consumer, Collection)} callback before any instance executes its
* {@link #onPartitionsAssigned(Consumer, Collection)} callback.
- *
+ *
+ * @param consumer Reference to the consumer for convenience
* @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
* assigned to the consumer)
*/
@@ -86,7 +87,8 @@ public interface ConsumerRebalanceCallback {
* custom offset store to prevent duplicate data
* <p>
* For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
- *
+ *
+ * @param consumer Reference to the consumer for convenience
* @param partitions The list of partitions that were assigned to the consumer on the last rebalance
*/
public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index eb75d2e..16a8357 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -105,6 +105,10 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
}
}
+ public boolean isEmpty() {
+ return records.isEmpty();
+ }
+
@SuppressWarnings("unchecked")
public static <K, V> ConsumerRecords<K, V> empty() {
return (ConsumerRecords<K, V>) EMPTY;
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b4e8f7f..9f64255 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -15,9 +15,10 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Coordinator;
+import org.apache.kafka.clients.consumer.internals.DelayedTask;
import org.apache.kafka.clients.consumer.internals.Fetcher;
-import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
@@ -49,7 +50,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -394,8 +394,6 @@ import static org.apache.kafka.common.utils.Utils.min;
public class KafkaConsumer<K, V> implements Consumer<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
- private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
- private static final long LATEST_OFFSET_TIMESTAMP = -1L;
private static final long NO_CURRENT_THREAD = -1L;
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
@@ -405,17 +403,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final Fetcher<K, V> fetcher;
private final Time time;
- private final NetworkClient client;
+ private final ConsumerNetworkClient client;
private final Metrics metrics;
private final SubscriptionState subscriptions;
private final Metadata metadata;
private final long retryBackoffMs;
private final boolean autoCommit;
private final long autoCommitIntervalMs;
- private final ConsumerRebalanceCallback rebalanceCallback;
- private long lastCommitAttemptMs;
private boolean closed = false;
- private final AtomicBoolean wakeup = new AtomicBoolean(false);
// currentThread holds the threadId of the current thread accessing KafkaConsumer
// and is used to prevent multi-threaded access
@@ -507,14 +502,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
try {
log.debug("Starting the Kafka consumer");
if (callback == null)
- this.rebalanceCallback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
+ callback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
ConsumerRebalanceCallback.class);
- else
- this.rebalanceCallback = callback;
this.time = new SystemTime();
this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
- this.lastCommitAttemptMs = time.milliseconds();
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
@@ -535,7 +527,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
String metricGrpPrefix = "consumer";
Map<String, String> metricsTags = new LinkedHashMap<String, String>();
metricsTags.put("client-id", clientId);
- this.client = new NetworkClient(
+ NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags),
this.metadata,
clientId,
@@ -543,6 +535,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
+ this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
this.subscriptions = new SubscriptionState(offsetResetStrategy);
this.coordinator = new Coordinator(this.client,
@@ -553,8 +546,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
metrics,
metricGrpPrefix,
metricsTags,
- this.time);
-
+ this.time,
+ requestTimeoutMs,
+ retryBackoffMs,
+ wrapRebalanceCallback(callback));
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
@@ -581,10 +576,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
metrics,
metricGrpPrefix,
metricsTags,
- this.time);
+ this.time,
+ this.retryBackoffMs);
config.logUnused();
+ if (autoCommit)
+ scheduleAutoCommitTask(autoCommitIntervalMs);
+
log.debug("Kafka consumer created");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
@@ -719,27 +718,25 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
- // Poll for new data until the timeout expires
+ // poll for new data until the timeout expires
long remaining = timeout;
while (remaining >= 0) {
long start = time.milliseconds();
- long pollTimeout = min(remaining, timeToNextCommit(start), coordinator.timeToNextHeartbeat(start));
-
- Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(pollTimeout, start);
+ Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
long end = time.milliseconds();
if (!records.isEmpty()) {
- // If data is available, then return it, but first send off the
+ // if data is available, then return it, but first send off the
// next round of fetches to enable pipelining while the user is
// handling the fetched records.
- fetcher.initFetches(metadata.fetch(), end);
- pollClient(0, end);
+ fetcher.initFetches(metadata.fetch());
+ client.poll(0);
return new ConsumerRecords<K, V>(records);
}
remaining -= end - start;
- // Nothing was available, so we should backoff before retrying
+ // nothing was available, so we should backoff before retrying
if (remaining > 0) {
Utils.sleep(min(remaining, retryBackoffMs));
remaining -= time.milliseconds() - end;
@@ -752,46 +749,42 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
}
-
/**
* Do one round of polling. In addition to checking for new data, this does any needed
* heart-beating, auto-commits, and offset updates.
* @param timeout The maximum time to block in the underlying poll
- * @param now Current time in millis
* @return The fetched records (may be empty)
*/
- private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout, long now) {
- Cluster cluster = this.metadata.fetch();
-
+ private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
// TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
+ coordinator.ensureCoordinatorKnown();
- if (subscriptions.partitionsAutoAssigned()) {
- if (subscriptions.partitionAssignmentNeeded()) {
- // rebalance to get partition assignment
- reassignPartitions(now);
- } else {
- // try to heartbeat with the coordinator if needed
- coordinator.maybeHeartbeat(now);
- }
- }
+ // ensure we have partitions assigned if we expect to
+ if (subscriptions.partitionsAutoAssigned())
+ coordinator.ensurePartitionAssignment();
// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
- // maybe autocommit position
- if (shouldAutoCommit(now))
- commit(CommitType.ASYNC);
-
- // Init any new fetches (won't resend pending fetches)
- fetcher.initFetches(cluster, now);
-
- pollClient(timeout, now);
-
+ // init any new fetches (won't resend pending fetches)
+ Cluster cluster = this.metadata.fetch();
+ fetcher.initFetches(cluster);
+ client.poll(timeout);
return fetcher.fetchedRecords();
}
+ private void scheduleAutoCommitTask(final long interval) {
+ DelayedTask task = new DelayedTask() {
+ public void run(long now) {
+ commit(CommitType.ASYNC);
+ client.schedule(this, now + interval);
+ }
+ };
+ client.schedule(task, time.milliseconds() + interval);
+ }
+
/**
* Commits the specified offsets for the specified list of topics and partitions to Kafka.
* <p>
@@ -799,25 +792,42 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used.
* <p>
- * A non-blocking commit will attempt to commit offsets asynchronously. No error will be thrown if the commit fails.
- * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until
- * the commit succeeds.
- *
+ * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+ * commit are silently discarded. If you need to determine the result of an asynchronous commit, you should use
+ * {@link #commit(Map, CommitType, ConsumerCommitCallback)}. Synchronous commits (i.e. {@link CommitType#SYNC})
+ * block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown
+ * to the caller).
+ *
* @param offsets The list of offsets per partition that should be committed to Kafka.
* @param commitType Control whether the commit is blocking
*/
@Override
public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) {
+ commit(offsets, commitType, null);
+ }
+
+ /**
+ * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+ * <p>
+ * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
+ * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+ * should not be used.
+ * <p>
+ * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+ * commit are either passed to the callback (if provided) or silently discarded. Synchronous commits (i.e.
+ * {@link CommitType#SYNC}) block until either the commit succeeds or an unrecoverable error is encountered. In
+ * this case, the error is either passed to the callback (if provided) or thrown to the caller.
+ *
+ * @param offsets The list of offsets per partition that should be committed to Kafka.
+ * @param commitType Control whether the commit is blocking
+ * @param callback Callback to invoke when the commit completes
+ */
+ @Override
+ public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
acquire();
try {
log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets);
-
- this.lastCommitAttemptMs = time.milliseconds();
-
- // commit the offsets with the coordinator
- if (commitType == CommitType.ASYNC)
- this.subscriptions.needRefreshCommits();
- commitOffsets(offsets, commitType);
+ coordinator.commitOffsets(offsets, commitType, callback);
} finally {
release();
}
@@ -829,22 +839,48 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
* every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used.
- *
+ * <p>
+ * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+ * commit are either passed to the callback (if provided) or silently discarded. Synchronous commits (i.e.
+ * {@link CommitType#SYNC}) block until either the commit succeeds or an unrecoverable error is encountered. In
+ * this case, the error is either passed to the callback (if provided) or thrown to the caller.
+ *
* @param commitType Whether or not the commit should block until it is acknowledged.
+ * @param callback Callback to invoke when the commit completes
*/
@Override
- public void commit(CommitType commitType) {
+ public void commit(CommitType commitType, ConsumerCommitCallback callback) {
acquire();
try {
- // Need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance)
+ // need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance)
Map<TopicPartition, Long> allConsumed = new HashMap<TopicPartition, Long>(this.subscriptions.allConsumed());
- commit(allConsumed, commitType);
+ commit(allConsumed, commitType, callback);
} finally {
release();
}
}
/**
+ * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+ * <p>
+ * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
+ * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+ * should not be used.
+ * <p>
+ * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+ * commit are silently discarded. If you need to determine the result of an asynchronous commit, you should use
+ * {@link #commit(CommitType, ConsumerCommitCallback)}. Synchronous commits (i.e. {@link CommitType#SYNC})
+ * block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown
+ * to the caller).
+ *
+ * @param commitType Whether or not the commit should block until it is acknowledged.
+ */
+ @Override
+ public void commit(CommitType commitType) {
+ commit(commitType, null);
+ }
+
+ /**
* Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
* is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
* you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
@@ -868,8 +904,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
try {
Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
: Arrays.asList(partitions);
- for (TopicPartition tp : parts)
+ for (TopicPartition tp : parts) {
+ log.debug("Seeking to beginning of partition {}", tp);
subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
+ }
} finally {
release();
}
@@ -883,8 +921,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
try {
Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
: Arrays.asList(partitions);
- for (TopicPartition tp : parts)
+ for (TopicPartition tp : parts) {
+ log.debug("Seeking to end of partition {}", tp);
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+ }
} finally {
release();
}
@@ -931,19 +971,21 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public long committed(TopicPartition partition) {
acquire();
try {
- Set<TopicPartition> partitionsToFetch;
+ Long committed;
if (subscriptions.assignedPartitions().contains(partition)) {
- Long committed = this.subscriptions.committed(partition);
- if (committed != null)
- return committed;
- partitionsToFetch = subscriptions.assignedPartitions();
+ committed = this.subscriptions.committed(partition);
+ if (committed == null) {
+ coordinator.refreshCommittedOffsetsIfNeeded();
+ committed = this.subscriptions.committed(partition);
+ }
} else {
- partitionsToFetch = Collections.singleton(partition);
+ Map<TopicPartition, Long> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition));
+ committed = offsets.get(partition);
}
- refreshCommittedOffsets(partitionsToFetch);
- Long committed = this.subscriptions.committed(partition);
+
if (committed == null)
throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition);
+
return committed;
} finally {
release();
@@ -973,7 +1015,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
if (parts == null) {
metadata.add(topic);
- awaitMetadataUpdate();
+ client.awaitMetadataUpdate();
parts = metadata.fetch().partitionsForTopic(topic);
}
return parts;
@@ -999,7 +1041,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void wakeup() {
- this.wakeup.set(true);
this.client.wakeup();
}
@@ -1017,55 +1058,18 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
}
+ private Coordinator.RebalanceCallback wrapRebalanceCallback(final ConsumerRebalanceCallback callback) {
+ return new Coordinator.RebalanceCallback() {
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ callback.onPartitionsAssigned(KafkaConsumer.this, partitions);
+ }
- private boolean shouldAutoCommit(long now) {
- return this.autoCommit && this.lastCommitAttemptMs <= now - this.autoCommitIntervalMs;
- }
-
- private long timeToNextCommit(long now) {
- if (!this.autoCommit)
- return Long.MAX_VALUE;
- long timeSinceLastCommit = now - this.lastCommitAttemptMs;
- if (timeSinceLastCommit > this.autoCommitIntervalMs)
- return 0;
- return this.autoCommitIntervalMs - timeSinceLastCommit;
- }
-
- /**
- * Request a metadata update and wait until it has occurred
- */
- private void awaitMetadataUpdate() {
- int version = this.metadata.requestUpdate();
- do {
- long now = time.milliseconds();
- this.pollClient(this.retryBackoffMs, now);
- } while (this.metadata.version() == version);
- }
-
- /**
- * Get partition assignment
- */
- private void reassignPartitions(long now) {
- // execute the user's callback before rebalance
- log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
- try {
- this.rebalanceCallback.onPartitionsRevoked(this, this.subscriptions.assignedPartitions());
- } catch (Exception e) {
- log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
- + " failed on partition revocation: ", e);
- }
-
- // get new assigned partitions from the coordinator
- assignPartitions();
-
- // execute the user's callback after rebalance
- log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
- try {
- this.rebalanceCallback.onPartitionsAssigned(this, this.subscriptions.assignedPartitions());
- } catch (Exception e) {
- log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
- + " failed on partition assignment: ", e);
- }
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ callback.onPartitionsRevoked(KafkaConsumer.this, partitions);
+ }
+ };
}
/**
@@ -1077,267 +1081,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* defined
*/
private void updateFetchPositions(Set<TopicPartition> partitions) {
- // first refresh the committed positions in case they are not up-to-date
- refreshCommittedOffsets(partitions);
-
- // reset the fetch position to the committed position
- for (TopicPartition tp : partitions) {
- // Skip if we already have a fetch position
- if (subscriptions.fetched(tp) != null)
- continue;
-
- // TODO: If there are several offsets to reset, we could submit offset requests in parallel
- if (subscriptions.isOffsetResetNeeded(tp)) {
- resetOffset(tp);
- } else if (subscriptions.committed(tp) == null) {
- // There's no committed position, so we need to reset with the default strategy
- subscriptions.needOffsetReset(tp);
- resetOffset(tp);
- } else {
- log.debug("Resetting offset for partition {} to the committed offset {}",
- tp, subscriptions.committed(tp));
- subscriptions.seek(tp, subscriptions.committed(tp));
- }
- }
- }
+ // refresh commits for all assigned partitions
+ coordinator.refreshCommittedOffsetsIfNeeded();
- /**
- * Reset offsets for the given partition using the offset reset strategy.
- *
- * @param partition The given partition that needs reset offset
- * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
- */
- private void resetOffset(TopicPartition partition) {
- OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
- final long timestamp;
- if (strategy == OffsetResetStrategy.EARLIEST)
- timestamp = EARLIEST_OFFSET_TIMESTAMP;
- else if (strategy == OffsetResetStrategy.LATEST)
- timestamp = LATEST_OFFSET_TIMESTAMP;
- else
- throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
-
- log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
- long offset = listOffset(partition, timestamp);
- this.subscriptions.seek(partition, offset);
- }
-
- /**
- * Fetch a single offset before the given timestamp for the partition.
- *
- * @param partition The partition that needs fetching offset.
- * @param timestamp The timestamp for fetching offset.
- * @return The offset of the message that is published before the given timestamp
- */
- private long listOffset(TopicPartition partition, long timestamp) {
- while (true) {
- RequestFuture<Long> future = fetcher.listOffset(partition, timestamp);
-
- if (!future.isDone())
- pollFuture(future, requestTimeoutMs);
-
- if (future.isDone()) {
- if (future.succeeded())
- return future.value();
- handleRequestFailure(future);
- }
- }
- }
-
- /**
- * Refresh the committed offsets for given set of partitions and update the cache
- */
- private void refreshCommittedOffsets(Set<TopicPartition> partitions) {
- // we only need to fetch latest committed offset from coordinator if there
- // is some commit process in progress, otherwise our current
- // committed cache is up-to-date
- if (subscriptions.refreshCommitsNeeded()) {
- // contact coordinator to fetch committed offsets
- Map<TopicPartition, Long> offsets = fetchCommittedOffsets(partitions);
-
- // update the position with the offsets
- for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
- TopicPartition tp = entry.getKey();
- this.subscriptions.committed(tp, entry.getValue());
- }
- }
- }
-
- /**
- * Block until we have received a partition assignment from the coordinator.
- */
- private void assignPartitions() {
- // Ensure that there are no pending requests to the coordinator. This is important
- // in particular to avoid resending a pending JoinGroup request.
- awaitCoordinatorInFlightRequests();
-
- while (subscriptions.partitionAssignmentNeeded()) {
- RequestFuture<Void> future = coordinator.assignPartitions(time.milliseconds());
-
- // Block indefinitely for the join group request (which can take as long as a session timeout)
- if (!future.isDone())
- pollFuture(future);
-
- if (future.failed())
- handleRequestFailure(future);
- }
- }
-
- /**
- * Block until the coordinator for this group is known.
- */
- private void ensureCoordinatorKnown() {
- while (coordinator.coordinatorUnknown()) {
- RequestFuture<Void> future = coordinator.discoverConsumerCoordinator();
-
- if (!future.isDone())
- pollFuture(future, requestTimeoutMs);
-
- if (future.failed())
- handleRequestFailure(future);
- }
- }
-
- /**
- * Block until any pending requests to the coordinator have been handled.
- */
- public void awaitCoordinatorInFlightRequests() {
- while (coordinator.hasInFlightRequests()) {
- long now = time.milliseconds();
- pollClient(-1, now);
- }
- }
-
- /**
- * Lookup the committed offsets for a set of partitions. This will block until the coordinator has
- * responded to the offset fetch request.
- * @param partitions List of partitions to get offsets for
- * @return Map from partition to its respective offset
- */
- private Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions) {
- while (true) {
- long now = time.milliseconds();
- RequestFuture<Map<TopicPartition, Long>> future = coordinator.fetchOffsets(partitions, now);
-
- if (!future.isDone())
- pollFuture(future, requestTimeoutMs);
-
- if (future.isDone()) {
- if (future.succeeded())
- return future.value();
- handleRequestFailure(future);
- }
- }
- }
-
- /**
- * Commit offsets. This call blocks (regardless of commitType) until the coordinator
- * can receive the commit request. Once the request has been made, however, only the
- * synchronous commits will wait for a successful response from the coordinator.
- * @param offsets Offsets to commit.
- * @param commitType Commit policy
- */
- private void commitOffsets(Map<TopicPartition, Long> offsets, CommitType commitType) {
- if (commitType == CommitType.ASYNC) {
- commitOffsetsAsync(offsets);
- } else {
- commitOffsetsSync(offsets);
- }
- }
-
- private void commitOffsetsAsync(Map<TopicPartition, Long> offsets) {
- while (true) {
- long now = time.milliseconds();
- RequestFuture<Void> future = coordinator.commitOffsets(offsets, now);
-
- if (!future.isDone() || future.succeeded())
- return;
-
- handleRequestFailure(future);
- }
- }
-
- private void commitOffsetsSync(Map<TopicPartition, Long> offsets) {
- while (true) {
- long now = time.milliseconds();
- RequestFuture<Void> future = coordinator.commitOffsets(offsets, now);
-
- if (!future.isDone())
- pollFuture(future, requestTimeoutMs);
-
- if (future.isDone()) {
- if (future.succeeded())
- return;
- else
- handleRequestFailure(future);
- }
- }
- }
-
- private void handleRequestFailure(RequestFuture<?> future) {
- if (future.hasException())
- throw future.exception();
-
- switch (future.retryAction()) {
- case BACKOFF:
- Utils.sleep(retryBackoffMs);
- break;
- case POLL:
- pollClient(retryBackoffMs, time.milliseconds());
- break;
- case FIND_COORDINATOR:
- ensureCoordinatorKnown();
- break;
- case REFRESH_METADATA:
- awaitMetadataUpdate();
- break;
- case NOOP:
- // Do nothing (retry now)
- }
- }
-
- /**
- * Poll until a result is ready or timeout expires
- * @param future The future to poll for
- * @param timeout The time in milliseconds to wait for the result
- */
- private void pollFuture(RequestFuture<?> future, long timeout) {
- // TODO: Update this code for KAFKA-2120, which adds request timeout to NetworkClient
- // In particular, we must ensure that "timed out" requests will not have their callbacks
- // invoked at a later time.
- long remaining = timeout;
- while (!future.isDone() && remaining >= 0) {
- long start = time.milliseconds();
- pollClient(remaining, start);
- if (future.isDone()) return;
- remaining -= time.milliseconds() - start;
- }
- }
-
- /**
- * Poll indefinitely until the result is ready.
- * @param future The future to poll for.
- */
- private void pollFuture(RequestFuture<?> future) {
- while (!future.isDone()) {
- long now = time.milliseconds();
- pollClient(-1, now);
- }
- }
-
- /**
- * Poll for IO.
- * @param timeout The maximum time to wait for IO to become available
- * @param now The current time in milliseconds
- * @throws ConsumerWakeupException if {@link #wakeup()} is invoked while the poll is active
- */
- private void pollClient(long timeout, long now) {
- this.client.poll(timeout, now);
-
- if (wakeup.get()) {
- wakeup.set(false);
- throw new ConsumerWakeupException();
- }
+ // then do any offset lookups in case some positions are not known
+ fetcher.updateFetchPositions(partitions);
}
/*
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 46e26a6..c14eed1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -106,16 +106,29 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
@Override
- public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
+ public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
ensureNotClosed();
for (Entry<TopicPartition, Long> entry : offsets.entrySet())
subscriptions.committed(entry.getKey(), entry.getValue());
+ if (callback != null) {
+ callback.onComplete(offsets, null);
+ }
}
@Override
- public synchronized void commit(CommitType commitType) {
+ public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
+ commit(offsets, commitType, null);
+ }
+
+ @Override
+ public synchronized void commit(CommitType commitType, ConsumerCommitCallback callback) {
ensureNotClosed();
- commit(this.subscriptions.allConsumed(), commitType);
+ commit(this.subscriptions.allConsumed(), commitType, callback);
+ }
+
+ @Override
+ public synchronized void commit(CommitType commitType) {
+ commit(commitType, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
new file mode 100644
index 0000000..9517d9d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.utils.Time;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Higher level consumer access to the network layer with basic support for futures and
+ * task scheduling. NOT thread-safe!
+ *
+ * TODO: The current implementation is simplistic in that it provides a facility for queueing requests
+ * prior to delivery, but it makes no effort to retry requests which cannot be sent at the time
+ * {@link #poll(long)} is called. This makes the behavior of the queue predictable and easy to
+ * understand, but there are opportunities to provide timeout or retry capabilities in the future.
+ * How we do this may depend on KAFKA-2120, so for now, we retain the simplistic behavior.
+ */
+public class ConsumerNetworkClient implements Closeable {
+ private final KafkaClient client;
+ private final AtomicBoolean wakeup = new AtomicBoolean(false);
+ private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
+ private final Map<Node, List<ClientRequest>> unsent = new HashMap<Node, List<ClientRequest>>();
+ private final Metadata metadata;
+ private final Time time;
+ private final long retryBackoffMs;
+
+ public ConsumerNetworkClient(KafkaClient client,
+ Metadata metadata,
+ Time time,
+ long retryBackoffMs) {
+ this.client = client;
+ this.metadata = metadata;
+ this.time = time;
+ this.retryBackoffMs = retryBackoffMs;
+ }
+
+ /**
+ * Schedule a new task to be executed at the given time. This is "best-effort" scheduling and
+ * should only be used for coarse synchronization.
+ * @param task The task to be scheduled
+ * @param at The time it should run
+ */
+ public void schedule(DelayedTask task, long at) {
+ delayedTasks.add(task, at);
+ }
+
+ /**
+ * Unschedule a task. This will remove all instances of the task from the task queue.
+ * This is a no-op if the task is not scheduled.
+ * @param task The task to be unscheduled.
+ */
+ public void unschedule(DelayedTask task) {
+ delayedTasks.remove(task);
+ }
+
+ /**
+ * Send a new request. Note that the request is not actually transmitted on the
+ * network until one of the {@link #poll(long)} variants is invoked. At this
+ * point the request will either be transmitted successfully or will fail.
+ * Use the returned future to obtain the result of the send.
+ * @param node The destination of the request
+ * @param api The Kafka API call
+ * @param request The request payload
+ * @return A future which indicates the result of the send.
+ */
+ public RequestFuture<ClientResponse> send(Node node,
+ ApiKeys api,
+ AbstractRequest request) {
+ long now = time.milliseconds();
+ RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
+ RequestHeader header = client.nextRequestHeader(api);
+ RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
+ put(node, new ClientRequest(now, true, send, future));
+ return future;
+ }
+
+ private void put(Node node, ClientRequest request) {
+ List<ClientRequest> nodeUnsent = unsent.get(node);
+ if (nodeUnsent == null) {
+ nodeUnsent = new ArrayList<ClientRequest>();
+ unsent.put(node, nodeUnsent);
+ }
+ nodeUnsent.add(request);
+ }
+
+ public Node leastLoadedNode() {
+ return client.leastLoadedNode(time.milliseconds());
+ }
+
+ /**
+ * Block until the metadata has been refreshed.
+ */
+ public void awaitMetadataUpdate() {
+ int version = this.metadata.requestUpdate();
+ do {
+ poll(Long.MAX_VALUE);
+ } while (this.metadata.version() == version);
+ }
+
+ /**
+ * Wakeup an active poll. This will cause the polling thread to throw an exception either
+ * on the current poll if one is active, or the next poll.
+ */
+ public void wakeup() {
+ this.wakeup.set(true);
+ this.client.wakeup();
+ }
+
+ /**
+ * Block indefinitely until the given request future has finished.
+ * @param future The request future to await.
+ * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+ */
+ public void poll(RequestFuture<?> future) {
+ while (!future.isDone())
+ poll(Long.MAX_VALUE);
+ }
+
+ /**
+ * Block until the provided request future has finished or the timeout has expired.
+ * @param future The request future to wait for
+ * @param timeout The maximum duration (in ms) to wait for the request
+ * @return true if the future is done, false otherwise
+ * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+ */
+ public boolean poll(RequestFuture<?> future, long timeout) {
+ long begin = time.milliseconds();
+ long now = begin;
+ // track the remaining time instead of an absolute deadline: "now + timeout" overflows
+ // to a negative value for very large timeouts (e.g. Long.MAX_VALUE), which would make
+ // this method return immediately without polling at all
+ long remaining = timeout;
+ while (!future.isDone() && remaining > 0) {
+ poll(remaining, now);
+ now = time.milliseconds();
+ // clamp the elapsed time so that a clock moving backwards cannot overflow the subtraction
+ remaining = timeout - Math.max(0L, now - begin);
+ }
+ return future.isDone();
+ }
+
+ /**
+ * Poll for any network IO. All send requests will either be transmitted on the network
+ * or failed when this call completes.
+ * @param timeout The maximum time to wait for an IO event.
+ * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+ */
+ public void poll(long timeout) {
+ poll(timeout, time.milliseconds());
+ }
+
+ private void poll(long timeout, long now) {
+ // send all the requests we can send now
+ pollUnsentRequests(now);
+
+ // ensure we don't poll any longer than the deadline for
+ // the next scheduled task
+ timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
+ clientPoll(timeout, now);
+
+ // execute scheduled tasks
+ now = time.milliseconds();
+ delayedTasks.poll(now);
+
+ // try again to send requests since buffer space may have been
+ // cleared or a connect finished in the poll
+ pollUnsentRequests(now);
+
+ // fail all requests that couldn't be sent
+ clearUnsentRequests(now);
+
+ }
+
+ /**
+ * Block until all pending requests from the given node have finished.
+ * @param node The node to await requests from
+ */
+ public void awaitPendingRequests(Node node) {
+ while (pendingRequestCount(node) > 0)
+ poll(retryBackoffMs);
+ }
+
+ /**
+ * Get the count of pending requests to the given node. This includes both requests that
+ * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
+ * @param node The node in question
+ * @return The number of pending requests
+ */
+ public int pendingRequestCount(Node node) {
+ List<ClientRequest> pending = unsent.get(node);
+ int unsentCount = pending == null ? 0 : pending.size();
+ return unsentCount + client.inFlightRequestCount(node.idString());
+ }
+
+ /**
+ * Get the total count of pending requests from all nodes. This includes both requests that
+ * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
+ * @return The total count of pending requests
+ */
+ public int pendingRequestCount() {
+ int total = 0;
+ for (List<ClientRequest> requests: unsent.values())
+ total += requests.size();
+ return total + client.inFlightRequestCount();
+ }
+
+ private void pollUnsentRequests(long now) {
+ while (trySend(now))
+ clientPoll(0, now);
+ }
+
+ private void clearUnsentRequests(long now) {
+ // clear all unsent requests and fail their corresponding futures
+ for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
+ Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
+ while (iterator.hasNext()) {
+ ClientRequest request = iterator.next();
+ RequestFutureCompletionHandler handler =
+ (RequestFutureCompletionHandler) request.callback();
+ handler.raise(SendFailedException.INSTANCE);
+ iterator.remove();
+ }
+ }
+ unsent.clear();
+ }
+
+ private boolean trySend(long now) {
+ // send any requests that can be sent now
+ boolean requestsSent = false;
+ for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
+ Node node = requestEntry.getKey();
+ Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
+ while (iterator.hasNext()) {
+ ClientRequest request = iterator.next();
+ if (client.ready(node, now)) {
+ client.send(request);
+ iterator.remove();
+ requestsSent = true;
+ } else if (client.connectionFailed(node)) {
+ RequestFutureCompletionHandler handler =
+ (RequestFutureCompletionHandler) request.callback();
+ handler.onComplete(new ClientResponse(request, now, true, null));
+ iterator.remove();
+ }
+ }
+ }
+ return requestsSent;
+ }
+
+ private void clientPoll(long timeout, long now) {
+ client.poll(timeout, now);
+ if (wakeup.get()) {
+ clearUnsentRequests(now);
+ wakeup.set(false);
+ throw new ConsumerWakeupException();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ client.close();
+ }
+
+ public static class RequestFutureCompletionHandler
+ extends RequestFuture<ClientResponse>
+ implements RequestCompletionHandler {
+
+ @Override
+ public void onComplete(ClientResponse response) {
+ complete(response);
+ }
+ }
+}
[2/3] kafka git commit: KAFKA-2123: add callback in commit api and
use a delayed queue for async requests;
reviewed by Ewen Cheslack-Postava and Guozhang Wang
Posted by gu...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index c1c8172..8e3cd09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -12,14 +12,14 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
-import org.apache.kafka.clients.KafkaClient;
-import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.CommitType;
+import org.apache.kafka.clients.consumer.ConsumerCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -30,7 +30,6 @@ import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.ConsumerMetadataRequest;
import org.apache.kafka.common.requests.ConsumerMetadataResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
@@ -41,15 +40,15 @@ import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
-import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -62,23 +61,27 @@ public final class Coordinator {
private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
- private final KafkaClient client;
-
+ private final ConsumerNetworkClient client;
private final Time time;
private final String groupId;
private final Heartbeat heartbeat;
+ private final HeartbeatTask heartbeatTask;
private final int sessionTimeoutMs;
private final String assignmentStrategy;
private final SubscriptionState subscriptions;
private final CoordinatorMetrics sensors;
+ private final long requestTimeoutMs;
+ private final long retryBackoffMs;
+ private final RebalanceCallback rebalanceCallback;
private Node consumerCoordinator;
private String consumerId;
private int generation;
+
/**
* Initialize the coordination manager.
*/
- public Coordinator(KafkaClient client,
+ public Coordinator(ConsumerNetworkClient client,
String groupId,
int sessionTimeoutMs,
String assignmentStrategy,
@@ -86,10 +89,13 @@ public final class Coordinator {
Metrics metrics,
String metricGrpPrefix,
Map<String, String> metricTags,
- Time time) {
+ Time time,
+ long requestTimeoutMs,
+ long retryBackoffMs,
+ RebalanceCallback rebalanceCallback) {
- this.time = time;
this.client = client;
+ this.time = time;
this.generation = -1;
this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
this.groupId = groupId;
@@ -98,19 +104,190 @@ public final class Coordinator {
this.sessionTimeoutMs = sessionTimeoutMs;
this.assignmentStrategy = assignmentStrategy;
this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
+ this.heartbeatTask = new HeartbeatTask();
this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+ this.requestTimeoutMs = requestTimeoutMs;
+ this.retryBackoffMs = retryBackoffMs;
+ this.rebalanceCallback = rebalanceCallback;
+ }
+
+ /**
+ * Refresh the committed offsets for all assigned partitions, if a refresh is needed.
+ */
+ public void refreshCommittedOffsetsIfNeeded() {
+ if (subscriptions.refreshCommitsNeeded()) {
+ Map<TopicPartition, Long> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
+ for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
+ TopicPartition tp = entry.getKey();
+ this.subscriptions.committed(tp, entry.getValue());
+ }
+ this.subscriptions.commitsRefreshed();
+ }
+ }
+
+ /**
+ * Fetch the current committed offsets from the coordinator for a set of partitions.
+ * @param partitions The partitions to fetch offsets for
+ * @return A map from partition to the committed offset
+ */
+ public Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions) {
+ while (true) {
+ ensureCoordinatorKnown();
+ ensurePartitionAssignment();
+
+ // contact coordinator to fetch committed offsets
+ RequestFuture<Map<TopicPartition, Long>> future = sendOffsetFetchRequest(partitions);
+ client.poll(future);
+
+ if (future.succeeded())
+ return future.value();
+
+ if (!future.isRetriable())
+ throw future.exception();
+
+ Utils.sleep(retryBackoffMs);
+ }
+ }
+
+ /**
+ * Ensure that we have a valid partition assignment from the coordinator.
+ */
+ public void ensurePartitionAssignment() {
+ if (!subscriptions.partitionAssignmentNeeded())
+ return;
+
+ // execute the user's callback before rebalance
+ log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
+ try {
+ Set<TopicPartition> revoked = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
+ rebalanceCallback.onPartitionsRevoked(revoked);
+ } catch (Exception e) {
+ log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
+ + " failed on partition revocation: ", e);
+ }
+
+ reassignPartitions();
+
+ // execute the user's callback after rebalance
+ log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
+ try {
+ Set<TopicPartition> assigned = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
+ rebalanceCallback.onPartitionsAssigned(assigned);
+ } catch (Exception e) {
+ log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
+ + " failed on partition assignment: ", e);
+ }
+ }
+
+ private void reassignPartitions() {
+ while (subscriptions.partitionAssignmentNeeded()) {
+ ensureCoordinatorKnown();
+
+ // ensure that there are no pending requests to the coordinator. This is important
+ // in particular to avoid resending a pending JoinGroup request.
+ if (client.pendingRequestCount(this.consumerCoordinator) > 0) {
+ client.awaitPendingRequests(this.consumerCoordinator);
+ continue;
+ }
+
+ RequestFuture<Void> future = sendJoinGroupRequest();
+ client.poll(future);
+
+ if (future.failed()) {
+ if (!future.isRetriable())
+ throw future.exception();
+ Utils.sleep(retryBackoffMs);
+ }
+ }
+ }
+
+ /**
+ * Block until the coordinator for this group is known.
+ */
+ public void ensureCoordinatorKnown() {
+ while (coordinatorUnknown()) {
+ RequestFuture<Void> future = sendConsumerMetadataRequest();
+ client.poll(future, requestTimeoutMs);
+
+ if (future.failed())
+ client.awaitMetadataUpdate();
+ }
+ }
+
+ /**
+ * Commit offsets. This call blocks (regardless of commitType) until the coordinator
+ * can receive the commit request. Once the request has been made, however, only the
+ * synchronous commits will wait for a successful response from the coordinator.
+ * @param offsets Offsets to commit.
+ * @param commitType Commit policy
+ * @param callback Callback to be executed when the commit request finishes
+ */
+ public void commitOffsets(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
+ if (commitType == CommitType.ASYNC)
+ commitOffsetsAsync(offsets, callback);
+ else
+ commitOffsetsSync(offsets, callback);
+ }
+
+ /**
+ * Delayed task which sends heartbeats to the coordinator and reschedules itself on the
+ * network client's task queue.
+ */
+ private class HeartbeatTask implements DelayedTask {
+
+ /**
+ * Start or restart the heartbeat task so that it is executed at the next chance.
+ */
+ public void reset() {
+ long now = time.milliseconds();
+ heartbeat.resetSessionTimeout(now);
+ // drop any previously scheduled instance before scheduling the task to run now
+ client.unschedule(this);
+ client.schedule(this, now);
+ }
+
+ /**
+ * Send a heartbeat if one is due, otherwise reschedule this task for when it is.
+ * @param now The current time in milliseconds
+ */
+ @Override
+ public void run(final long now) {
+ if (!subscriptions.partitionsAutoAssigned() ||
+ subscriptions.partitionAssignmentNeeded() ||
+ coordinatorUnknown())
+ // no need to send if we're not using auto-assignment or if we are
+ // awaiting a rebalance
+ return;
+
+ if (heartbeat.sessionTimeoutExpired(now)) {
+ // we haven't received a successful heartbeat in one session interval
+ // so mark the coordinator dead
+ coordinatorDead();
+ return;
+ }
+
+ if (!heartbeat.shouldHeartbeat(now)) {
+ // we don't need to heartbeat now, so reschedule for when we do
+ client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
+ } else {
+ heartbeat.sentHeartbeat(now);
+ RequestFuture<Void> future = sendHeartbeatRequest();
+ future.addListener(new RequestFutureListener<Void>() {
+ @Override
+ public void onSuccess(Void value) {
+ long now = time.milliseconds();
+ heartbeat.receiveHeartbeat(now);
+ long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
+ client.schedule(HeartbeatTask.this, nextHeartbeatTime);
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ // schedule() takes the absolute time at which the task should run, so the
+ // backoff has to be added to the current time; passing the bare backoff
+ // would schedule the retry in the past and skip the backoff entirely
+ client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
+ }
+ });
+ }
+ }
+ }
/**
* Send a request to get a new partition assignment. This is a non-blocking call which sends
* a JoinGroup request to the coordinator (if it is available). The returned future must
* be polled to see if the request completed successfully.
- * @param now The current time in milliseconds
* @return A request future whose completion indicates the result of the JoinGroup request.
*/
- public RequestFuture<Void> assignPartitions(final long now) {
- final RequestFuture<Void> future = newCoordinatorRequestFuture(now);
- if (future.isDone()) return future;
+ private RequestFuture<Void> sendJoinGroupRequest() {
+ if (coordinatorUnknown())
+ return RequestFuture.coordinatorNotAvailable();
// send a join group request to the coordinator
List<String> subscribedTopics = new ArrayList<String>(subscriptions.subscribedTopics());
@@ -124,25 +301,20 @@ public final class Coordinator {
// create the request for the coordinator
log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.consumerCoordinator.id());
+ return client.send(consumerCoordinator, ApiKeys.JOIN_GROUP, request)
+ .compose(new JoinGroupResponseHandler());
+ }
- RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
- @Override
- public void onComplete(ClientResponse resp) {
- handleJoinResponse(resp, future);
- }
- };
+ private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, Void> {
- sendCoordinator(ApiKeys.JOIN_GROUP, request.toStruct(), completionHandler, now);
- return future;
- }
+ @Override
+ public JoinGroupResponse parse(ClientResponse response) {
+ return new JoinGroupResponse(response.responseBody());
+ }
- private void handleJoinResponse(ClientResponse response, RequestFuture<Void> future) {
- if (response.wasDisconnected()) {
- handleCoordinatorDisconnect(response);
- future.retryWithNewCoordinator();
- } else {
+ @Override
+ public void handle(JoinGroupResponse joinResponse, RequestFuture<Void> future) {
// process the response
- JoinGroupResponse joinResponse = new JoinGroupResponse(response.responseBody());
short errorCode = joinResponse.errorCode();
if (errorCode == Errors.NONE.code()) {
@@ -152,36 +324,36 @@ public final class Coordinator {
// set the flag to refresh last committed offsets
subscriptions.needRefreshCommits();
- log.debug("Joined group: {}", response);
+ log.debug("Joined group: {}", joinResponse.toStruct());
// record re-assignment time
- this.sensors.partitionReassignments.record(response.requestLatencyMs());
+ sensors.partitionReassignments.record(response.requestLatencyMs());
// update partition assignment
subscriptions.changePartitionAssignment(joinResponse.assignedPartitions());
+ heartbeatTask.reset();
future.complete(null);
} else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) {
// reset the consumer id and retry immediately
Coordinator.this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.",
groupId);
-
- future.retryNow();
+ future.raise(Errors.UNKNOWN_CONSUMER_ID);
} else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
|| errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
// re-discover the coordinator and retry with backoff
coordinatorDead();
log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
groupId);
- future.retryWithNewCoordinator();
+ future.raise(Errors.forCode(errorCode));
} else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()
|| errorCode == Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code()
|| errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) {
// log the error and re-throw the exception
- KafkaException e = Errors.forCode(errorCode).exception();
+ Errors error = Errors.forCode(errorCode);
log.error("Attempt to join group {} failed due to: {}",
- groupId, e.getMessage());
- future.raise(e);
+ groupId, error.exception().getMessage());
+ future.raise(error);
} else {
// unexpected error, throw the exception
future.raise(new KafkaException("Unexpected error in join group response: "
@@ -190,55 +362,134 @@ public final class Coordinator {
}
}
+ private void commitOffsetsAsync(final Map<TopicPartition, Long> offsets, final ConsumerCommitCallback callback) {
+ this.subscriptions.needRefreshCommits();
+ RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
+ if (callback != null) {
+ future.addListener(new RequestFutureListener<Void>() {
+ @Override
+ public void onSuccess(Void value) {
+ callback.onComplete(offsets, null);
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ callback.onComplete(offsets, e);
+ }
+ });
+ }
+ }
+
+ private void commitOffsetsSync(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback) {
+ while (true) {
+ ensureCoordinatorKnown();
+ ensurePartitionAssignment();
+
+ RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
+ client.poll(future);
+
+ if (future.succeeded()) {
+ if (callback != null)
+ callback.onComplete(offsets, null);
+ return;
+ }
+
+ if (!future.isRetriable()) {
+ if (callback == null)
+ throw future.exception();
+ else
+ callback.onComplete(offsets, future.exception());
+ return;
+ }
+
+ Utils.sleep(retryBackoffMs);
+ }
+ }
+
/**
* Commit offsets for the specified list of topics and partitions. This is a non-blocking call
* which returns a request future that can be polled in the case of a synchronous commit or ignored in the
* asynchronous case.
*
* @param offsets The list of offsets per partition that should be committed.
- * @param now The current time
* @return A request future whose value indicates whether the commit was successful or not
*/
- public RequestFuture<Void> commitOffsets(final Map<TopicPartition, Long> offsets, long now) {
- final RequestFuture<Void> future = newCoordinatorRequestFuture(now);
- if (future.isDone()) return future;
+ private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, Long> offsets) {
+ if (coordinatorUnknown())
+ return RequestFuture.coordinatorNotAvailable();
- if (offsets.isEmpty()) {
- future.complete(null);
- } else {
- // create the offset commit request
- Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData;
- offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
- for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
- offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), ""));
- OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
+ if (offsets.isEmpty())
+ return RequestFuture.voidSuccess();
+
+ // create the offset commit request
+ Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData;
+ offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
+ for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
+ offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), ""));
+ OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
this.generation,
this.consumerId,
OffsetCommitRequest.DEFAULT_RETENTION_TIME,
offsetData);
- RequestCompletionHandler handler = new OffsetCommitCompletionHandler(offsets, future);
- sendCoordinator(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
- }
-
- return future;
+ return client.send(consumerCoordinator, ApiKeys.OFFSET_COMMIT, req)
+ .compose(new OffsetCommitResponseHandler(offsets));
}
- private <T> RequestFuture<T> newCoordinatorRequestFuture(long now) {
- if (coordinatorUnknown())
- return RequestFuture.newCoordinatorNeeded();
- if (client.ready(this.consumerCoordinator, now))
- // We have an open connection and we're ready to send
- return new RequestFuture<T>();
+ private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
+
+ private final Map<TopicPartition, Long> offsets;
+
+ public OffsetCommitResponseHandler(Map<TopicPartition, Long> offsets) {
+ this.offsets = offsets;
+ }
- if (this.client.connectionFailed(this.consumerCoordinator)) {
- coordinatorDead();
- return RequestFuture.newCoordinatorNeeded();
+ @Override
+ public OffsetCommitResponse parse(ClientResponse response) {
+ return new OffsetCommitResponse(response.responseBody());
}
- // The connection has been initiated, so we need to poll to finish it
- return RequestFuture.pollNeeded();
+ @Override
+ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
+ sensors.commitLatency.record(response.requestLatencyMs());
+ for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
+ TopicPartition tp = entry.getKey();
+ long offset = this.offsets.get(tp);
+ short errorCode = entry.getValue();
+ if (errorCode == Errors.NONE.code()) {
+ log.debug("Committed offset {} for partition {}", offset, tp);
+ subscriptions.committed(tp, offset);
+ } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+ || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+ coordinatorDead();
+ future.raise(Errors.forCode(errorCode));
+ return;
+ } else if (errorCode == Errors.OFFSET_METADATA_TOO_LARGE.code()
+ || errorCode == Errors.INVALID_COMMIT_OFFSET_SIZE.code()) {
+ // do not need to throw the exception but just log the error
+ log.error("Error committing partition {} at offset {}: {}",
+ tp,
+ offset,
+ Errors.forCode(errorCode).exception().getMessage());
+ } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
+ || errorCode == Errors.ILLEGAL_GENERATION.code()) {
+ // need to re-join group
+ subscriptions.needReassignment();
+ future.raise(Errors.forCode(errorCode));
+ return;
+ } else {
+ // any other error: raise it on the future and log the error
+ future.raise(Errors.forCode(errorCode));
+ log.error("Error committing partition {} at offset {}: {}",
+ tp,
+ offset,
+ Errors.forCode(errorCode).exception().getMessage());
+ }
+ }
+
+ future.complete(null);
+ }
}
/**
@@ -246,35 +497,30 @@ public final class Coordinator {
* returned future can be polled to get the actual offsets returned from the broker.
*
* @param partitions The set of partitions to get offsets for.
- * @param now The current time in milliseconds
* @return A request future containing the committed offsets.
*/
- public RequestFuture<Map<TopicPartition, Long>> fetchOffsets(Set<TopicPartition> partitions, long now) {
- final RequestFuture<Map<TopicPartition, Long>> future = newCoordinatorRequestFuture(now);
- if (future.isDone()) return future;
+ private RequestFuture<Map<TopicPartition, Long>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
+ if (coordinatorUnknown())
+ return RequestFuture.coordinatorNotAvailable();
- log.debug("Fetching committed offsets for partitions: " + Utils.join(partitions, ", "));
+ log.debug("Fetching committed offsets for partitions: {}", Utils.join(partitions, ", "));
// construct the request
OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
// send the request with a callback
- RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
- @Override
- public void onComplete(ClientResponse resp) {
- handleOffsetFetchResponse(resp, future);
- }
- };
- sendCoordinator(ApiKeys.OFFSET_FETCH, request.toStruct(), completionHandler, now);
- return future;
+ return client.send(consumerCoordinator, ApiKeys.OFFSET_FETCH, request)
+ .compose(new OffsetFetchResponseHandler());
}
- private void handleOffsetFetchResponse(ClientResponse resp, RequestFuture<Map<TopicPartition, Long>> future) {
- if (resp.wasDisconnected()) {
- handleCoordinatorDisconnect(resp);
- future.retryWithNewCoordinator();
- } else {
- // parse the response to get the offsets
- OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody());
+ private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, Long>> {
+
+ @Override
+ public OffsetFetchResponse parse(ClientResponse response) {
+ return new OffsetFetchResponse(response.responseBody());
+ }
+
+ @Override
+ public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, Long>> future) {
Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size());
for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
@@ -285,19 +531,21 @@ public final class Coordinator {
.getMessage());
if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
// just retry
- future.retryAfterBackoff();
+ future.raise(Errors.OFFSET_LOAD_IN_PROGRESS);
} else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
// re-discover the coordinator and retry
coordinatorDead();
- future.retryWithNewCoordinator();
+ future.raise(Errors.NOT_COORDINATOR_FOR_CONSUMER);
} else if (data.errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
|| data.errorCode == Errors.ILLEGAL_GENERATION.code()) {
// need to re-join group
subscriptions.needReassignment();
+ future.raise(Errors.forCode(data.errorCode));
} else {
future.raise(new KafkaException("Unexpected error in fetch offset response: "
+ Errors.forCode(data.errorCode).exception().getMessage()));
}
+ return;
} else if (data.offset >= 0) {
// record the position with the offset (-1 indicates no committed offset to fetch)
offsets.put(tp, data.offset);
@@ -306,82 +554,47 @@ public final class Coordinator {
}
}
- if (!future.isDone())
- future.complete(offsets);
- }
- }
-
- /**
- * Attempt to heartbeat the consumer coordinator if necessary, and check if the coordinator is still alive.
- *
- * @param now The current time
- */
- public void maybeHeartbeat(long now) {
- if (heartbeat.shouldHeartbeat(now) && coordinatorReady(now)) {
- HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.consumerId);
- sendCoordinator(ApiKeys.HEARTBEAT, req.toStruct(), new HeartbeatCompletionHandler(), now);
- this.heartbeat.sentHeartbeat(now);
+ future.complete(offsets);
}
}
/**
- * Get the time until the next heartbeat is needed.
- * @param now The current time
- * @return The duration in milliseconds before the next heartbeat will be needed.
+ * Send a heartbeat request now (visible only for testing).
*/
- public long timeToNextHeartbeat(long now) {
- return heartbeat.timeToNextHeartbeat(now);
- }
-
- /**
- * Check whether the coordinator has any in-flight requests.
- * @return true if the coordinator has pending requests.
- */
- public boolean hasInFlightRequests() {
- return !coordinatorUnknown() && client.inFlightRequestCount(consumerCoordinator.idString()) > 0;
+ public RequestFuture<Void> sendHeartbeatRequest() {
+ HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.consumerId);
+ return client.send(consumerCoordinator, ApiKeys.HEARTBEAT, req)
+ .compose(new HeartbeatCompletionHandler());
}
public boolean coordinatorUnknown() {
return this.consumerCoordinator == null;
}
- private boolean coordinatorReady(long now) {
- return !coordinatorUnknown() && this.client.ready(this.consumerCoordinator, now);
- }
-
/**
* Discover the current coordinator for the consumer group. Sends a ConsumerMetadata request to
* one of the brokers. The returned future should be polled to get the result of the request.
* @return A request future which indicates the completion of the metadata request
*/
- public RequestFuture<Void> discoverConsumerCoordinator() {
+ private RequestFuture<Void> sendConsumerMetadataRequest() {
// initiate the consumer metadata request
// find a node to ask about the coordinator
- long now = time.milliseconds();
- Node node = this.client.leastLoadedNode(now);
-
+ Node node = this.client.leastLoadedNode();
if (node == null) {
- return RequestFuture.metadataRefreshNeeded();
- } else if (!this.client.ready(node, now)) {
- if (this.client.connectionFailed(node)) {
- return RequestFuture.metadataRefreshNeeded();
- } else {
- return RequestFuture.pollNeeded();
- }
+ // TODO: If there are no brokers left, perhaps we should use the bootstrap set
+ // from configuration?
+ return RequestFuture.noBrokersAvailable();
} else {
- final RequestFuture<Void> future = new RequestFuture<Void>();
-
// create a consumer metadata request
log.debug("Issuing consumer metadata request to broker {}", node.id());
ConsumerMetadataRequest metadataRequest = new ConsumerMetadataRequest(this.groupId);
- RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
- @Override
- public void onComplete(ClientResponse resp) {
- handleConsumerMetadataResponse(resp, future);
- }
- };
- send(node, ApiKeys.CONSUMER_METADATA, metadataRequest.toStruct(), completionHandler, now);
- return future;
+ return client.send(node, ApiKeys.CONSUMER_METADATA, metadataRequest)
+ .compose(new RequestFutureAdapter<ClientResponse, Void>() {
+ @Override
+ public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
+ handleConsumerMetadataResponse(response, future);
+ }
+ });
}
}
@@ -391,7 +604,10 @@ public final class Coordinator {
// parse the response to get the coordinator info if it is not disconnected,
// otherwise we need to request metadata update
if (resp.wasDisconnected()) {
- future.retryAfterMetadataRefresh();
+ future.raise(new DisconnectException());
+ } else if (!coordinatorUnknown()) {
+ // We already found the coordinator, so ignore the request
+ future.complete(null);
} else {
ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(resp.responseBody());
// use MAX_VALUE - node.id as the coordinator id to mimic separate connections
@@ -401,9 +617,10 @@ public final class Coordinator {
this.consumerCoordinator = new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(),
consumerMetadataResponse.node().host(),
consumerMetadataResponse.node().port());
+ heartbeatTask.reset();
future.complete(null);
} else {
- future.retryAfterBackoff();
+ future.raise(Errors.forCode(consumerMetadataResponse.errorCode()));
}
}
}
@@ -418,115 +635,84 @@ public final class Coordinator {
}
}
- /**
- * Handle the case when the request gets cancelled due to coordinator disconnection.
- */
- private void handleCoordinatorDisconnect(ClientResponse response) {
- int correlation = response.request().request().header().correlationId();
- log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
- response.request(),
- correlation,
- response.request().request().destination());
-
- // mark the coordinator as dead
- coordinatorDead();
- }
-
-
- private void sendCoordinator(ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
- send(this.consumerCoordinator, api, request, handler, now);
- }
-
- private void send(Node node, ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
- RequestHeader header = this.client.nextRequestHeader(api);
- RequestSend send = new RequestSend(node.idString(), header, request);
- this.client.send(new ClientRequest(now, true, send, handler));
- }
+ private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
+ @Override
+ public HeartbeatResponse parse(ClientResponse response) {
+ return new HeartbeatResponse(response.responseBody());
+ }
- private class HeartbeatCompletionHandler implements RequestCompletionHandler {
@Override
- public void onComplete(ClientResponse resp) {
- if (resp.wasDisconnected()) {
- handleCoordinatorDisconnect(resp);
+ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
+ sensors.heartbeatLatency.record(response.requestLatencyMs());
+ short error = heartbeatResponse.errorCode();
+ if (error == Errors.NONE.code()) {
+ log.debug("Received successful heartbeat response.");
+ future.complete(null);
+ } else if (error == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+ || error == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+ log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
+ coordinatorDead();
+ future.raise(Errors.forCode(error));
+ } else if (error == Errors.ILLEGAL_GENERATION.code()) {
+ log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
+ subscriptions.needReassignment();
+ future.raise(Errors.ILLEGAL_GENERATION);
+ } else if (error == Errors.UNKNOWN_CONSUMER_ID.code()) {
+ log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group.");
+ consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
+ subscriptions.needReassignment();
+ future.raise(Errors.UNKNOWN_CONSUMER_ID);
} else {
- HeartbeatResponse response = new HeartbeatResponse(resp.responseBody());
- if (response.errorCode() == Errors.NONE.code()) {
- log.debug("Received successful heartbeat response.");
- } else if (response.errorCode() == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
- || response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
- log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
- coordinatorDead();
- } else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code()) {
- log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
- subscriptions.needReassignment();
- } else if (response.errorCode() == Errors.UNKNOWN_CONSUMER_ID.code()) {
- log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group.");
- consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
- subscriptions.needReassignment();
- } else {
- throw new KafkaException("Unexpected error in heartbeat response: "
- + Errors.forCode(response.errorCode()).exception().getMessage());
- }
+ future.raise(new KafkaException("Unexpected error in heartbeat response: "
+ + Errors.forCode(error).exception().getMessage()));
}
- sensors.heartbeatLatency.record(resp.requestLatencyMs());
}
}
- private class OffsetCommitCompletionHandler implements RequestCompletionHandler {
+ private abstract class CoordinatorResponseHandler<R, T>
+ extends RequestFutureAdapter<ClientResponse, T> {
+ protected ClientResponse response;
- private final Map<TopicPartition, Long> offsets;
- private final RequestFuture<Void> future;
+ public abstract R parse(ClientResponse response);
- public OffsetCommitCompletionHandler(Map<TopicPartition, Long> offsets, RequestFuture<Void> future) {
- this.offsets = offsets;
- this.future = future;
- }
+ public abstract void handle(R response, RequestFuture<T> future);
@Override
- public void onComplete(ClientResponse resp) {
- if (resp.wasDisconnected()) {
- handleCoordinatorDisconnect(resp);
- future.retryWithNewCoordinator();
- } else {
- OffsetCommitResponse commitResponse = new OffsetCommitResponse(resp.responseBody());
- for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
- TopicPartition tp = entry.getKey();
- short errorCode = entry.getValue();
- long offset = this.offsets.get(tp);
- if (errorCode == Errors.NONE.code()) {
- log.debug("Committed offset {} for partition {}", offset, tp);
- subscriptions.committed(tp, offset);
- } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
- || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
- coordinatorDead();
- future.retryWithNewCoordinator();
- } else if (errorCode == Errors.OFFSET_METADATA_TOO_LARGE.code()
- || errorCode == Errors.INVALID_COMMIT_OFFSET_SIZE.code()) {
- // do not need to throw the exception but just log the error
- log.error("Error committing partition {} at offset {}: {}",
- tp,
- offset,
- Errors.forCode(errorCode).exception().getMessage());
- } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
- || errorCode == Errors.ILLEGAL_GENERATION.code()) {
- // need to re-join group
- subscriptions.needReassignment();
- } else {
- // re-throw the exception as these should not happen
- log.error("Error committing partition {} at offset {}: {}",
- tp,
- offset,
- Errors.forCode(errorCode).exception().getMessage());
- }
- }
+ public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
+ this.response = clientResponse;
- if (!future.isDone())
- future.complete(null);
+ if (clientResponse.wasDisconnected()) {
+ int correlation = response.request().request().header().correlationId();
+ log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
+ response.request(),
+ correlation,
+ response.request().request().destination());
+
+ // mark the coordinator as dead
+ coordinatorDead();
+ future.raise(new DisconnectException());
+ return;
+ }
+
+ R response = parse(clientResponse);
+ handle(response, future);
+ }
+
+ @Override
+ public void onFailure(RuntimeException e, RequestFuture<T> future) {
+ if (e instanceof DisconnectException) {
+ log.debug("Coordinator request failed", e);
+ coordinatorDead();
}
- sensors.commitLatency.record(resp.requestLatencyMs());
+ future.raise(e);
}
}
+ public interface RebalanceCallback {
+ void onPartitionsAssigned(Collection<TopicPartition> partitions);
+ void onPartitionsRevoked(Collection<TopicPartition> partitions);
+ }
+
private class CoordinatorMetrics {
public final Metrics metrics;
public final String metricGrpName;
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
new file mode 100644
index 0000000..61663f8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+
+public interface DelayedTask {
+
+ /**
+ * Execute the task.
+ * @param now current time in milliseconds
+ */
+ void run(long now);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
new file mode 100644
index 0000000..61cab20
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+/**
+ * Tracks a set of tasks to be executed after a delay.
+ */
+public class DelayedTaskQueue {
+
+ private PriorityQueue<Entry> tasks;
+
+ public DelayedTaskQueue() {
+ tasks = new PriorityQueue<Entry>();
+ }
+
+ /**
+ * Schedule a task for execution in the future.
+ *
+ * @param task the task to execute
+ * @param at the time in milliseconds at which the task should be executed
+ */
+ public void add(DelayedTask task, long at) {
+ tasks.add(new Entry(task, at));
+ }
+
+ /**
+ * Remove a task from the queue if it is present
+ * @param task the task to be removed
+ * @return true if a task was removed as a result of this call
+ */
+ public boolean remove(DelayedTask task) {
+ boolean wasRemoved = false;
+ Iterator<Entry> iterator = tasks.iterator();
+ while (iterator.hasNext()) {
+ Entry entry = iterator.next();
+ if (entry.task.equals(task)) {
+ iterator.remove();
+ wasRemoved = true;
+ }
+ }
+ return wasRemoved;
+ }
+
+ /**
+ * Get amount of time in milliseconds until the next event. Returns Long.MAX_VALUE if no tasks are scheduled.
+ *
+ * @return the remaining time in milliseconds
+ */
+ public long nextTimeout(long now) {
+ if (tasks.isEmpty())
+ return Long.MAX_VALUE;
+ else
+ return Math.max(tasks.peek().timeout - now, 0);
+ }
+
+ /**
+ * Run any ready tasks.
+ *
+ * @param now the current time
+ */
+ public void poll(long now) {
+ while (!tasks.isEmpty() && tasks.peek().timeout <= now) {
+ Entry entry = tasks.poll();
+ entry.task.run(now);
+ }
+ }
+
+ private static class Entry implements Comparable<Entry> {
+ DelayedTask task;
+ long timeout;
+
+ public Entry(DelayedTask task, long timeout) {
+ this.task = task;
+ this.timeout = timeout;
+ }
+
+ @Override
+ public int compareTo(Entry entry) {
+ return Long.compare(timeout, entry.timeout);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 695eaf6..d595c1c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -13,17 +13,18 @@
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
-import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -38,7 +39,6 @@ import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
-import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -52,21 +52,24 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* This class manages the fetching process with the brokers.
*/
public class Fetcher<K, V> {
+ private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
+ private static final long LATEST_OFFSET_TIMESTAMP = -1L;
private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
- private final KafkaClient client;
-
+ private final ConsumerNetworkClient client;
private final Time time;
private final int minBytes;
private final int maxWaitMs;
private final int fetchSize;
+ private final long retryBackoffMs;
private final boolean checkCrcs;
private final Metadata metadata;
private final FetchManagerMetrics sensors;
@@ -75,8 +78,7 @@ public class Fetcher<K, V> {
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
-
- public Fetcher(KafkaClient client,
+ public Fetcher(ConsumerNetworkClient client,
int minBytes,
int maxWaitMs,
int fetchSize,
@@ -88,7 +90,8 @@ public class Fetcher<K, V> {
Metrics metrics,
String metricGrpPrefix,
Map<String, String> metricTags,
- Time time) {
+ Time time,
+ long retryBackoffMs) {
this.time = time;
this.client = client;
@@ -105,25 +108,105 @@ public class Fetcher<K, V> {
this.records = new LinkedList<PartitionRecords<K, V>>();
this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
+ this.retryBackoffMs = retryBackoffMs;
}
/**
* Set-up a fetch request for any node that we have assigned partitions for which doesn't have one.
*
* @param cluster The current cluster metadata
- * @param now The current time
*/
- public void initFetches(Cluster cluster, long now) {
- for (ClientRequest request : createFetchRequests(cluster)) {
- Node node = cluster.nodeById(Integer.parseInt(request.request().destination()));
- if (client.ready(node, now)) {
- log.trace("Initiating fetch to node {}: {}", node.id(), request);
- client.send(request);
+ public void initFetches(Cluster cluster) {
+ for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests(cluster).entrySet()) {
+ final FetchRequest fetch = fetchEntry.getValue();
+ client.send(fetchEntry.getKey(), ApiKeys.FETCH, fetch)
+ .addListener(new RequestFutureListener<ClientResponse>() {
+ @Override
+ public void onSuccess(ClientResponse response) {
+ handleFetchResponse(response, fetch);
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ log.debug("Fetch failed", e);
+ }
+ });
+ }
+ }
+
+ /**
+ * Update the fetch positions for the provided partitions.
+ * @param partitions The partitions whose fetch positions should be updated
+ */
+ public void updateFetchPositions(Set<TopicPartition> partitions) {
+ // reset the fetch position to the committed position
+ for (TopicPartition tp : partitions) {
+ // skip if we already have a fetch position
+ if (subscriptions.fetched(tp) != null)
+ continue;
+
+ // TODO: If there are several offsets to reset, we could submit offset requests in parallel
+ if (subscriptions.isOffsetResetNeeded(tp)) {
+ resetOffset(tp);
+ } else if (subscriptions.committed(tp) == null) {
+ // there's no committed position, so we need to reset with the default strategy
+ subscriptions.needOffsetReset(tp);
+ resetOffset(tp);
+ } else {
+ log.debug("Resetting offset for partition {} to the committed offset {}",
+ tp, subscriptions.committed(tp));
+ subscriptions.seek(tp, subscriptions.committed(tp));
}
}
}
/**
+ * Reset offsets for the given partition using the offset reset strategy.
+ *
+ * @param partition The given partition that needs reset offset
+ * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
+ */
+ private void resetOffset(TopicPartition partition) {
+ OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
+ final long timestamp;
+ if (strategy == OffsetResetStrategy.EARLIEST)
+ timestamp = EARLIEST_OFFSET_TIMESTAMP;
+ else if (strategy == OffsetResetStrategy.LATEST)
+ timestamp = LATEST_OFFSET_TIMESTAMP;
+ else
+ throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
+
+ log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
+ long offset = listOffset(partition, timestamp);
+ this.subscriptions.seek(partition, offset);
+ }
+
+ /**
+ * Fetch a single offset before the given timestamp for the partition.
+ *
+ * @param partition The partition that needs fetching offset.
+ * @param timestamp The timestamp for fetching offset.
+ * @return The offset of the message that is published before the given timestamp
+ */
+ private long listOffset(TopicPartition partition, long timestamp) {
+ while (true) {
+ RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
+ client.poll(future);
+
+ if (future.succeeded())
+ return future.value();
+
+ if (!future.isRetriable())
+ throw future.exception();
+
+ if (future.exception() instanceof InvalidMetadataException)
+ client.awaitMetadataUpdate();
+ else
+ Utils.sleep(retryBackoffMs);
+ }
+ }
+
+ /**
* Return the fetched records, empty the record buffer and update the consumed position.
*
* @return The fetched records per partition
@@ -163,37 +246,27 @@ public class Fetcher<K, V> {
* @param timestamp The timestamp for fetching offset.
* @return A response which can be polled to obtain the corresponding offset.
*/
- public RequestFuture<Long> listOffset(final TopicPartition topicPartition, long timestamp) {
+ private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long timestamp) {
Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
- long now = time.milliseconds();
PartitionInfo info = metadata.fetch().partition(topicPartition);
if (info == null) {
metadata.add(topicPartition.topic());
log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
- return RequestFuture.metadataRefreshNeeded();
+ return RequestFuture.staleMetadata();
} else if (info.leader() == null) {
log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
- return RequestFuture.metadataRefreshNeeded();
- } else if (this.client.ready(info.leader(), now)) {
- final RequestFuture<Long> future = new RequestFuture<Long>();
+ return RequestFuture.leaderNotAvailable();
+ } else {
Node node = info.leader();
ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
- RequestSend send = new RequestSend(node.idString(),
- this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS),
- request.toStruct());
- RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
- @Override
- public void onComplete(ClientResponse resp) {
- handleListOffsetResponse(topicPartition, resp, future);
- }
- };
- ClientRequest clientRequest = new ClientRequest(now, true, send, completionHandler);
- this.client.send(clientRequest);
- return future;
- } else {
- // We initiated a connect to the leader, but we need to poll to finish it.
- return RequestFuture.pollNeeded();
+ return client.send(node, ApiKeys.LIST_OFFSETS, request)
+ .compose(new RequestFutureAdapter<ClientResponse, Long>() {
+ @Override
+ public void onSuccess(ClientResponse response, RequestFuture<Long> future) {
+ handleListOffsetResponse(topicPartition, response, future);
+ }
+ });
}
}
@@ -206,7 +279,7 @@ public class Fetcher<K, V> {
ClientResponse clientResponse,
RequestFuture<Long> future) {
if (clientResponse.wasDisconnected()) {
- future.retryAfterMetadataRefresh();
+ future.raise(new DisconnectException());
} else {
ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
short errorCode = lor.responseData().get(topicPartition).errorCode;
@@ -222,11 +295,11 @@ public class Fetcher<K, V> {
|| errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
topicPartition);
- future.retryAfterMetadataRefresh();
+ future.raise(Errors.forCode(errorCode));
} else {
log.error("Attempt to fetch offsets for partition {} failed due to: {}",
topicPartition, Errors.forCode(errorCode).exception().getMessage());
- future.retryAfterMetadataRefresh();
+ future.raise(new StaleMetadataException());
}
}
}
@@ -235,37 +308,31 @@ public class Fetcher<K, V> {
* Create fetch requests for all nodes for which we have assigned partitions
* that have no existing requests in flight.
*/
- private List<ClientRequest> createFetchRequests(Cluster cluster) {
+ private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
// create the fetch info
- Map<Integer, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Integer, Map<TopicPartition, FetchRequest.PartitionData>>();
+ Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Node, Map<TopicPartition, FetchRequest.PartitionData>>();
for (TopicPartition partition : subscriptions.assignedPartitions()) {
Node node = cluster.leaderFor(partition);
if (node == null) {
metadata.requestUpdate();
- } else if (this.client.inFlightRequestCount(node.idString()) == 0) {
+ } else if (this.client.pendingRequestCount(node) == 0) {
// if there is a leader and no in-flight requests, issue a new fetch
- Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node.id());
+ Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
if (fetch == null) {
fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
- fetchable.put(node.id(), fetch);
+ fetchable.put(node, fetch);
}
long offset = this.subscriptions.fetched(partition);
fetch.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize));
}
}
- // create the requests
- List<ClientRequest> requests = new ArrayList<ClientRequest>(fetchable.size());
- for (Map.Entry<Integer, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
- int nodeId = entry.getKey();
- final FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
- RequestSend send = new RequestSend(Integer.toString(nodeId), this.client.nextRequestHeader(ApiKeys.FETCH), fetch.toStruct());
- RequestCompletionHandler handler = new RequestCompletionHandler() {
- public void onComplete(ClientResponse response) {
- handleFetchResponse(response, fetch);
- }
- };
- requests.add(new ClientRequest(time.milliseconds(), true, send, handler));
+ // create the fetches
+ Map<Node, FetchRequest> requests = new HashMap<Node, FetchRequest>();
+ for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
+ Node node = entry.getKey();
+ FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
+ requests.put(node, fetch);
}
return requests;
}
@@ -353,7 +420,6 @@ public class Fetcher<K, V> {
}
}
-
private class FetchManagerMetrics {
public final Metrics metrics;
public final String metricGrpName;
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index 51eae19..6da8936 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -13,7 +13,7 @@
package org.apache.kafka.clients.consumer.internals;
/**
- * A helper class for managing the heartbeat to the co-ordinator
+ * A helper class for managing the heartbeat to the coordinator
*/
public final class Heartbeat {
@@ -25,18 +25,24 @@ public final class Heartbeat {
private final long timeout;
private long lastHeartbeatSend;
+ private long lastHeartbeatReceive;
+ private long lastSessionReset;
public Heartbeat(long timeout, long now) {
this.timeout = timeout;
- this.lastHeartbeatSend = now;
+ this.lastSessionReset = now;
}
public void sentHeartbeat(long now) {
this.lastHeartbeatSend = now;
}
+ public void receiveHeartbeat(long now) {
+ this.lastHeartbeatReceive = now;
+ }
+
public boolean shouldHeartbeat(long now) {
- return now - lastHeartbeatSend > (1.0 / HEARTBEATS_PER_SESSION_INTERVAL) * this.timeout;
+ return timeToNextHeartbeat(now) == 0;
}
public long lastHeartbeatSend() {
@@ -44,7 +50,7 @@ public final class Heartbeat {
}
public long timeToNextHeartbeat(long now) {
- long timeSinceLastHeartbeat = now - lastHeartbeatSend;
+ long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
long hbInterval = timeout / HEARTBEATS_PER_SESSION_INTERVAL;
if (timeSinceLastHeartbeat > hbInterval)
@@ -52,4 +58,17 @@ public final class Heartbeat {
else
return hbInterval - timeSinceLastHeartbeat;
}
+
+ public boolean sessionTimeoutExpired(long now) {
+ return now - Math.max(lastSessionReset, lastHeartbeatReceive) > timeout;
+ }
+
+ public long interval() {
+ return timeout / HEARTBEATS_PER_SESSION_INTERVAL;
+ }
+
+ public void resetSessionTimeout(long now) {
+ this.lastSessionReset = now;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
new file mode 100644
index 0000000..0ec6017
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.InvalidMetadataException;
+
+/**
+ * No brokers were available to complete a request.
+ */
+public class NoAvailableBrokersException extends InvalidMetadataException {
+ private static final long serialVersionUID = 1L;
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index 13fc9af..5f00251 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -12,78 +12,49 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.ArrayList;
+import java.util.List;
+
/**
- * Result of an asynchronous request through {@link org.apache.kafka.clients.KafkaClient}. To get the
- * result of the request, you must use poll using {@link org.apache.kafka.clients.KafkaClient#poll(long, long)}
- * until {@link #isDone()} returns true. Typical usage might look like this:
+ * Result of an asynchronous request from {@link ConsumerNetworkClient}. Use {@link ConsumerNetworkClient#poll(long)}
+ * (and variants) to finish a request future. Use {@link #isDone()} to check if the future is complete, and
+ * {@link #succeeded()} to check if the request completed successfully. Typical usage might look like this:
*
* <pre>
- * RequestFuture future = sendRequest();
- * while (!future.isDone()) {
- * client.poll(timeout, now);
- * }
+ * RequestFuture&lt;ClientResponse&gt; future = client.send(api, request);
+ * client.poll(future);
*
- * switch (future.outcome()) {
- * case SUCCESS:
- * // handle request success
- * break;
- * case NEED_RETRY:
- * // retry after taking possible retry action
- * break;
- * case EXCEPTION:
- * // handle exception
- * }
+ * if (future.succeeded()) {
+ * ClientResponse response = future.value();
+ * // Handle response
+ * } else {
+ * throw future.exception();
+ * }
* </pre>
*
- * When {@link #isDone()} returns true, there are three possible outcomes (obtained through {@link #outcome()}):
- *
- * <ol>
- * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#SUCCESS}: If the request was
- * successful, then you can use {@link #value()} to obtain the result.</li>
- * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#EXCEPTION}: If an unhandled exception
- * was encountered, you can use {@link #exception()} to get it.</li>
- * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#NEED_RETRY}: The request may
- * not have been successful, but the failure may be ephemeral and the caller just needs to try the request again.
- * In this case, use {@link #retryAction()} to determine what action should be taken (if any) before
- * retrying.</li>
- * </ol>
- *
* @param <T> Return type of the result (Can be Void if there is no response)
*/
public class RequestFuture<T> {
- public static final RequestFuture<Object> NEED_NEW_COORDINATOR = newRetryFuture(RetryAction.FIND_COORDINATOR);
- public static final RequestFuture<Object> NEED_POLL = newRetryFuture(RetryAction.POLL);
- public static final RequestFuture<Object> NEED_METADATA_REFRESH = newRetryFuture(RetryAction.REFRESH_METADATA);
-
- public enum RetryAction {
- NOOP, // Retry immediately.
- POLL, // Retry after calling poll (e.g. to finish a connection)
- BACKOFF, // Retry after a delay
- FIND_COORDINATOR, // Find a new coordinator before retrying
- REFRESH_METADATA // Refresh metadata before retrying
- }
-
- public enum Outcome {
- SUCCESS,
- NEED_RETRY,
- EXCEPTION
- }
- private Outcome outcome;
- private RetryAction retryAction;
+ private boolean isDone = false;
private T value;
private RuntimeException exception;
+ private List<RequestFutureListener<T>> listeners = new ArrayList<RequestFutureListener<T>>();
+
/**
* Check whether the response is ready to be handled
* @return true if the response is ready, false otherwise
*/
public boolean isDone() {
- return outcome != null;
+ return isDone;
}
/**
- * Get the value corresponding to this request (if it has one, as indicated by {@link #outcome()}).
+ * Get the value corresponding to this request (only available if the request succeeded)
* @return the value if it exists or null
*/
public T value() {
@@ -92,32 +63,31 @@ public class RequestFuture<T> {
/**
* Check if the request succeeded;
- * @return true if a value is available, false otherwise
+ * @return true if the request completed and was successful
*/
public boolean succeeded() {
- return outcome == Outcome.SUCCESS;
+ return isDone && exception == null;
}
/**
- * Check if the request completed failed.
- * @return true if the request failed (whether or not it can be retried)
+ * Check if the request failed.
+ * @return true if the request completed with a failure
*/
public boolean failed() {
- return outcome != Outcome.SUCCESS;
+ return isDone && exception != null;
}
/**
- * Return the error from this response (assuming {@link #succeeded()} has returned false. If the
- * response is not ready or if there is no retryAction, null is returned.
- * @return the error if it exists or null
+ * Check if the request is retriable (convenience method for checking if
+ * the exception is an instance of {@link RetriableException}).
+ * @return true if it is retriable, false otherwise
*/
- public RetryAction retryAction() {
- return retryAction;
+ public boolean isRetriable() {
+ return exception instanceof RetriableException;
}
/**
- * Get the exception from a failed result. You should check that there is an exception
- * with {@link #hasException()} before using this method.
+ * Get the exception from a failed result (only available if the request failed)
* @return The exception if it exists or null
*/
public RuntimeException exception() {
@@ -125,85 +95,108 @@ public class RequestFuture<T> {
}
/**
- * Check whether there was an exception.
- * @return true if this request failed with an exception
+ * Complete the request successfully. After this call, {@link #succeeded()} will return true
+ * and the value can be obtained through {@link #value()}.
+ * @param value corresponding value (or null if there is none)
*/
- public boolean hasException() {
- return outcome == Outcome.EXCEPTION;
+ public void complete(T value) {
+ this.value = value;
+ this.isDone = true;
+ fireSuccess();
}
/**
- * Check the outcome of the future if it is ready.
- * @return the outcome or null if the future is not finished
+ * Raise an exception. The request will be marked as failed, and the caller can either
+ * handle the exception or throw it.
+ * @param e corresponding exception to be passed to caller
*/
- public Outcome outcome() {
- return outcome;
+ public void raise(RuntimeException e) {
+ this.exception = e;
+ this.isDone = true;
+ fireFailure();
}
/**
- * The request failed, but should be retried using the provided retry action.
- * @param retryAction The action that should be taken by the caller before retrying the request
+ * Raise an error. The request will be marked as failed.
+ * @param error corresponding error to be passed to caller
*/
- public void retry(RetryAction retryAction) {
- this.outcome = Outcome.NEED_RETRY;
- this.retryAction = retryAction;
- }
-
- public void retryNow() {
- retry(RetryAction.NOOP);
- }
-
- public void retryAfterBackoff() {
- retry(RetryAction.BACKOFF);
+ public void raise(Errors error) {
+ raise(error.exception());
}
- public void retryWithNewCoordinator() {
- retry(RetryAction.FIND_COORDINATOR);
+ private void fireSuccess() {
+ for (RequestFutureListener listener: listeners)
+ listener.onSuccess(value);
}
- public void retryAfterMetadataRefresh() {
- retry(RetryAction.REFRESH_METADATA);
+ private void fireFailure() {
+ for (RequestFutureListener listener: listeners)
+ listener.onFailure(exception);
}
/**
- * Complete the request successfully. After this call, {@link #succeeded()} will return true
- * and the value can be obtained through {@link #value()}.
- * @param value corresponding value (or null if there is none)
+ * Add a listener which will be notified when the future completes
+ * @param listener The listener to notify; it is invoked immediately if the future is already done
*/
- public void complete(T value) {
- this.outcome = Outcome.SUCCESS;
- this.value = value;
+ public void addListener(RequestFutureListener<T> listener) {
+ if (isDone) {
+ if (exception != null)
+ listener.onFailure(exception);
+ else
+ listener.onSuccess(value);
+ } else {
+ this.listeners.add(listener);
+ }
}
/**
- * Raise an exception. The request will be marked as failed, and the caller can either
- * handle the exception or throw it.
- * @param e The exception that
+ * Convert from a request future of one type to another type
+ * @param adapter The adapter which does the conversion
+ * @param <S> The type of the future adapted to
+ * @return The new future
*/
- public void raise(RuntimeException e) {
- this.outcome = Outcome.EXCEPTION;
- this.exception = e;
+ public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
+ final RequestFuture<S> adapted = new RequestFuture<S>();
+ addListener(new RequestFutureListener<T>() {
+ @Override
+ public void onSuccess(T value) {
+ adapter.onSuccess(value, adapted);
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ adapter.onFailure(e, adapted);
+ }
+ });
+ return adapted;
+ }
+
+ public static <T> RequestFuture<T> failure(RuntimeException e) {
+ RequestFuture<T> future = new RequestFuture<T>();
+ future.raise(e);
+ return future;
+ }
+
+ public static RequestFuture<Void> voidSuccess() {
+ RequestFuture<Void> future = new RequestFuture<Void>();
+ future.complete(null);
+ return future;
}
- private static <T> RequestFuture<T> newRetryFuture(RetryAction retryAction) {
- RequestFuture<T> result = new RequestFuture<T>();
- result.retry(retryAction);
- return result;
+ public static <T> RequestFuture<T> coordinatorNotAvailable() {
+ return failure(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception());
}
- @SuppressWarnings("unchecked")
- public static <T> RequestFuture<T> pollNeeded() {
- return (RequestFuture<T>) NEED_POLL;
+ public static <T> RequestFuture<T> leaderNotAvailable() {
+ return failure(Errors.LEADER_NOT_AVAILABLE.exception());
}
- @SuppressWarnings("unchecked")
- public static <T> RequestFuture<T> metadataRefreshNeeded() {
- return (RequestFuture<T>) NEED_METADATA_REFRESH;
+ public static <T> RequestFuture<T> noBrokersAvailable() {
+ return failure(new NoAvailableBrokersException());
}
- @SuppressWarnings("unchecked")
- public static <T> RequestFuture<T> newCoordinatorNeeded() {
- return (RequestFuture<T>) NEED_NEW_COORDINATOR;
+ public static <T> RequestFuture<T> staleMetadata() {
+ return failure(new StaleMetadataException());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
new file mode 100644
index 0000000..cc5322f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * Adapt from a request future of one type to another.
+ *
+ * @param <F> Type to adapt from
+ * @param <T> Type to adapt to
+ */
+public abstract class RequestFutureAdapter<F, T> {
+
+ public abstract void onSuccess(F value, RequestFuture<T> future);
+
+ public void onFailure(RuntimeException e, RequestFuture<T> future) {
+ future.raise(e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
new file mode 100644
index 0000000..b39261b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * Listener interface to hook into RequestFuture completion.
+ */
+public interface RequestFutureListener<T> {
+
+ void onSuccess(T value);
+
+ void onFailure(RuntimeException e);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
new file mode 100644
index 0000000..3312a2c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.RetriableException;
+
+/**
+ * Exception used in {@link ConsumerNetworkClient} to indicate the failure
+ * to transmit a request to the networking layer. This could be either because
+ * the client is still connecting to the given host or its send buffer is full.
+ */
+public class SendFailedException extends RetriableException {
+ public static final SendFailedException INSTANCE = new SendFailedException();
+
+ private static final long serialVersionUID = 1L;
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
new file mode 100644
index 0000000..09114cb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.InvalidMetadataException;
+
+/**
+ * Thrown when metadata is old and needs to be refreshed.
+ */
+public class StaleMetadataException extends InvalidMetadataException {
+ private static final long serialVersionUID = 1L;
+}