Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/23 06:19:53 UTC
[1/2] kafka git commit: kafka-2168; New consumer poll() can block other calls like position(), commit(), and close() indefinitely; patched by Jason Gustafson; reviewed by Jay Kreps, Ewen Cheslack-Postava, Guozhang Wang and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 2270a7537 -> b6d326b08
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 56281ee..695eaf6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -19,7 +19,6 @@ import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
@@ -61,9 +60,6 @@ import java.util.Map;
public class Fetcher<K, V> {
private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
- private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
- private static final long LATEST_OFFSET_TIMESTAMP = -1L;
-
private final KafkaClient client;
@@ -72,23 +68,19 @@ public class Fetcher<K, V> {
private final int maxWaitMs;
private final int fetchSize;
private final boolean checkCrcs;
- private final long retryBackoffMs;
private final Metadata metadata;
private final FetchManagerMetrics sensors;
private final SubscriptionState subscriptions;
private final List<PartitionRecords<K, V>> records;
- private final AutoOffsetResetStrategy offsetResetStrategy;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
public Fetcher(KafkaClient client,
- long retryBackoffMs,
int minBytes,
int maxWaitMs,
int fetchSize,
boolean checkCrcs,
- String offsetReset,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
Metadata metadata,
@@ -102,17 +94,16 @@ public class Fetcher<K, V> {
this.client = client;
this.metadata = metadata;
this.subscriptions = subscriptions;
- this.retryBackoffMs = retryBackoffMs;
this.minBytes = minBytes;
this.maxWaitMs = maxWaitMs;
this.fetchSize = fetchSize;
this.checkCrcs = checkCrcs;
- this.offsetResetStrategy = AutoOffsetResetStrategy.valueOf(offsetReset);
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
this.records = new LinkedList<PartitionRecords<K, V>>();
+
this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
}
@@ -166,84 +157,76 @@ public class Fetcher<K, V> {
}
/**
- * Reset offsets for the given partition using the offset reset strategy.
- *
- * @param partition The given partition that needs reset offset
- * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
- */
- public void resetOffset(TopicPartition partition) {
- long timestamp;
- if (this.offsetResetStrategy == AutoOffsetResetStrategy.EARLIEST)
- timestamp = EARLIEST_OFFSET_TIMESTAMP;
- else if (this.offsetResetStrategy == AutoOffsetResetStrategy.LATEST)
- timestamp = LATEST_OFFSET_TIMESTAMP;
- else
- throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
-
- log.debug("Resetting offset for partition {} to {} offset.", partition, this.offsetResetStrategy.name()
- .toLowerCase());
- this.subscriptions.seek(partition, offsetBefore(partition, timestamp));
- }
-
- /**
* Fetch a single offset before the given timestamp for the partition.
*
* @param topicPartition The partition that needs fetching offset.
* @param timestamp The timestamp for fetching offset.
- * @return The offset of the message that is published before the given timestamp
+ * @return A future which can be polled to obtain the corresponding offset.
*/
- public long offsetBefore(TopicPartition topicPartition, long timestamp) {
- log.debug("Fetching offsets for partition {}.", topicPartition);
+ public RequestFuture<Long> listOffset(final TopicPartition topicPartition, long timestamp) {
Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
- while (true) {
- long now = time.milliseconds();
- PartitionInfo info = metadata.fetch().partition(topicPartition);
- if (info == null) {
- metadata.add(topicPartition.topic());
- log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
- awaitMetadataUpdate();
- } else if (info.leader() == null) {
- log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
- awaitMetadataUpdate();
- } else if (this.client.ready(info.leader(), now)) {
- Node node = info.leader();
- ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
- RequestSend send = new RequestSend(node.idString(),
+ long now = time.milliseconds();
+ PartitionInfo info = metadata.fetch().partition(topicPartition);
+ if (info == null) {
+ metadata.add(topicPartition.topic());
+ log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
+ return RequestFuture.metadataRefreshNeeded();
+ } else if (info.leader() == null) {
+ log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
+ return RequestFuture.metadataRefreshNeeded();
+ } else if (this.client.ready(info.leader(), now)) {
+ final RequestFuture<Long> future = new RequestFuture<Long>();
+ Node node = info.leader();
+ ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
+ RequestSend send = new RequestSend(node.idString(),
this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS),
request.toStruct());
- ClientRequest clientRequest = new ClientRequest(now, true, send, null);
- this.client.send(clientRequest);
- List<ClientResponse> responses = this.client.completeAll(node.idString(), now);
- if (responses.isEmpty())
- throw new IllegalStateException("This should not happen.");
- ClientResponse response = responses.get(responses.size() - 1);
- if (response.wasDisconnected()) {
- awaitMetadataUpdate();
- } else {
- ListOffsetResponse lor = new ListOffsetResponse(response.responseBody());
- short errorCode = lor.responseData().get(topicPartition).errorCode;
- if (errorCode == Errors.NONE.code()) {
- List<Long> offsets = lor.responseData().get(topicPartition).offsets;
- if (offsets.size() != 1)
- throw new IllegalStateException("This should not happen.");
- long offset = offsets.get(0);
- log.debug("Fetched offset {} for partition {}", offset, topicPartition);
- return offset;
- } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
- || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
- log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
- topicPartition);
- awaitMetadataUpdate();
- } else {
- log.error("Attempt to fetch offsets for partition {} failed due to: {}",
- topicPartition, Errors.forCode(errorCode).exception().getMessage());
- awaitMetadataUpdate();
- }
+ RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
+ @Override
+ public void onComplete(ClientResponse resp) {
+ handleListOffsetResponse(topicPartition, resp, future);
}
+ };
+ ClientRequest clientRequest = new ClientRequest(now, true, send, completionHandler);
+ this.client.send(clientRequest);
+ return future;
+ } else {
+ // We initiated a connect to the leader, but we need to poll to finish it.
+ return RequestFuture.pollNeeded();
+ }
+ }
+
+ /**
+ * Callback for the response of the list offset call above.
+ * @param topicPartition The partition that was fetched
+ * @param clientResponse The response from the server.
+ * @param future The future to be completed with the fetched offset
+ */
+ private void handleListOffsetResponse(TopicPartition topicPartition,
+ ClientResponse clientResponse,
+ RequestFuture<Long> future) {
+ if (clientResponse.wasDisconnected()) {
+ future.retryAfterMetadataRefresh();
+ } else {
+ ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
+ short errorCode = lor.responseData().get(topicPartition).errorCode;
+ if (errorCode == Errors.NONE.code()) {
+ List<Long> offsets = lor.responseData().get(topicPartition).offsets;
+ if (offsets.size() != 1)
+ throw new IllegalStateException("This should not happen.");
+ long offset = offsets.get(0);
+ log.debug("Fetched offset {} for partition {}", offset, topicPartition);
+
+ future.complete(offset);
+ } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+ || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+ log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
+ topicPartition);
+ future.retryAfterMetadataRefresh();
} else {
- log.debug("Leader for partition {} is not ready, retry fetching offsets", topicPartition);
- client.poll(this.retryBackoffMs, now);
+ log.error("Attempt to fetch offsets for partition {} failed due to: {}",
+ topicPartition, Errors.forCode(errorCode).exception().getMessage());
+ future.retryAfterMetadataRefresh();
}
}
}
@@ -257,8 +240,10 @@ public class Fetcher<K, V> {
Map<Integer, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Integer, Map<TopicPartition, FetchRequest.PartitionData>>();
for (TopicPartition partition : subscriptions.assignedPartitions()) {
Node node = cluster.leaderFor(partition);
- // if there is a leader and no in-flight requests, issue a new fetch
- if (node != null && this.client.inFlightRequestCount(node.idString()) == 0) {
+ if (node == null) {
+ metadata.requestUpdate();
+ } else if (this.client.inFlightRequestCount(node.idString()) == 0) {
+ // if there is a leader and no in-flight requests, issue a new fetch
Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node.id());
if (fetch == null) {
fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
@@ -327,7 +312,7 @@ public class Fetcher<K, V> {
} else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
// TODO: this could be optimized by grouping all out-of-range partitions
log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
- resetOffset(tp);
+ subscriptions.needOffsetReset(tp);
} else if (partition.errorCode == Errors.UNKNOWN.code()) {
log.warn("Unknown error fetching data for topic-partition {}", tp);
} else {
@@ -356,17 +341,6 @@ public class Fetcher<K, V> {
return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value);
}
- /*
- * Request a metadata update and wait until it has occurred
- */
- private void awaitMetadataUpdate() {
- int version = this.metadata.requestUpdate();
- do {
- long now = time.milliseconds();
- this.client.poll(this.retryBackoffMs, now);
- } while (this.metadata.version() == version);
- }
-
private static class PartitionRecords<K, V> {
public long fetchOffset;
public TopicPartition partition;
@@ -379,9 +353,6 @@ public class Fetcher<K, V> {
}
}
- private static enum AutoOffsetResetStrategy {
- LATEST, EARLIEST, NONE
- }
private class FetchManagerMetrics {
public final Metrics metrics;
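For context, a caller-side sketch of how the new asynchronous listOffset() is intended to be driven; the surrounding names (fetcher, client, subscriptions, metadata, retryBackoffMs, tp, time) are assumed from the consumer internals and are illustrative rather than part of this patch:

    // Illustrative only: poll the client until the future completes, then act on the outcome.
    // -1L is the "latest" timestamp sentinel previously named by LATEST_OFFSET_TIMESTAMP.
    RequestFuture<Long> future = fetcher.listOffset(tp, -1L);
    while (!future.isDone())
        client.poll(retryBackoffMs, time.milliseconds());
    if (future.succeeded())
        subscriptions.seek(tp, future.value());
    else if (future.retryAction() == RequestFuture.RetryAction.REFRESH_METADATA)
        metadata.requestUpdate(); // refresh metadata on the next poll, then retry listOffset()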
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index e7cfaaa..51eae19 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -42,4 +42,14 @@ public final class Heartbeat {
public long lastHeartbeatSend() {
return this.lastHeartbeatSend;
}
+
+ public long timeToNextHeartbeat(long now) {
+ long timeSinceLastHeartbeat = now - lastHeartbeatSend;
+
+ long hbInterval = timeout / HEARTBEATS_PER_SESSION_INTERVAL;
+ if (timeSinceLastHeartbeat > hbInterval)
+ return 0;
+ else
+ return hbInterval - timeSinceLastHeartbeat;
+ }
}
\ No newline at end of file
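The new timeToNextHeartbeat() pairs with the Utils.min() helper added later in this commit (KafkaConsumer imports it statically); a sketch of one plausible use, where userTimeout is an assumed caller-supplied value:

    // Illustrative only: bound the poll timeout so a long user timeout cannot delay heartbeats.
    long now = time.milliseconds();
    long pollTimeout = Utils.min(userTimeout, heartbeat.timeToNextHeartbeat(now));
    client.poll(pollTimeout, now);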
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
new file mode 100644
index 0000000..13fc9af
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * Result of an asynchronous request through {@link org.apache.kafka.clients.KafkaClient}. To get the
+ * result of the request, you must poll using {@link org.apache.kafka.clients.KafkaClient#poll(long, long)}
+ * until {@link #isDone()} returns true. Typical usage might look like this:
+ *
+ * <pre>
+ * RequestFuture future = sendRequest();
+ * while (!future.isDone()) {
+ * client.poll(timeout, now);
+ * }
+ *
+ * switch (future.outcome()) {
+ * case SUCCESS:
+ * // handle request success
+ * break;
+ * case NEED_RETRY:
+ * // retry after taking possible retry action
+ * break;
+ * case EXCEPTION:
+ * // handle exception
+ * }
+ * </pre>
+ *
+ * When {@link #isDone()} returns true, there are three possible outcomes (obtained through {@link #outcome()}):
+ *
+ * <ol>
+ * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#SUCCESS}: If the request was
+ * successful, then you can use {@link #value()} to obtain the result.</li>
+ * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#EXCEPTION}: If an unhandled exception
+ * was encountered, you can use {@link #exception()} to get it.</li>
+ * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#NEED_RETRY}: The request may
+ * not have been successful, but the failure may be ephemeral and the caller just needs to try the request again.
+ * In this case, use {@link #retryAction()} to determine what action should be taken (if any) before
+ * retrying.</li>
+ * </ol>
+ *
+ * @param <T> Return type of the result (Can be Void if there is no response)
+ */
+public class RequestFuture<T> {
+ public static final RequestFuture<Object> NEED_NEW_COORDINATOR = newRetryFuture(RetryAction.FIND_COORDINATOR);
+ public static final RequestFuture<Object> NEED_POLL = newRetryFuture(RetryAction.POLL);
+ public static final RequestFuture<Object> NEED_METADATA_REFRESH = newRetryFuture(RetryAction.REFRESH_METADATA);
+
+ public enum RetryAction {
+ NOOP, // Retry immediately.
+ POLL, // Retry after calling poll (e.g. to finish a connection)
+ BACKOFF, // Retry after a delay
+ FIND_COORDINATOR, // Find a new coordinator before retrying
+ REFRESH_METADATA // Refresh metadata before retrying
+ }
+
+ public enum Outcome {
+ SUCCESS,
+ NEED_RETRY,
+ EXCEPTION
+ }
+
+ private Outcome outcome;
+ private RetryAction retryAction;
+ private T value;
+ private RuntimeException exception;
+
+ /**
+ * Check whether the response is ready to be handled
+ * @return true if the response is ready, false otherwise
+ */
+ public boolean isDone() {
+ return outcome != null;
+ }
+
+ /**
+ * Get the value corresponding to this request (if it has one, as indicated by {@link #outcome()}).
+ * @return the value if it exists or null
+ */
+ public T value() {
+ return value;
+ }
+
+ /**
+ * Check if the request succeeded.
+ * @return true if a value is available, false otherwise
+ */
+ public boolean succeeded() {
+ return outcome == Outcome.SUCCESS;
+ }
+
+ /**
+ * Check if the request failed.
+ * @return true if the request failed (whether or not it can be retried)
+ */
+ public boolean failed() {
+ return outcome != Outcome.SUCCESS;
+ }
+
+ /**
+ * Return the retry action from this response (assuming {@link #succeeded()} has returned false). If the
+ * response is not ready or there is no retry action, null is returned.
+ * @return the retry action if it exists or null
+ */
+ public RetryAction retryAction() {
+ return retryAction;
+ }
+
+ /**
+ * Get the exception from a failed result. You should check that there is an exception
+ * with {@link #hasException()} before using this method.
+ * @return The exception if it exists or null
+ */
+ public RuntimeException exception() {
+ return exception;
+ }
+
+ /**
+ * Check whether there was an exception.
+ * @return true if this request failed with an exception
+ */
+ public boolean hasException() {
+ return outcome == Outcome.EXCEPTION;
+ }
+
+ /**
+ * Check the outcome of the future if it is ready.
+ * @return the outcome or null if the future is not finished
+ */
+ public Outcome outcome() {
+ return outcome;
+ }
+
+ /**
+ * The request failed, but should be retried using the provided retry action.
+ * @param retryAction The action that should be taken by the caller before retrying the request
+ */
+ public void retry(RetryAction retryAction) {
+ this.outcome = Outcome.NEED_RETRY;
+ this.retryAction = retryAction;
+ }
+
+ public void retryNow() {
+ retry(RetryAction.NOOP);
+ }
+
+ public void retryAfterBackoff() {
+ retry(RetryAction.BACKOFF);
+ }
+
+ public void retryWithNewCoordinator() {
+ retry(RetryAction.FIND_COORDINATOR);
+ }
+
+ public void retryAfterMetadataRefresh() {
+ retry(RetryAction.REFRESH_METADATA);
+ }
+
+ /**
+ * Complete the request successfully. After this call, {@link #succeeded()} will return true
+ * and the value can be obtained through {@link #value()}.
+ * @param value corresponding value (or null if there is none)
+ */
+ public void complete(T value) {
+ this.outcome = Outcome.SUCCESS;
+ this.value = value;
+ }
+
+ /**
+ * Raise an exception. The request will be marked as failed, and the caller can either
+ * handle the exception or throw it.
+ * @param e The exception that caused the request to fail
+ */
+ public void raise(RuntimeException e) {
+ this.outcome = Outcome.EXCEPTION;
+ this.exception = e;
+ }
+
+ private static <T> RequestFuture<T> newRetryFuture(RetryAction retryAction) {
+ RequestFuture<T> result = new RequestFuture<T>();
+ result.retry(retryAction);
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T> RequestFuture<T> pollNeeded() {
+ return (RequestFuture<T>) NEED_POLL;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T> RequestFuture<T> metadataRefreshNeeded() {
+ return (RequestFuture<T>) NEED_METADATA_REFRESH;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T> RequestFuture<T> newCoordinatorNeeded() {
+ return (RequestFuture<T>) NEED_NEW_COORDINATOR;
+ }
+
+}
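A sketch of how a caller might act on a NEED_RETRY outcome using the RetryAction hints; the collaborators (client, metadata, coordinator, time, retryBackoffMs) are assumed from the consumer internals and are illustrative only:

    // Illustrative only: translate the retry hint into the corresponding action before retrying.
    if (future.failed() && !future.hasException()) {
        switch (future.retryAction()) {
            case POLL:
                client.poll(0, time.milliseconds()); // e.g. finish an in-progress connection
                break;
            case BACKOFF:
                // wait retryBackoffMs before retrying
                break;
            case REFRESH_METADATA:
                metadata.requestUpdate();
                break;
            case FIND_COORDINATOR:
                coordinator.discoverConsumerCoordinator();
                break;
            case NOOP:
            default:
                break; // retry immediately
        }
    }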
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index cee7541..6837453 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -12,14 +12,15 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.kafka.common.TopicPartition;
-
/**
* A class for tracking the topics, partitions, and offsets for the consumer
*/
@@ -49,7 +50,14 @@ public class SubscriptionState {
/* do we need to request the latest committed offsets from the coordinator? */
private boolean needsFetchCommittedOffsets;
- public SubscriptionState() {
+ /* Partitions that need to be reset before fetching */
+ private Map<TopicPartition, OffsetResetStrategy> resetPartitions;
+
+ /* Default offset reset strategy */
+ private OffsetResetStrategy offsetResetStrategy;
+
+ public SubscriptionState(OffsetResetStrategy offsetResetStrategy) {
+ this.offsetResetStrategy = offsetResetStrategy;
this.subscribedTopics = new HashSet<String>();
this.subscribedPartitions = new HashSet<TopicPartition>();
this.assignedPartitions = new HashSet<TopicPartition>();
@@ -58,6 +66,7 @@ public class SubscriptionState {
this.committed = new HashMap<TopicPartition, Long>();
this.needsPartitionAssignment = false;
this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
+ this.resetPartitions = new HashMap<TopicPartition, OffsetResetStrategy>();
}
public void subscribe(String topic) {
@@ -102,12 +111,14 @@ public class SubscriptionState {
this.committed.remove(tp);
this.fetched.remove(tp);
this.consumed.remove(tp);
+ this.resetPartitions.remove(tp);
}
public void clearAssignment() {
this.assignedPartitions.clear();
this.committed.clear();
this.fetched.clear();
+ this.consumed.clear();
this.needsPartitionAssignment = !subscribedTopics().isEmpty();
}
@@ -145,6 +156,7 @@ public class SubscriptionState {
public void seek(TopicPartition tp, long offset) {
fetched(tp, offset);
consumed(tp, offset);
+ resetPartitions.remove(tp);
}
public Set<TopicPartition> assignedPartitions() {
@@ -169,6 +181,28 @@ public class SubscriptionState {
return this.consumed;
}
+ public void needOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) {
+ this.resetPartitions.put(partition, offsetResetStrategy);
+ this.fetched.remove(partition);
+ this.consumed.remove(partition);
+ }
+
+ public void needOffsetReset(TopicPartition partition) {
+ needOffsetReset(partition, offsetResetStrategy);
+ }
+
+ public boolean isOffsetResetNeeded(TopicPartition partition) {
+ return resetPartitions.containsKey(partition);
+ }
+
+ public boolean isOffsetResetNeeded() {
+ return !resetPartitions.isEmpty();
+ }
+
+ public OffsetResetStrategy resetStrategy(TopicPartition partition) {
+ return resetPartitions.get(partition);
+ }
+
public boolean hasAllFetchPositions() {
return this.fetched.size() >= this.assignedPartitions.size();
}
@@ -192,4 +226,5 @@ public class SubscriptionState {
this.needsPartitionAssignment = false;
}
+
}
\ No newline at end of file
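A sketch of the offset-reset handoff this change introduces: the fetcher now only marks the partition with needOffsetReset(tp) (see the Fetcher.java hunk above), and the consumer is expected to resolve the reset later, roughly as follows (names other than the SubscriptionState/Fetcher methods are assumed and illustrative):

    // Illustrative only: resolve a pending offset reset before fetching from the partition.
    if (subscriptions.isOffsetResetNeeded(tp)) {
        OffsetResetStrategy strategy = subscriptions.resetStrategy(tp);
        // -2L = earliest, -1L = latest (the sentinels previously defined in Fetcher);
        // a NONE strategy should instead surface NoOffsetForPartitionException.
        long timestamp = strategy == OffsetResetStrategy.EARLIEST ? -2L : -1L;
        RequestFuture<Long> future = fetcher.listOffset(tp, timestamp);
        // ... poll the client until future.isDone(), then:
        if (future.succeeded())
            subscriptions.seek(tp, future.value()); // seek() also clears the reset flag
    }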
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index f73eedb..af9993c 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -182,6 +182,21 @@ public class Utils {
}
/**
+ * Get the minimum of some long values.
+ * @param first Used to ensure at least one value is provided
+ * @param rest The rest of the longs to compare
+ * @return The minimum of all passed arguments.
+ */
+ public static long min(long first, long ... rest) {
+ long min = first;
+ for (int i = 0; i < rest.length; i++) {
+ if (rest[i] < min)
+ min = rest[i];
+ }
+ return min;
+ }
+
+ /**
* Get the length for UTF8-encoding a string without encoding it first
*
* @param s The string to calculate the length for
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 677edd3..26b6b40 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -25,7 +25,7 @@ import org.junit.Test;
public class MockConsumerTest {
- private MockConsumer<String, String> consumer = new MockConsumer<String, String>();
+ private MockConsumer<String, String> consumer = new MockConsumer<String, String>(OffsetResetStrategy.EARLIEST);
@Test
public void testSimpleMock() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index 1454ab7..613b192 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -17,10 +17,11 @@
package org.apache.kafka.clients.consumer.internals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -49,24 +50,20 @@ public class CoordinatorTest {
private String topicName = "test";
private String groupId = "test-group";
private TopicPartition tp = new TopicPartition(topicName, 0);
- private long retryBackoffMs = 0L;
private int sessionTimeoutMs = 10;
private String rebalanceStrategy = "not-matter";
private MockTime time = new MockTime();
private MockClient client = new MockClient(time);
- private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
private Node node = cluster.nodes().get(0);
- private SubscriptionState subscriptions = new SubscriptionState();
+ private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
private Metrics metrics = new Metrics(time);
private Map<String, String> metricTags = new LinkedHashMap<String, String>();
private Coordinator coordinator = new Coordinator(client,
groupId,
- retryBackoffMs,
sessionTimeoutMs,
rebalanceStrategy,
- metadata,
subscriptions,
metrics,
"consumer" + groupId,
@@ -75,13 +72,14 @@ public class CoordinatorTest {
@Before
public void setup() {
- metadata.update(cluster, time.milliseconds());
client.setNode(node);
}
@Test
public void testNormalHeartbeat() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
// normal heartbeat
time.sleep(sessionTimeoutMs);
@@ -94,6 +92,8 @@ public class CoordinatorTest {
@Test
public void testCoordinatorNotAvailable() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
// consumer_coordinator_not_available will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
@@ -108,6 +108,8 @@ public class CoordinatorTest {
@Test
public void testNotCoordinator() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
// not_coordinator will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
@@ -122,6 +124,8 @@ public class CoordinatorTest {
@Test
public void testIllegalGeneration() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
// illegal_generation will cause re-partition
subscriptions.subscribe(topicName);
@@ -139,6 +143,8 @@ public class CoordinatorTest {
@Test
public void testCoordinatorDisconnect() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
// coordinator disconnect will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
@@ -152,39 +158,67 @@ public class CoordinatorTest {
@Test
public void testNormalJoinGroup() {
+ subscriptions.subscribe(topicName);
+ subscriptions.needReassignment();
+
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
// normal join group
client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
- assertEquals(Collections.singletonList(tp),
- coordinator.assignPartitions(Collections.singletonList(topicName), time.milliseconds()));
- assertEquals(0, client.inFlightRequestCount());
+ coordinator.assignPartitions(time.milliseconds());
+ client.poll(0, time.milliseconds());
+
+ assertFalse(subscriptions.partitionAssignmentNeeded());
+ assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
}
@Test
public void testReJoinGroup() {
+ subscriptions.subscribe(topicName);
+ subscriptions.needReassignment();
+
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
+ assertTrue(subscriptions.partitionAssignmentNeeded());
// disconnect from the original coordinator will cause rediscovery and a new join attempt
client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()), true);
+ coordinator.assignPartitions(time.milliseconds());
+ client.poll(0, time.milliseconds());
+ assertTrue(subscriptions.partitionAssignmentNeeded());
+
+ // rediscover the coordinator
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
+
+ // try assigning partitions again
client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
- assertEquals(Collections.singletonList(tp),
- coordinator.assignPartitions(Collections.singletonList(topicName), time.milliseconds()));
- assertEquals(0, client.inFlightRequestCount());
+ coordinator.assignPartitions(time.milliseconds());
+ client.poll(0, time.milliseconds());
+ assertFalse(subscriptions.partitionAssignmentNeeded());
+ assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
}
@Test
public void testCommitOffsetNormal() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
- // sync commit
+ // With success flag
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
- coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+ RequestFuture<Void> result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
+ assertEquals(1, client.poll(0, time.milliseconds()).size());
+ assertTrue(result.isDone());
+ assertTrue(result.succeeded());
- // async commit
- coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+ // Without success flag
+ coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
client.respond(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
assertEquals(1, client.poll(0, time.milliseconds()).size());
}
@@ -192,34 +226,55 @@ public class CoordinatorTest {
@Test
public void testCommitOffsetError() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
// async commit with coordinator not available
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
- coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+ coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
assertEquals(1, client.poll(0, time.milliseconds()).size());
assertTrue(coordinator.coordinatorUnknown());
// resume
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
// async commit with not coordinator
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
- coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+ coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
assertEquals(1, client.poll(0, time.milliseconds()).size());
assertTrue(coordinator.coordinatorUnknown());
// resume
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
// sync commit with not_coordinator
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
- coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+ RequestFuture<Void> result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
+ assertEquals(1, client.poll(0, time.milliseconds()).size());
+ assertTrue(result.isDone());
+ assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
// sync commit with coordinator disconnected
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
- coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+ result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
+
+ assertEquals(0, client.poll(0, time.milliseconds()).size());
+ assertTrue(result.isDone());
+ assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
+
+ result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
+ assertEquals(1, client.poll(0, time.milliseconds()).size());
+ assertTrue(result.isDone());
+ assertTrue(result.succeeded());
}
@@ -227,33 +282,70 @@ public class CoordinatorTest {
public void testFetchOffset() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
// normal fetch
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
- assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
+ RequestFuture<Map<TopicPartition, Long>> result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+ client.poll(0, time.milliseconds());
+ assertTrue(result.isDone());
+ assertEquals(100L, (long) result.value().get(tp));
// fetch with loading in progress
client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
- assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
+
+ result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+ client.poll(0, time.milliseconds());
+ assertTrue(result.isDone());
+ assertTrue(result.failed());
+ assertEquals(RequestFuture.RetryAction.BACKOFF, result.retryAction());
+
+ result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+ client.poll(0, time.milliseconds());
+ assertTrue(result.isDone());
+ assertEquals(100L, (long) result.value().get(tp));
// fetch with not coordinator
client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code(), "", 100L));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
- assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
+
+ result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+ client.poll(0, time.milliseconds());
+ assertTrue(result.isDone());
+ assertTrue(result.failed());
+ assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
+
+ coordinator.discoverConsumerCoordinator();
+ client.poll(0, time.milliseconds());
+
+ result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+ client.poll(0, time.milliseconds());
+ assertTrue(result.isDone());
+ assertEquals(100L, (long) result.value().get(tp));
// fetch with no fetchable offsets
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
- assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
+ result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+ client.poll(0, time.milliseconds());
+ assertTrue(result.isDone());
+ assertTrue(result.value().isEmpty());
// fetch with offset topic unknown
client.prepareResponse(offsetFetchResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "", 100L));
- assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
+ result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+ client.poll(0, time.milliseconds());
+ assertTrue(result.isDone());
+ assertTrue(result.value().isEmpty());
// fetch with offset -1
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
- assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
+ result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+ client.poll(0, time.milliseconds());
+ assertTrue(result.isDone());
+ assertTrue(result.value().isEmpty());
}
private Struct consumerMetadataResponse(Node node, short error) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4195410..405efdc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -16,11 +16,10 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import static org.junit.Assert.assertEquals;
-
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -30,10 +29,11 @@ import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Collections;
@@ -41,37 +41,33 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class FetcherTest {
private String topicName = "test";
private String groupId = "test-group";
private TopicPartition tp = new TopicPartition(topicName, 0);
- private long retryBackoffMs = 0L;
private int minBytes = 1;
private int maxWaitMs = 0;
private int fetchSize = 1000;
- private String offsetReset = "EARLIEST";
private MockTime time = new MockTime();
private MockClient client = new MockClient(time);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
private Node node = cluster.nodes().get(0);
- private SubscriptionState subscriptions = new SubscriptionState();
+ private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
private Metrics metrics = new Metrics(time);
private Map<String, String> metricTags = new LinkedHashMap<String, String>();
private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
private Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(client,
- retryBackoffMs,
minBytes,
maxWaitMs,
fetchSize,
true, // check crc
- offsetReset,
new ByteArrayDeserializer(),
new ByteArrayDeserializer(),
metadata,
@@ -140,11 +136,11 @@ public class FetcherTest {
subscriptions.fetched(tp, 5);
fetcher.initFetches(cluster, time.milliseconds());
client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
- client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code()));
client.poll(0, time.milliseconds());
+ assertTrue(subscriptions.isOffsetResetNeeded(tp));
assertEquals(0, fetcher.fetchedRecords().size());
- assertEquals(0L, (long) subscriptions.fetched(tp));
- assertEquals(0L, (long) subscriptions.consumed(tp));
+ assertEquals(null, subscriptions.fetched(tp));
+ assertEquals(null, subscriptions.consumed(tp));
}
@Test
@@ -157,11 +153,11 @@ public class FetcherTest {
// fetch with out of range
fetcher.initFetches(cluster, time.milliseconds());
client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
- client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code()));
client.poll(0, time.milliseconds());
+ assertTrue(subscriptions.isOffsetResetNeeded(tp));
assertEquals(0, fetcher.fetchedRecords().size());
- assertEquals(0L, (long) subscriptions.fetched(tp));
- assertEquals(0L, (long) subscriptions.consumed(tp));
+ assertEquals(null, subscriptions.fetched(tp));
+ assertEquals(null, subscriptions.consumed(tp));
}
private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
@@ -169,9 +165,5 @@ public class FetcherTest {
return response.toStruct();
}
- private Struct listOffsetResponse(List<Long> offsets, short error) {
- ListOffsetResponse response = new ListOffsetResponse(Collections.singletonMap(tp, new ListOffsetResponse.PartitionData(error, offsets)));
- return response.toStruct();
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
index ecc78ce..ee1ede0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -42,4 +43,12 @@ public class HeartbeatTest {
time.sleep(timeout / (2 * Heartbeat.HEARTBEATS_PER_SESSION_INTERVAL));
assertFalse(heartbeat.shouldHeartbeat(time.milliseconds()));
}
+
+ @Test
+ public void testTimeToNextHeartbeat() {
+ heartbeat.sentHeartbeat(0);
+ assertEquals(100, heartbeat.timeToNextHeartbeat(0));
+ assertEquals(0, heartbeat.timeToNextHeartbeat(100));
+ assertEquals(0, heartbeat.timeToNextHeartbeat(200));
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index e000cf8..319751c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -22,12 +22,13 @@ import static java.util.Arrays.asList;
import java.util.Collections;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
public class SubscriptionStateTest {
- private final SubscriptionState state = new SubscriptionState();
+ private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);
private final TopicPartition tp0 = new TopicPartition("test", 0);
private final TopicPartition tp1 = new TopicPartition("test", 1);
@@ -43,7 +44,21 @@ public class SubscriptionStateTest {
assertTrue(state.assignedPartitions().isEmpty());
assertAllPositions(tp0, null);
}
-
+
+ @Test
+ public void partitionReset() {
+ state.subscribe(tp0);
+ state.seek(tp0, 5);
+ assertEquals(5L, (long) state.fetched(tp0));
+ assertEquals(5L, (long) state.consumed(tp0));
+ state.needOffsetReset(tp0);
+ assertTrue(state.isOffsetResetNeeded());
+ assertTrue(state.isOffsetResetNeeded(tp0));
+ assertEquals(null, state.fetched(tp0));
+ assertEquals(null, state.consumed(tp0));
+ }
+
+ @Test
public void topicSubscription() {
state.subscribe("test");
assertEquals(1, state.subscribedTopics().size());
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 2ebe3c2..e7951d8 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -100,4 +100,12 @@ public class UtilsTest {
buffer = ByteBuffer.wrap(myvar).asReadOnlyBuffer();
this.subTest(buffer);
}
+
+ @Test
+ public void testMin() {
+ assertEquals(1, Utils.min(1));
+ assertEquals(1, Utils.min(1, 2, 3));
+ assertEquals(1, Utils.min(2, 1, 3));
+ assertEquals(1, Utils.min(2, 3, 1));
+ }
}
\ No newline at end of file
[2/2] kafka git commit: kafka-2168; New consumer poll() can block other calls like position(), commit(), and close() indefinitely; patched by Jason Gustafson; reviewed by Jay Kreps, Ewen Cheslack-Postava, Guozhang Wang and Jun Rao
Posted by ju...@apache.org.
kafka-2168; New consumer poll() can block other calls like position(), commit(), and close() indefinitely; patched by Jason Gustafson; reviewed by Jay Kreps, Ewen Cheslack-Postava, Guozhang Wang and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b6d326b0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b6d326b0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b6d326b0
Branch: refs/heads/trunk
Commit: b6d326b0893e60b350608260fd1bd2542337cb5a
Parents: 2270a75
Author: Jason Gustafson <as...@confluent.io>
Authored: Tue Jun 23 00:07:19 2015 -0400
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jun 23 00:09:06 2015 -0400
----------------------------------------------------------------------
.../apache/kafka/clients/consumer/Consumer.java | 5 +
.../kafka/clients/consumer/ConsumerRecords.java | 7 +
.../consumer/ConsumerWakeupException.java | 20 +
.../kafka/clients/consumer/KafkaConsumer.java | 715 +++++++++++++++----
.../kafka/clients/consumer/MockConsumer.java | 9 +-
.../clients/consumer/OffsetResetStrategy.java | 17 +
.../clients/consumer/internals/Coordinator.java | 447 ++++++------
.../clients/consumer/internals/Fetcher.java | 159 ++---
.../clients/consumer/internals/Heartbeat.java | 10 +
.../consumer/internals/RequestFuture.java | 209 ++++++
.../consumer/internals/SubscriptionState.java | 41 +-
.../org/apache/kafka/common/utils/Utils.java | 15 +
.../clients/consumer/MockConsumerTest.java | 2 +-
.../consumer/internals/CoordinatorTest.java | 148 +++-
.../clients/consumer/internals/FetcherTest.java | 32 +-
.../consumer/internals/HeartbeatTest.java | 9 +
.../internals/SubscriptionStateTest.java | 19 +-
.../apache/kafka/common/utils/UtilsTest.java | 8 +
18 files changed, 1330 insertions(+), 542 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 8f587bc..fd98740 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -108,4 +108,9 @@ public interface Consumer<K, V> extends Closeable {
*/
public void close();
+ /**
+ * @see KafkaConsumer#wakeup()
+ */
+ public void wakeup();
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 1ca75f8..eb75d2e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -27,6 +27,8 @@ import java.util.Map;
* {@link Consumer#poll(long)} operation.
*/
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
+ public static final ConsumerRecords<Object, Object> EMPTY =
+ new ConsumerRecords<Object, Object>(Collections.EMPTY_MAP);
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
@@ -103,4 +105,9 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
}
}
+ @SuppressWarnings("unchecked")
+ public static <K, V> ConsumerRecords<K, V> empty() {
+ return (ConsumerRecords<K, V>) EMPTY;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
new file mode 100644
index 0000000..35f1ec9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.KafkaException;
+
+public class ConsumerWakeupException extends KafkaException {
+ private static final long serialVersionUID = 1L;
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 951c34c..9be8fbc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -12,44 +12,48 @@
*/
package org.apache.kafka.clients.consumer;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.Coordinator;
import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.common.utils.Utils.min;
+
/**
* A Kafka client that consumes records from a Kafka cluster.
* <p>
@@ -298,10 +302,54 @@ import org.slf4j.LoggerFactory;
*
* <h3>Multithreaded Processing</h3>
*
- * The Kafka consumer is threadsafe but coarsely synchronized. All network I/O happens in the thread of the application
- * making the call. We have intentionally avoided implementing a particular threading model for processing.
+ * The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
+ * making the call. It is the responsibility of the user to ensure that multi-threaded access
+ * is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.
+ *
+ * <p>
+ * The only exception to this rule is {@link #wakeup()}, which can safely be used from an external thread to
+ * interrupt an active operation. In this case, a {@link ConsumerWakeupException} will be thrown from the thread
+ * blocking on the operation. This can be used to shut down the consumer from another thread. The following
+ * snippet shows the typical pattern:
+ *
+ * <pre>
+ * public class KafkaConsumerRunner implements Runnable {
+ * private final AtomicBoolean closed = new AtomicBoolean(false);
+ * private final KafkaConsumer consumer;
+ *
+ * public void run() {
+ * try {
+ * consumer.subscribe("topic");
+ * while (!closed.get()) {
+ * ConsumerRecords records = consumer.poll(10000);
+ * // Handle new records
+ * }
+ * } catch (ConsumerWakeupException e) {
+ * // Ignore exception if closing
+ * if (!closed.get()) throw e;
+ * } finally {
+ * consumer.close();
+ * }
+ * }
+ *
+ * public void shutdown() {
+ * closed.set(true);
+ * consumer.wakeup();
+ * }
+ * }
+ * </pre>
+ *
+ * Then in a separate thread, the consumer can be shut down by setting the closed flag and waking up the consumer.
+ *
+ * <pre>
+ * closed.set(true);
+ * consumer.wakeup();
+ * </pre>
+ *
* <p>
- * This leaves several options for implementing multi-threaded processing of records.
+ * We have intentionally avoided implementing a particular threading model for processing. This leaves several
+ * options for implementing multi-threaded processing of records.
+ *
*
* <h4>1. One Consumer Per Thread</h4>
*
@@ -363,6 +411,17 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final ConsumerRebalanceCallback rebalanceCallback;
private long lastCommitAttemptMs;
private boolean closed = false;
+ private final AtomicBoolean wakeup = new AtomicBoolean(false);
+
+ // currentThread holds the threadId of the current thread accessing KafkaConsumer
+ // and is used to prevent multi-threaded access
+ private final AtomicReference<Long> currentThread = new AtomicReference<Long>();
+ // refcount is used to allow reentrant access by the thread that has acquired currentThread
+ private int refcount = 0;
+
+ // TODO: This timeout controls how long we should wait before retrying a request. We should be able
+ // to leverage the work of KAFKA-2120 to get this value from configuration.
+ private long requestTimeoutMs = 5000L;
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -480,13 +539,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
- this.subscriptions = new SubscriptionState();
+ OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
+ this.subscriptions = new SubscriptionState(offsetResetStrategy);
this.coordinator = new Coordinator(this.client,
config.getString(ConsumerConfig.GROUP_ID_CONFIG),
- this.retryBackoffMs,
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
- this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
@@ -508,12 +566,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.valueDeserializer = valueDeserializer;
}
this.fetcher = new Fetcher<K, V>(this.client,
- this.retryBackoffMs,
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
- config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(),
this.keyDeserializer,
this.valueDeserializer,
this.metadata,
@@ -542,8 +598,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* then this will give the set of topics currently assigned to the consumer (which may be none if the assignment
* hasn't happened yet, or the partitions are in the process of getting reassigned).
*/
- public synchronized Set<TopicPartition> subscriptions() {
- return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
+ public Set<TopicPartition> subscriptions() {
+ acquire();
+ try {
+ return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
+ } finally {
+ release();
+ }
}
/**
@@ -561,12 +622,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @param topics A variable list of topics that the consumer wants to subscribe to
*/
@Override
- public synchronized void subscribe(String... topics) {
- ensureNotClosed();
- log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
- for (String topic : topics)
- this.subscriptions.subscribe(topic);
- metadata.addTopics(topics);
+ public void subscribe(String... topics) {
+ acquire();
+ try {
+ log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
+ for (String topic : topics)
+ this.subscriptions.subscribe(topic);
+ metadata.addTopics(topics);
+ } finally {
+ release();
+ }
}
/**
@@ -574,16 +639,20 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
* metadata change.
* <p>
- *
+ *
* @param partitions Partitions to incrementally subscribe to
*/
@Override
- public synchronized void subscribe(TopicPartition... partitions) {
- ensureNotClosed();
- log.debug("Subscribed to partitions(s): {}", Utils.join(partitions, ", "));
- for (TopicPartition tp : partitions) {
- this.subscriptions.subscribe(tp);
- metadata.addTopics(tp.topic());
+ public void subscribe(TopicPartition... partitions) {
+ acquire();
+ try {
+ log.debug("Subscribed to partitions(s): {}", Utils.join(partitions, ", "));
+ for (TopicPartition tp : partitions) {
+ this.subscriptions.subscribe(tp);
+ metadata.addTopics(tp.topic());
+ }
+ } finally {
+ release();
}
}
@@ -593,12 +662,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param topics Topics to unsubscribe from
*/
- public synchronized void unsubscribe(String... topics) {
- ensureNotClosed();
- log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", "));
- // throw an exception if the topic was never subscribed to
- for (String topic : topics)
- this.subscriptions.unsubscribe(topic);
+ public void unsubscribe(String... topics) {
+ acquire();
+ try {
+ log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", "));
+ // throw an exception if the topic was never subscribed to
+ for (String topic : topics)
+ this.subscriptions.unsubscribe(topic);
+ } finally {
+ release();
+ }
}
/**
@@ -607,12 +680,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param partitions Partitions to unsubscribe from
*/
- public synchronized void unsubscribe(TopicPartition... partitions) {
- ensureNotClosed();
- log.debug("Unsubscribed from partitions(s): {}", Utils.join(partitions, ", "));
- // throw an exception if the partition was never subscribed to
- for (TopicPartition partition : partitions)
- this.subscriptions.unsubscribe(partition);
+ public void unsubscribe(TopicPartition... partitions) {
+ acquire();
+ try {
+ log.debug("Unsubscribed from partitions(s): {}", Utils.join(partitions, ", "));
+ // throw an exception if the partition was never subscribed to
+ for (TopicPartition partition : partitions)
+ this.subscriptions.unsubscribe(partition);
+ } finally {
+ release();
+ }
}
/**
@@ -624,17 +701,65 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed
* offset using {@link #commit(Map, CommitType) commit(offsets, sync)} for the subscribed list of partitions.
*
- * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits
- * indefinitely. Must not be negative
+ * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
+ * immediately with any records available now. Must not be negative.
* @return map of topic to records since the last fetch for the subscribed list of topics and partitions
*
* @throws NoOffsetForPartitionException If there is no stored offset for a subscribed partition and no automatic
* offset reset policy has been configured.
*/
@Override
- public synchronized ConsumerRecords<K, V> poll(long timeout) {
- ensureNotClosed();
- long now = time.milliseconds();
+ public ConsumerRecords<K, V> poll(long timeout) {
+ acquire();
+ try {
+ if (timeout < 0)
+ throw new IllegalArgumentException("Timeout must not be negative");
+
+ // Poll for new data until the timeout expires
+ long remaining = timeout;
+ while (remaining >= 0) {
+ long start = time.milliseconds();
+ long pollTimeout = min(remaining, timeToNextCommit(start), coordinator.timeToNextHeartbeat(start));
+
+ Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(pollTimeout, start);
+ long end = time.milliseconds();
+
+ if (!records.isEmpty()) {
+ // If data is available, then return it, but first send off the
+ // next round of fetches to enable pipelining while the user is
+ // handling the fetched records.
+ fetcher.initFetches(metadata.fetch(), end);
+ pollClient(0, end);
+ return new ConsumerRecords<K, V>(records);
+ }
+
+ remaining -= end - start;
+
+ // Nothing was available, so we should back off before retrying
+ if (remaining > 0) {
+ Utils.sleep(min(remaining, retryBackoffMs));
+ remaining -= time.milliseconds() - end;
+ }
+ }
+
+ return ConsumerRecords.empty();
+ } finally {
+ release();
+ }
+ }
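// Illustrative usage of the revised timeout contract (sketch, not part of the patch;
// assumes an already-configured consumer instance named "consumer"):
//     ConsumerRecords<String, String> immediate = consumer.poll(0);    // returns at once, possibly empty
//     ConsumerRecords<String, String> bounded = consumer.poll(1000);   // blocks up to one second for data
// A negative timeout is rejected with IllegalArgumentException, and the call returns as soon as any
// records are available rather than always waiting out the full timeout.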
+
+
+ /**
+ * Do one round of polling. In addition to checking for new data, this does any needed
+ * heart-beating, auto-commits, and offset updates.
+ * @param timeout The maximum time to block in the underlying poll
+ * @param now Current time in millis
+ * @return The fetched records (may be empty)
+ */
+ private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout, long now) {
+ Cluster cluster = this.metadata.fetch();
+
+ // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
if (subscriptions.partitionsAutoAssigned()) {
if (subscriptions.partitionAssignmentNeeded()) {
@@ -649,26 +774,18 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
if (!subscriptions.hasAllFetchPositions())
- updateFetchPositions(this.subscriptions.missingFetchPositions(), now);
+ updateFetchPositions(this.subscriptions.missingFetchPositions());
// maybe autocommit position
if (shouldAutoCommit(now))
commit(CommitType.ASYNC);
- /*
- * initiate any needed fetches, then block for the timeout the user specified
- */
- Cluster cluster = this.metadata.fetch();
+ // Init any new fetches (won't resend pending fetches)
fetcher.initFetches(cluster, now);
- client.poll(timeout, now);
- /*
- * initiate a fetch request for any nodes that we just got a response from without blocking
- */
- fetcher.initFetches(cluster, now);
- client.poll(0, now);
+ pollClient(timeout, now);
- return new ConsumerRecords<K, V>(fetcher.fetchedRecords());
+ return fetcher.fetchedRecords();
}
/**
@@ -686,18 +803,20 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @param commitType Control whether the commit is blocking
*/
@Override
- public synchronized void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) {
- ensureNotClosed();
- log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets);
+ public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) {
+ acquire();
+ try {
+ log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets);
- long now = time.milliseconds();
- this.lastCommitAttemptMs = now;
+ this.lastCommitAttemptMs = time.milliseconds();
- // commit the offsets with the coordinator
- boolean syncCommit = commitType.equals(CommitType.SYNC);
- if (!syncCommit)
- this.subscriptions.needRefreshCommits();
- coordinator.commitOffsets(offsets, syncCommit, now);
+ // commit the offsets with the coordinator
+ if (commitType == CommitType.ASYNC)
+ this.subscriptions.needRefreshCommits();
+ commitOffsets(offsets, commitType);
+ } finally {
+ release();
+ }
}
/**
@@ -710,9 +829,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @param commitType Whether or not the commit should block until it is acknowledged.
*/
@Override
- public synchronized void commit(CommitType commitType) {
- ensureNotClosed();
- commit(this.subscriptions.allConsumed(), commitType);
+ public void commit(CommitType commitType) {
+ acquire();
+ try {
+ commit(this.subscriptions.allConsumed(), commitType);
+ } finally {
+ release();
+ }
}
/**
@@ -721,35 +844,43 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
*/
@Override
- public synchronized void seek(TopicPartition partition, long offset) {
- ensureNotClosed();
- log.debug("Seeking to offset {} for partition {}", offset, partition);
- this.subscriptions.seek(partition, offset);
+ public void seek(TopicPartition partition, long offset) {
+ acquire();
+ try {
+ log.debug("Seeking to offset {} for partition {}", offset, partition);
+ this.subscriptions.seek(partition, offset);
+ } finally {
+ release();
+ }
}
/**
* Seek to the first offset for each of the given partitions
*/
- public synchronized void seekToBeginning(TopicPartition... partitions) {
- ensureNotClosed();
- Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
- : Arrays.asList(partitions);
- for (TopicPartition tp : parts) {
- // TODO: list offset call could be optimized by grouping by node
- seek(tp, fetcher.offsetBefore(tp, EARLIEST_OFFSET_TIMESTAMP));
+ public void seekToBeginning(TopicPartition... partitions) {
+ acquire();
+ try {
+ Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
+ : Arrays.asList(partitions);
+ for (TopicPartition tp : parts)
+ subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
+ } finally {
+ release();
}
}
/**
* Seek to the last offset for each of the given partitions
*/
- public synchronized void seekToEnd(TopicPartition... partitions) {
- ensureNotClosed();
- Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
- : Arrays.asList(partitions);
- for (TopicPartition tp : parts) {
- // TODO: list offset call could be optimized by grouping by node
- seek(tp, fetcher.offsetBefore(tp, LATEST_OFFSET_TIMESTAMP));
+ public void seekToEnd(TopicPartition... partitions) {
+ acquire();
+ try {
+ Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
+ : Arrays.asList(partitions);
+ for (TopicPartition tp : parts)
+ subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+ } finally {
+ release();
}
}
@@ -761,16 +892,20 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
* available.
*/
- public synchronized long position(TopicPartition partition) {
- ensureNotClosed();
- if (!this.subscriptions.assignedPartitions().contains(partition))
- throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
- Long offset = this.subscriptions.consumed(partition);
- if (offset == null) {
- updateFetchPositions(Collections.singleton(partition), time.milliseconds());
- return this.subscriptions.consumed(partition);
- } else {
- return offset;
+ public long position(TopicPartition partition) {
+ acquire();
+ try {
+ if (!this.subscriptions.assignedPartitions().contains(partition))
+ throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
+ Long offset = this.subscriptions.consumed(partition);
+ if (offset == null) {
+ updateFetchPositions(Collections.singleton(partition));
+ return this.subscriptions.consumed(partition);
+ } else {
+ return offset;
+ }
+ } finally {
+ release();
}
}
@@ -787,22 +922,26 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* partition.
*/
@Override
- public synchronized long committed(TopicPartition partition) {
- ensureNotClosed();
- Set<TopicPartition> partitionsToFetch;
- if (subscriptions.assignedPartitions().contains(partition)) {
+ public long committed(TopicPartition partition) {
+ acquire();
+ try {
+ Set<TopicPartition> partitionsToFetch;
+ if (subscriptions.assignedPartitions().contains(partition)) {
+ Long committed = this.subscriptions.committed(partition);
+ if (committed != null)
+ return committed;
+ partitionsToFetch = subscriptions.assignedPartitions();
+ } else {
+ partitionsToFetch = Collections.singleton(partition);
+ }
+ refreshCommittedOffsets(partitionsToFetch);
Long committed = this.subscriptions.committed(partition);
- if (committed != null)
- return committed;
- partitionsToFetch = subscriptions.assignedPartitions();
- } else {
- partitionsToFetch = Collections.singleton(partition);
+ if (committed == null)
+ throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition);
+ return committed;
+ } finally {
+ release();
}
- refreshCommittedOffsets(partitionsToFetch, time.milliseconds());
- Long committed = this.subscriptions.committed(partition);
- if (committed == null)
- throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition);
- return committed;
}
/**
@@ -822,19 +961,41 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
- Cluster cluster = this.metadata.fetch();
- List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
- if (parts == null) {
- metadata.add(topic);
- awaitMetadataUpdate();
- parts = metadata.fetch().partitionsForTopic(topic);
+ acquire();
+ try {
+ Cluster cluster = this.metadata.fetch();
+ List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
+ if (parts == null) {
+ metadata.add(topic);
+ awaitMetadataUpdate();
+ parts = metadata.fetch().partitionsForTopic(topic);
+ }
+ return parts;
+ } finally {
+ release();
}
- return parts;
}
@Override
- public synchronized void close() {
- close(false);
+ public void close() {
+ if (closed) return;
+
+ acquire();
+ try {
+ close(false);
+ } finally {
+ release();
+ }
+ }
+
+ /**
+ * Wake up the consumer. This method is thread-safe and is useful in particular to abort a long poll.
+ * The thread blocking on an operation will throw {@link ConsumerWakeupException}.
+ */
+ @Override
+ public void wakeup() {
+ this.wakeup.set(true);
+ this.client.wakeup();
}
private void close(boolean swallowException) {
@@ -856,6 +1017,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
return this.autoCommit && this.lastCommitAttemptMs <= now - this.autoCommitIntervalMs;
}
+ private long timeToNextCommit(long now) {
+ if (!this.autoCommit)
+ return Long.MAX_VALUE;
+ long timeSinceLastCommit = now - this.lastCommitAttemptMs;
+ if (timeSinceLastCommit > this.autoCommitIntervalMs)
+ return 0;
+ return this.autoCommitIntervalMs - timeSinceLastCommit;
+ }
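// Worked example of the calculation above (illustrative values only): with autoCommitIntervalMs = 5000
// and lastCommitAttemptMs = 12000, a call at now = 15000 gives timeSinceLastCommit = 3000 and returns
// 2000; a call at now = 18000 gives 6000 > 5000 and returns 0, meaning a commit is due. With auto-commit
// disabled the method returns Long.MAX_VALUE, so it never shortens the poll timeout chosen in poll().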
+
/**
* Request a metadata update and wait until it has occurred
*/
@@ -863,7 +1033,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
int version = this.metadata.requestUpdate();
do {
long now = time.milliseconds();
- this.client.poll(this.retryBackoffMs, now);
+ this.pollClient(this.retryBackoffMs, now);
} while (this.metadata.version() == version);
}
@@ -881,8 +1051,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
// get new assigned partitions from the coordinator
- this.subscriptions.changePartitionAssignment(coordinator.assignPartitions(
- new ArrayList<String>(this.subscriptions.subscribedTopics()), now));
+ assignPartitions();
// execute the user's callback after rebalance
log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
@@ -899,25 +1068,73 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* or reset it using the offset reset policy the user has configured.
*
* @param partitions The partitions that needs updating fetch positions
- * @param now The current time
* @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
* defined
*/
- private void updateFetchPositions(Set<TopicPartition> partitions, long now) {
+ private void updateFetchPositions(Set<TopicPartition> partitions) {
// first refresh the committed positions in case they are not up-to-date
- refreshCommittedOffsets(partitions, now);
+ refreshCommittedOffsets(partitions);
// reset the fetch position to the committed position
for (TopicPartition tp : partitions) {
- if (subscriptions.fetched(tp) == null) {
- if (subscriptions.committed(tp) == null) {
- // if the committed position is unknown reset the position
- fetcher.resetOffset(tp);
- } else {
- log.debug("Resetting offset for partition {} to the committed offset {}",
- tp, subscriptions.committed(tp));
- subscriptions.seek(tp, subscriptions.committed(tp));
- }
+ // Skip if we already have a fetch position
+ if (subscriptions.fetched(tp) != null)
+ continue;
+
+ // TODO: If there are several offsets to reset, we could submit offset requests in parallel
+ if (subscriptions.isOffsetResetNeeded(tp)) {
+ resetOffset(tp);
+ } else if (subscriptions.committed(tp) == null) {
+ // There's no committed position, so we need to reset with the default strategy
+ subscriptions.needOffsetReset(tp);
+ resetOffset(tp);
+ } else {
+ log.debug("Resetting offset for partition {} to the committed offset {}",
+ tp, subscriptions.committed(tp));
+ subscriptions.seek(tp, subscriptions.committed(tp));
+ }
+ }
+ }
+
+ /**
+ * Reset offsets for the given partition using the offset reset strategy.
+ *
+ * @param partition The partition whose offset needs to be reset
+ * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
+ */
+ private void resetOffset(TopicPartition partition) {
+ OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
+ final long timestamp;
+ if (strategy == OffsetResetStrategy.EARLIEST)
+ timestamp = EARLIEST_OFFSET_TIMESTAMP;
+ else if (strategy == OffsetResetStrategy.LATEST)
+ timestamp = LATEST_OFFSET_TIMESTAMP;
+ else
+ throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
+
+ log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
+ long offset = listOffset(partition, timestamp);
+ this.subscriptions.seek(partition, offset);
+ }
+
+ /**
+ * Fetch a single offset before the given timestamp for the partition.
+ *
+ * @param partition The partition to fetch the offset for.
+ * @param timestamp The target timestamp for the offset lookup.
+ * @return The offset of the message published before the given timestamp
+ */
+ private long listOffset(TopicPartition partition, long timestamp) {
+ while (true) {
+ RequestFuture<Long> future = fetcher.listOffset(partition, timestamp);
+
+ if (!future.isDone())
+ pollFuture(future, requestTimeoutMs);
+
+ if (future.isDone()) {
+ if (future.succeeded())
+ return future.value();
+ handleRequestFailure(future);
}
}
}
@@ -925,13 +1142,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
* Refresh the committed offsets for given set of partitions and update the cache
*/
- private void refreshCommittedOffsets(Set<TopicPartition> partitions, long now) {
+ private void refreshCommittedOffsets(Set<TopicPartition> partitions) {
// we only need to fetch latest committed offset from coordinator if there
// is some commit process in progress, otherwise our current
// committed cache is up-to-date
if (subscriptions.refreshCommitsNeeded()) {
// contact coordinator to fetch committed offsets
- Map<TopicPartition, Long> offsets = coordinator.fetchOffsets(partitions, now);
+ Map<TopicPartition, Long> offsets = fetchCommittedOffsets(partitions);
// update the position with the offsets
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
@@ -941,6 +1158,183 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
}
+ /**
+ * Block until we have received a partition assignment from the coordinator.
+ */
+ private void assignPartitions() {
+ // Ensure that there are no pending requests to the coordinator. This is important
+ // in particular to avoid resending a pending JoinGroup request.
+ awaitCoordinatorInFlightRequests();
+
+ while (subscriptions.partitionAssignmentNeeded()) {
+ RequestFuture<Void> future = coordinator.assignPartitions(time.milliseconds());
+
+ // Block indefinitely for the join group request (which can take as long as a session timeout)
+ if (!future.isDone())
+ pollFuture(future);
+
+ if (future.failed())
+ handleRequestFailure(future);
+ }
+ }
+
+ /**
+ * Block until the coordinator for this group is known.
+ */
+ private void ensureCoordinatorKnown() {
+ while (coordinator.coordinatorUnknown()) {
+ RequestFuture<Void> future = coordinator.discoverConsumerCoordinator();
+
+ if (!future.isDone())
+ pollFuture(future, requestTimeoutMs);
+
+ if (future.failed())
+ handleRequestFailure(future);
+ }
+ }
+
+ /**
+ * Block until any pending requests to the coordinator have been handled.
+ */
+ public void awaitCoordinatorInFlightRequests() {
+ while (coordinator.hasInFlightRequests()) {
+ long now = time.milliseconds();
+ pollClient(-1, now);
+ }
+ }
+
+ /**
+ * Lookup the committed offsets for a set of partitions. This will block until the coordinator has
+ * responded to the offset fetch request.
+ * @param partitions List of partitions to get offsets for
+ * @return Map from partition to its respective offset
+ */
+ private Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions) {
+ while (true) {
+ long now = time.milliseconds();
+ RequestFuture<Map<TopicPartition, Long>> future = coordinator.fetchOffsets(partitions, now);
+
+ if (!future.isDone())
+ pollFuture(future, requestTimeoutMs);
+
+ if (future.isDone()) {
+ if (future.succeeded())
+ return future.value();
+ handleRequestFailure(future);
+ }
+ }
+ }
+
+ /**
+ * Commit offsets. This call blocks (regardless of commitType) until the coordinator
+ * can receive the commit request. Once the request has been made, however, only the
+ * synchronous commits will wait for a successful response from the coordinator.
+ * @param offsets Offsets to commit.
+ * @param commitType Commit policy
+ */
+ private void commitOffsets(Map<TopicPartition, Long> offsets, CommitType commitType) {
+ if (commitType == CommitType.ASYNC) {
+ commitOffsetsAsync(offsets);
+ } else {
+ commitOffsetsSync(offsets);
+ }
+ }
+
+ private void commitOffsetsAsync(Map<TopicPartition, Long> offsets) {
+ while (true) {
+ long now = time.milliseconds();
+ RequestFuture<Void> future = coordinator.commitOffsets(offsets, now);
+
+ if (!future.isDone() || future.succeeded())
+ return;
+
+ handleRequestFailure(future);
+ }
+ }
+
+ private void commitOffsetsSync(Map<TopicPartition, Long> offsets) {
+ while (true) {
+ long now = time.milliseconds();
+ RequestFuture<Void> future = coordinator.commitOffsets(offsets, now);
+
+ if (!future.isDone())
+ pollFuture(future, requestTimeoutMs);
+
+ if (future.isDone()) {
+ if (future.succeeded())
+ return;
+ else
+ handleRequestFailure(future);
+ }
+ }
+ }
+
+ private void handleRequestFailure(RequestFuture<?> future) {
+ if (future.hasException())
+ throw future.exception();
+
+ switch (future.retryAction()) {
+ case BACKOFF:
+ Utils.sleep(retryBackoffMs);
+ break;
+ case POLL:
+ pollClient(retryBackoffMs, time.milliseconds());
+ break;
+ case FIND_COORDINATOR:
+ ensureCoordinatorKnown();
+ break;
+ case REFRESH_METADATA:
+ awaitMetadataUpdate();
+ break;
+ case NOOP:
+ // Do nothing (retry now)
+ }
+ }
+
+ /**
+ * Poll until a result is ready or timeout expires
+ * @param future The future to poll for
+ * @param timeout The time in milliseconds to wait for the result
+ */
+ private void pollFuture(RequestFuture<?> future, long timeout) {
+ // TODO: Update this code for KAFKA-2120, which adds request timeout to NetworkClient
+ // In particular, we must ensure that "timed out" requests will not have their callbacks
+ // invoked at a later time.
+ long remaining = timeout;
+ while (!future.isDone() && remaining >= 0) {
+ long start = time.milliseconds();
+ pollClient(remaining, start);
+ if (future.isDone()) return;
+ remaining -= time.milliseconds() - start;
+ }
+ }
+
+ /**
+ * Poll indefinitely until the result is ready.
+ * @param future The future to poll for.
+ */
+ private void pollFuture(RequestFuture<?> future) {
+ while (!future.isDone()) {
+ long now = time.milliseconds();
+ pollClient(-1, now);
+ }
+ }
+
+ /**
+ * Poll for IO.
+ * @param timeout The maximum time to wait for IO to become available
+ * @param now The current time in milliseconds
+ * @throws ConsumerWakeupException if {@link #wakeup()} is invoked while the poll is active
+ */
+ private void pollClient(long timeout, long now) {
+ this.client.poll(timeout, now);
+
+ if (wakeup.get()) {
+ wakeup.set(false);
+ throw new ConsumerWakeupException();
+ }
+ }
+
/*
* Check that the consumer hasn't been closed.
*/
@@ -948,4 +1342,27 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
if (this.closed)
throw new IllegalStateException("This consumer has already been closed.");
}
+
+ /**
+ * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
+ * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
+ * supported).
+ * @throws IllegalStateException if the consumer has been closed
+ * @throws ConcurrentModificationException if another thread already has the lock
+ */
+ private void acquire() {
+ ensureNotClosed();
+ Long threadId = Thread.currentThread().getId();
+ if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId))
+ throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
+ refcount++;
+ }
+
+ /**
+ * Release the light lock protecting the consumer from multi-threaded access.
+ */
+ private void release() {
+ if (--refcount == 0)
+ currentThread.set(null);
+ }
}
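To see the acquire()/release() guard added above in isolation, here is a self-contained sketch (not part of the patch) that mirrors its logic and demonstrates both reentrant acquisition by the owning thread and rejection of a second thread:

    import java.util.ConcurrentModificationException;
    import java.util.concurrent.atomic.AtomicReference;

    // Standalone illustration of the single-owner, reentrant guard; names are hypothetical.
    public class LightLockSketch {
        private final AtomicReference<Long> currentThread = new AtomicReference<Long>();
        private int refcount = 0;

        public void acquire() {
            Long threadId = Thread.currentThread().getId();
            if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId))
                throw new ConcurrentModificationException("not safe for multi-threaded access");
            refcount++;
        }

        public void release() {
            if (--refcount == 0)
                currentThread.set(null);
        }

        public static void main(String[] args) throws InterruptedException {
            final LightLockSketch lock = new LightLockSketch();
            lock.acquire();                 // owning thread takes the lock
            lock.acquire();                 // reentrant: the same thread may acquire again
            Thread other = new Thread(new Runnable() {
                public void run() {
                    try {
                        lock.acquire();     // a different thread while owned -> exception
                    } catch (ConcurrentModificationException e) {
                        System.out.println("second thread rejected: " + e.getMessage());
                    }
                }
            });
            other.start();
            other.join();
            lock.release();
            lock.release();                 // ownership is cleared only when refcount reaches zero
        }
    }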
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index f50da82..46e26a6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -40,8 +40,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
private boolean closed;
- public MockConsumer() {
- this.subscriptions = new SubscriptionState();
+ public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
+ this.subscriptions = new SubscriptionState(offsetResetStrategy);
this.partitions = new HashMap<String, List<PartitionInfo>>();
this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
this.closed = false;
@@ -175,6 +175,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
this.closed = true;
}
+ @Override
+ public void wakeup() {
+
+ }
+
private void ensureNotClosed() {
if (this.closed)
throw new IllegalStateException("This consumer has already been closed.");
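The constructor change is source-incompatible for tests: code that previously used the no-arg constructor now has to choose a reset strategy explicitly, e.g. (illustrative):

    MockConsumer<String, String> consumer = new MockConsumer<String, String>(OffsetResetStrategy.EARLIEST);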
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
new file mode 100644
index 0000000..542da7f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+public enum OffsetResetStrategy {
+ LATEST, EARLIEST, NONE
+}
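These values correspond to the existing auto.offset.reset configuration strings; the KafkaConsumer constructor change earlier in this patch maps the configured value by upper-casing it and calling valueOf. A small illustrative sketch (not part of the patch):

    String configured = "latest";  // e.g. the value of ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
    OffsetResetStrategy strategy = OffsetResetStrategy.valueOf(configured.toUpperCase());
    // "earliest" -> EARLIEST, "latest" -> LATEST, "none" -> NONE; with NONE, a needed offset reset
    // surfaces as NoOffsetForPartitionException instead of seeking to the earliest or latest offset.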
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 41cb945..6c26667 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -15,7 +15,6 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
-import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
@@ -57,7 +56,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
- * This class manage the coordination process with the consumer coordinator.
+ * This class manages the coordination process with the consumer coordinator.
*/
public final class Coordinator {
@@ -67,13 +66,11 @@ public final class Coordinator {
private final Time time;
private final String groupId;
- private final Metadata metadata;
private final Heartbeat heartbeat;
private final int sessionTimeoutMs;
private final String assignmentStrategy;
private final SubscriptionState subscriptions;
private final CoordinatorMetrics sensors;
- private final long retryBackoffMs;
private Node consumerCoordinator;
private String consumerId;
private int generation;
@@ -83,10 +80,8 @@ public final class Coordinator {
*/
public Coordinator(KafkaClient client,
String groupId,
- long retryBackoffMs,
int sessionTimeoutMs,
String assignmentStrategy,
- Metadata metadata,
SubscriptionState subscriptions,
Metrics metrics,
String metricGrpPrefix,
@@ -98,10 +93,8 @@ public final class Coordinator {
this.generation = -1;
this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
this.groupId = groupId;
- this.metadata = metadata;
this.consumerCoordinator = null;
this.subscriptions = subscriptions;
- this.retryBackoffMs = retryBackoffMs;
this.sessionTimeoutMs = sessionTimeoutMs;
this.assignmentStrategy = assignmentStrategy;
this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
@@ -109,84 +102,110 @@ public final class Coordinator {
}
/**
- * Assign partitions for the subscribed topics.
- *
- * @param subscribedTopics The subscribed topics list
- * @param now The current time
- * @return The assigned partition info
+ * Send a request to get a new partition assignment. This is a non-blocking call which sends
+ * a JoinGroup request to the coordinator (if it is available). The returned future must
+ * be polled to see if the request completed successfully.
+ * @param now The current time in milliseconds
+ * @return A request future whose completion indicates the result of the JoinGroup request.
*/
- public List<TopicPartition> assignPartitions(List<String> subscribedTopics, long now) {
+ public RequestFuture<Void> assignPartitions(final long now) {
+ final RequestFuture<Void> future = newCoordinatorRequestFuture(now);
+ if (future.isDone()) return future;
// send a join group request to the coordinator
+ List<String> subscribedTopics = new ArrayList<String>(subscriptions.subscribedTopics());
log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics);
- // repeat processing the response until succeed or fatal error
- do {
- JoinGroupRequest request = new JoinGroupRequest(groupId,
+ JoinGroupRequest request = new JoinGroupRequest(groupId,
this.sessionTimeoutMs,
subscribedTopics,
this.consumerId,
this.assignmentStrategy);
- ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, request.toStruct(), null, now);
- JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
- short errorCode = response.errorCode();
+ // create the request for the coordinator
+ log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.consumerCoordinator.id());
+
+ RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
+ @Override
+ public void onComplete(ClientResponse resp) {
+ handleJoinResponse(resp, future);
+ }
+ };
+
+ sendCoordinator(ApiKeys.JOIN_GROUP, request.toStruct(), completionHandler, now);
+ return future;
+ }
+
+ private void handleJoinResponse(ClientResponse response, RequestFuture<Void> future) {
+ if (response.wasDisconnected()) {
+ handleCoordinatorDisconnect(response);
+ future.retryWithNewCoordinator();
+ } else {
+ // process the response
+ JoinGroupResponse joinResponse = new JoinGroupResponse(response.responseBody());
+ short errorCode = joinResponse.errorCode();
if (errorCode == Errors.NONE.code()) {
- this.consumerId = response.consumerId();
- this.generation = response.generationId();
+ Coordinator.this.consumerId = joinResponse.consumerId();
+ Coordinator.this.generation = joinResponse.generationId();
// set the flag to refresh last committed offsets
- this.subscriptions.needRefreshCommits();
+ subscriptions.needRefreshCommits();
log.debug("Joined group: {}", response);
// record re-assignment time
- this.sensors.partitionReassignments.record(time.milliseconds() - now);
+ this.sensors.partitionReassignments.record(response.requestLatencyMs());
- // return assigned partitions
- return response.assignedPartitions();
+ // update partition assignment
+ subscriptions.changePartitionAssignment(joinResponse.assignedPartitions());
+ future.complete(null);
} else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) {
// reset the consumer id and retry immediately
- this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
+ Coordinator.this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.",
- groupId);
+ groupId);
+
+ future.retryNow();
} else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
|| errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
// re-discover the coordinator and retry with backoff
coordinatorDead();
- Utils.sleep(this.retryBackoffMs);
-
log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
- groupId);
+ groupId);
+ future.retryWithNewCoordinator();
} else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()
|| errorCode == Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code()
|| errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) {
// log the error and re-throw the exception
+ KafkaException e = Errors.forCode(errorCode).exception();
log.error("Attempt to join group {} failed due to: {}",
- groupId, Errors.forCode(errorCode).exception().getMessage());
- Errors.forCode(errorCode).maybeThrow();
+ groupId, e.getMessage());
+ future.raise(e);
} else {
// unexpected error, throw the exception
- throw new KafkaException("Unexpected error in join group response: "
- + Errors.forCode(response.errorCode()).exception().getMessage());
+ future.raise(new KafkaException("Unexpected error in join group response: "
+ + Errors.forCode(joinResponse.errorCode()).exception().getMessage()));
}
- } while (true);
+ }
}
/**
- * Commit offsets for the specified list of topics and partitions.
- *
- * A non-blocking commit will attempt to commit offsets asychronously. No error will be thrown if the commit fails.
- * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until
- * the commit succeeds.
+ * Commit offsets for the specified list of topics and partitions. This is a non-blocking call
+ * which returns a request future that can be polled in the case of a synchronous commit or ignored in the
+ * asynchronous case.
*
* @param offsets The list of offsets per partition that should be committed.
- * @param blocking Control whether the commit is blocking
* @param now The current time
+ * @return A request future whose value indicates whether the commit was successful or not
*/
- public void commitOffsets(final Map<TopicPartition, Long> offsets, boolean blocking, long now) {
- if (!offsets.isEmpty()) {
+ public RequestFuture<Void> commitOffsets(final Map<TopicPartition, Long> offsets, long now) {
+ final RequestFuture<Void> future = newCoordinatorRequestFuture(now);
+ if (future.isDone()) return future;
+
+ if (offsets.isEmpty()) {
+ future.complete(null);
+ } else {
// create the offset commit request
Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData;
offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
@@ -198,52 +217,63 @@ public final class Coordinator {
OffsetCommitRequest.DEFAULT_RETENTION_TIME,
offsetData);
- // send request and possibly wait for response if it is blocking
- RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets);
+ RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets, future);
+ sendCoordinator(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
+ }
- if (blocking) {
- boolean done;
- do {
- ClientResponse response = blockingCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
+ return future;
+ }
- // check for errors
- done = true;
- OffsetCommitResponse commitResponse = new OffsetCommitResponse(response.responseBody());
- for (short errorCode : commitResponse.responseData().values()) {
- if (errorCode != Errors.NONE.code())
- done = false;
- }
- if (!done) {
- log.debug("Error in offset commit, backing off for {} ms before retrying again.",
- this.retryBackoffMs);
- Utils.sleep(this.retryBackoffMs);
- }
- } while (!done);
- } else {
- this.client.send(initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now));
- }
+ private <T> RequestFuture<T> newCoordinatorRequestFuture(long now) {
+ if (coordinatorUnknown())
+ return RequestFuture.newCoordinatorNeeded();
+
+ if (client.ready(this.consumerCoordinator, now))
+ // We have an open connection and we're ready to send
+ return new RequestFuture<T>();
+
+ if (this.client.connectionFailed(this.consumerCoordinator)) {
+ coordinatorDead();
+ return RequestFuture.newCoordinatorNeeded();
}
+
+ // The connection has been initiated, so we need to poll to finish it
+ return RequestFuture.pollNeeded();
}
/**
- * Fetch the committed offsets of the given set of partitions.
+ * Fetch the committed offsets for a set of partitions. This is a non-blocking call. The
+ * returned future can be polled to get the actual offsets returned from the broker.
*
- * @param partitions The list of partitions which need to ask for committed offsets
- * @param now The current time
- * @return The fetched offset values
+ * @param partitions The set of partitions to get offsets for.
+ * @param now The current time in milliseconds
+ * @return A request future containing the committed offsets.
*/
- public Map<TopicPartition, Long> fetchOffsets(Set<TopicPartition> partitions, long now) {
- log.debug("Fetching committed offsets for partitions: " + Utils.join(partitions, ", "));
-
- while (true) {
- // construct the request
- OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
+ public RequestFuture<Map<TopicPartition, Long>> fetchOffsets(Set<TopicPartition> partitions, long now) {
+ final RequestFuture<Map<TopicPartition, Long>> future = newCoordinatorRequestFuture(now);
+ if (future.isDone()) return future;
- // send the request and block on waiting for response
- ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.OFFSET_FETCH, request.toStruct(), null, now);
+ log.debug("Fetching committed offsets for partitions: " + Utils.join(partitions, ", "));
+ // construct the request
+ OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
+
+ // send the request with a callback
+ RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
+ @Override
+ public void onComplete(ClientResponse resp) {
+ handleOffsetResponse(resp, future);
+ }
+ };
+ sendCoordinator(ApiKeys.OFFSET_FETCH, request.toStruct(), completionHandler, now);
+ return future;
+ }
+ private void handleOffsetResponse(ClientResponse resp, RequestFuture<Map<TopicPartition, Long>> future) {
+ if (resp.wasDisconnected()) {
+ handleCoordinatorDisconnect(resp);
+ future.retryWithNewCoordinator();
+ } else {
// parse the response to get the offsets
- boolean offsetsReady = true;
OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody());
Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size());
for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
@@ -251,23 +281,21 @@ public final class Coordinator {
OffsetFetchResponse.PartitionData data = entry.getValue();
if (data.hasError()) {
log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
- .exception()
- .getMessage());
+ .exception()
+ .getMessage());
if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
// just retry
- offsetsReady = false;
- Utils.sleep(this.retryBackoffMs);
+ future.retryAfterBackoff();
} else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
// re-discover the coordinator and retry
coordinatorDead();
- offsetsReady = false;
- Utils.sleep(this.retryBackoffMs);
+ future.retryWithNewCoordinator();
} else if (data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
// just ignore this partition
log.debug("Unknown topic or partition for " + tp);
} else {
- throw new KafkaException("Unexpected error in fetch offset response: "
- + Errors.forCode(data.errorCode).exception().getMessage());
+ future.raise(new KafkaException("Unexpected error in fetch offset response: "
+ + Errors.forCode(data.errorCode).exception().getMessage()));
}
} else if (data.offset >= 0) {
// record the position with the offset (-1 indicates no committed offset to fetch)
@@ -277,8 +305,8 @@ public final class Coordinator {
}
}
- if (offsetsReady)
- return offsets;
+ if (!future.isDone())
+ future.complete(offsets);
}
}
@@ -288,124 +316,105 @@ public final class Coordinator {
* @param now The current time
*/
public void maybeHeartbeat(long now) {
- if (heartbeat.shouldHeartbeat(now)) {
+ if (heartbeat.shouldHeartbeat(now) && coordinatorReady(now)) {
HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.consumerId);
- this.client.send(initiateCoordinatorRequest(ApiKeys.HEARTBEAT, req.toStruct(), new HeartbeatCompletionHandler(), now));
+ sendCoordinator(ApiKeys.HEARTBEAT, req.toStruct(), new HeartbeatCompletionHandler(), now);
this.heartbeat.sentHeartbeat(now);
}
}
- public boolean coordinatorUnknown() {
- return this.consumerCoordinator == null;
- }
-
/**
- * Repeatedly attempt to send a request to the coordinator until a response is received (retry if we are
- * disconnected). Note that this means any requests sent this way must be idempotent.
- *
- * @return The response
+ * Get the time until the next heartbeat is needed.
+ * @param now The current time
+ * @return The duration in milliseconds before the next heartbeat will be needed.
*/
- private ClientResponse blockingCoordinatorRequest(ApiKeys api,
- Struct request,
- RequestCompletionHandler handler,
- long now) {
- while (true) {
- ClientRequest coordinatorRequest = initiateCoordinatorRequest(api, request, handler, now);
- ClientResponse coordinatorResponse = sendAndReceive(coordinatorRequest, now);
- if (coordinatorResponse.wasDisconnected()) {
- handleCoordinatorDisconnect(coordinatorResponse);
- Utils.sleep(this.retryBackoffMs);
- } else {
- return coordinatorResponse;
- }
- }
+ public long timeToNextHeartbeat(long now) {
+ return heartbeat.timeToNextHeartbeat(now);
}
/**
- * Ensure the consumer coordinator is known and we have a ready connection to it.
+ * Check whether the coordinator has any in-flight requests.
+ * @return true if the coordinator has pending requests.
*/
- private void ensureCoordinatorReady() {
- while (true) {
- if (this.consumerCoordinator == null)
- discoverCoordinator();
-
- while (true) {
- boolean ready = this.client.ready(this.consumerCoordinator, time.milliseconds());
- if (ready) {
- return;
- } else {
- log.debug("No connection to coordinator, attempting to connect.");
- this.client.poll(this.retryBackoffMs, time.milliseconds());
+ public boolean hasInFlightRequests() {
+ return !coordinatorUnknown() && client.inFlightRequestCount(consumerCoordinator.idString()) > 0;
+ }
- // if the coordinator connection has failed, we need to
- // break the inner loop to re-discover the coordinator
- if (this.client.connectionFailed(this.consumerCoordinator)) {
- log.debug("Coordinator connection failed. Attempting to re-discover.");
- coordinatorDead();
- break;
- }
- }
- }
- }
+ public boolean coordinatorUnknown() {
+ return this.consumerCoordinator == null;
}
- /**
- * Mark the current coordinator as dead.
- */
- private void coordinatorDead() {
- if (this.consumerCoordinator != null) {
- log.info("Marking the coordinator {} dead.", this.consumerCoordinator.id());
- this.consumerCoordinator = null;
- }
+ private boolean coordinatorReady(long now) {
+ return !coordinatorUnknown() && this.client.ready(this.consumerCoordinator, now);
}
/**
- * Keep discovering the consumer coordinator until it is found.
+ * Discover the current coordinator for the consumer group. Sends a ConsumerMetadata request to
+ * one of the brokers. The returned future should be polled to get the result of the request.
+ * @return A request future which indicates the completion of the metadata request
*/
- private void discoverCoordinator() {
- while (this.consumerCoordinator == null) {
- log.debug("No coordinator known, attempting to discover one.");
- Node coordinator = fetchConsumerCoordinator();
-
- if (coordinator == null) {
- log.debug("No coordinator found, backing off.");
- Utils.sleep(this.retryBackoffMs);
+ public RequestFuture<Void> discoverConsumerCoordinator() {
+ // initiate the consumer metadata request
+ // find a node to ask about the coordinator
+ long now = time.milliseconds();
+ Node node = this.client.leastLoadedNode(now);
+
+ if (node == null) {
+ return RequestFuture.metadataRefreshNeeded();
+ } else if (!this.client.ready(node, now)) {
+ if (this.client.connectionFailed(node)) {
+ return RequestFuture.metadataRefreshNeeded();
} else {
- log.debug("Found coordinator: " + coordinator);
- this.consumerCoordinator = coordinator;
+ return RequestFuture.pollNeeded();
}
+ } else {
+ final RequestFuture<Void> future = new RequestFuture<Void>();
+
+ // create a consumer metadata request
+ log.debug("Issuing consumer metadata request to broker {}", node.id());
+ ConsumerMetadataRequest metadataRequest = new ConsumerMetadataRequest(this.groupId);
+ RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
+ @Override
+ public void onComplete(ClientResponse resp) {
+ handleConsumerMetadataResponse(resp, future);
+ }
+ };
+ send(node, ApiKeys.CONSUMER_METADATA, metadataRequest.toStruct(), completionHandler, now);
+ return future;
}
}
- /**
- * Get the current consumer coordinator information via consumer metadata request.
- *
- * @return the consumer coordinator node
- */
- private Node fetchConsumerCoordinator() {
-
- // initiate the consumer metadata request
- ClientRequest request = initiateConsumerMetadataRequest();
-
- // send the request and wait for its response
- ClientResponse response = sendAndReceive(request, request.createdTime());
+ private void handleConsumerMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
+ log.debug("Consumer metadata response {}", resp);
// parse the response to get the coordinator info if it is not disconnected,
// otherwise we need to request metadata update
- if (!response.wasDisconnected()) {
- ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(response.responseBody());
+ if (resp.wasDisconnected()) {
+ future.retryAfterMetadataRefresh();
+ } else {
+ ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(resp.responseBody());
// use MAX_VALUE - node.id as the coordinator id to mimic separate connections
// for the coordinator in the underlying network client layer
// TODO: this needs to be better handled in KAFKA-1935
- if (consumerMetadataResponse.errorCode() == Errors.NONE.code())
- return new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(),
- consumerMetadataResponse.node().host(),
- consumerMetadataResponse.node().port());
- } else {
- this.metadata.requestUpdate();
+ if (consumerMetadataResponse.errorCode() == Errors.NONE.code()) {
+ this.consumerCoordinator = new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(),
+ consumerMetadataResponse.node().host(),
+ consumerMetadataResponse.node().port());
+ future.complete(null);
+ } else {
+ future.retryAfterBackoff();
+ }
}
+ }
- return null;
+ /**
+ * Mark the current coordinator as dead.
+ */
+ private void coordinatorDead() {
+ if (this.consumerCoordinator != null) {
+ log.info("Marking the coordinator {} dead.", this.consumerCoordinator.id());
+ this.consumerCoordinator = null;
+ }
}
/**
@@ -414,79 +423,23 @@ public final class Coordinator {
private void handleCoordinatorDisconnect(ClientResponse response) {
int correlation = response.request().request().header().correlationId();
log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
- response.request(),
- correlation,
- response.request().request().destination());
+ response.request(),
+ correlation,
+ response.request().request().destination());
// mark the coordinator as dead
coordinatorDead();
}
- /**
- * Initiate a consumer metadata request to the least loaded node.
- *
- * @return The created request
- */
- private ClientRequest initiateConsumerMetadataRequest() {
- // find a node to ask about the coordinator
- Node node = this.client.leastLoadedNode(time.milliseconds());
- while (node == null || !this.client.ready(node, time.milliseconds())) {
- long now = time.milliseconds();
- this.client.poll(this.retryBackoffMs, now);
- node = this.client.leastLoadedNode(now);
-
- // if there is no ready node, backoff before retry
- if (node == null)
- Utils.sleep(this.retryBackoffMs);
- }
-
- // create a consumer metadata request
- log.debug("Issuing consumer metadata request to broker {}", node.id());
-
- ConsumerMetadataRequest request = new ConsumerMetadataRequest(this.groupId);
- RequestSend send = new RequestSend(node.idString(),
- this.client.nextRequestHeader(ApiKeys.CONSUMER_METADATA),
- request.toStruct());
- long now = time.milliseconds();
- return new ClientRequest(now, true, send, null);
+ private void sendCoordinator(ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
+ send(this.consumerCoordinator, api, request, handler, now);
}
- /**
- * Initiate a request to the coordinator.
- */
- private ClientRequest initiateCoordinatorRequest(ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
-
- // first make sure the coordinator is known and ready
- ensureCoordinatorReady();
-
- // create the request for the coordinator
- log.debug("Issuing request ({}: {}) to coordinator {}", api, request, this.consumerCoordinator.id());
-
+ private void send(Node node, ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
RequestHeader header = this.client.nextRequestHeader(api);
- RequestSend send = new RequestSend(this.consumerCoordinator.idString(), header, request);
- return new ClientRequest(now, true, send, handler);
- }
-
- /**
- * Attempt to send a request and receive its response.
- *
- * @return The response
- */
- private ClientResponse sendAndReceive(ClientRequest clientRequest, long now) {
-
- // send the request
- this.client.send(clientRequest);
-
- // drain all responses from the destination node
- List<ClientResponse> responses = this.client.completeAll(clientRequest.request().destination(), now);
- if (responses.isEmpty()) {
- throw new IllegalStateException("This should not happen.");
- } else {
- // other requests should be handled by the callback, and
- // we only care about the response of the last request
- return responses.get(responses.size() - 1);
- }
+ RequestSend send = new RequestSend(node.idString(), header, request);
+ this.client.send(new ClientRequest(now, true, send, handler));
}
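
The sendCoordinator and send helpers above register a RequestCompletionHandler with the network client and return immediately; the response is handled later, when the consumer's own thread drives the client's poll(). A self-contained toy of that callback-to-future bridging, using stand-in types rather than Kafka's NetworkClient classes:

import java.util.ArrayDeque;
import java.util.Queue;

public class CallbackToFutureDemo {
    interface CompletionHandler { void onComplete(String response); }

    static class ToyFuture<T> {
        private boolean done;
        private T value;
        void complete(T value) { this.value = value; this.done = true; }
        boolean isDone() { return done; }
        T value() { return value; }
    }

    static class ToyClient {
        private final Queue<CompletionHandler> inFlight = new ArrayDeque<CompletionHandler>();
        // send() only queues the request; nothing blocks here.
        void send(String request, CompletionHandler handler) { inFlight.add(handler); }
        // poll() stands in for driving network I/O: it "receives" responses and
        // fires the registered callbacks.
        void poll() {
            CompletionHandler handler;
            while ((handler = inFlight.poll()) != null)
                handler.onComplete("coordinator=broker-1");
        }
    }

    public static void main(String[] args) {
        ToyClient client = new ToyClient();
        final ToyFuture<String> future = new ToyFuture<String>();
        client.send("ConsumerMetadataRequest", new CompletionHandler() {
            @Override
            public void onComplete(String response) {
                future.complete(response); // the callback completes the future
            }
        });
        // The caller drives I/O explicitly instead of blocking inside the send.
        while (!future.isDone())
            client.poll();
        System.out.println("Discovered " + future.value());
    }
}
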
private class HeartbeatCompletionHandler implements RequestCompletionHandler {
@@ -521,18 +474,21 @@ public final class Coordinator {
private class CommitOffsetCompletionHandler implements RequestCompletionHandler {
private final Map<TopicPartition, Long> offsets;
+ private final RequestFuture<Void> future;
- public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets) {
+ public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets, RequestFuture<Void> future) {
this.offsets = offsets;
+ this.future = future;
}
@Override
public void onComplete(ClientResponse resp) {
if (resp.wasDisconnected()) {
handleCoordinatorDisconnect(resp);
+ future.retryWithNewCoordinator();
} else {
- OffsetCommitResponse response = new OffsetCommitResponse(resp.responseBody());
- for (Map.Entry<TopicPartition, Short> entry : response.responseData().entrySet()) {
+ OffsetCommitResponse commitResponse = new OffsetCommitResponse(resp.responseBody());
+ for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
short errorCode = entry.getValue();
long offset = this.offsets.get(tp);
@@ -542,14 +498,19 @@ public final class Coordinator {
} else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
|| errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
coordinatorDead();
+ future.retryWithNewCoordinator();
} else {
// do not need to throw the exception but just log the error
+ future.retryAfterBackoff();
log.error("Error committing partition {} at offset {}: {}",
tp,
offset,
Errors.forCode(errorCode).exception().getMessage());
}
}
+
+ if (!future.isDone())
+ future.complete(null);
}
sensors.commitLatency.record(resp.requestLatencyMs());
}
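
As a reading aid, the commit handler above maps each partition's error code to one of three outcomes: let the future complete, retry after rediscovering the coordinator, or retry after a backoff. The same decision written as a small standalone method for illustration; ToyError is a stand-in for the Errors codes named in the patch, not Kafka's Errors class:

public class CommitRetryPolicy {
    enum ToyError { NONE, CONSUMER_COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR_FOR_CONSUMER, OTHER }
    enum Decision { COMPLETE, RETRY_WITH_NEW_COORDINATOR, RETRY_AFTER_BACKOFF }

    static Decision decide(ToyError error) {
        switch (error) {
            case NONE:
                return Decision.COMPLETE;                    // commit for this partition succeeded
            case CONSUMER_COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR_FOR_CONSUMER:
                return Decision.RETRY_WITH_NEW_COORDINATOR;  // coordinator moved or is down: rediscover it
            default:
                return Decision.RETRY_AFTER_BACKOFF;         // other error: log it and retry later
        }
    }
}
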