You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/15 21:40:46 UTC
[3/3] kafka git commit: KAFKA-2123: add callback in commit api and
use a delayed queue for async requests;
reviewed by Ewen Cheslack-Postava and Guozhang Wang
KAFKA-2123: add callback in commit api and use a delayed queue for async requests; reviewed by Ewen Cheslack-Postava and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/99c0686b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/99c0686b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/99c0686b
Branch: refs/heads/trunk
Commit: 99c0686be2141a0fffe1c55e279370a87ef8c1ea
Parents: a7e0ac3
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Jul 15 12:38:45 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jul 15 12:38:45 2015 -0700
----------------------------------------------------------------------
.../apache/kafka/clients/consumer/Consumer.java | 10 +
.../consumer/ConsumerCommitCallback.java | 33 +
.../consumer/ConsumerRebalanceCallback.java | 6 +-
.../kafka/clients/consumer/ConsumerRecords.java | 4 +
.../kafka/clients/consumer/KafkaConsumer.java | 512 ++++----------
.../kafka/clients/consumer/MockConsumer.java | 19 +-
.../internals/ConsumerNetworkClient.java | 296 +++++++++
.../clients/consumer/internals/Coordinator.java | 660 ++++++++++++-------
.../clients/consumer/internals/DelayedTask.java | 24 +
.../consumer/internals/DelayedTaskQueue.java | 96 +++
.../clients/consumer/internals/Fetcher.java | 182 +++--
.../clients/consumer/internals/Heartbeat.java | 27 +-
.../internals/NoAvailableBrokersException.java | 23 +
.../consumer/internals/RequestFuture.java | 219 +++---
.../internals/RequestFutureAdapter.java | 28 +
.../internals/RequestFutureListener.java | 23 +
.../consumer/internals/SendFailedException.java | 27 +
.../internals/StaleMetadataException.java | 22 +
.../consumer/internals/SubscriptionState.java | 5 +-
...onsumerCoordinatorNotAvailableException.java | 40 ++
.../common/errors/DisconnectException.java | 39 ++
.../errors/IllegalGenerationException.java | 33 +
.../NotCoordinatorForConsumerException.java | 40 ++
.../errors/OffsetLoadInProgressException.java | 40 ++
.../errors/UnknownConsumerIdException.java | 33 +
.../apache/kafka/common/protocol/Errors.java | 10 +-
.../internals/ConsumerNetworkClientTest.java | 125 ++++
.../consumer/internals/CoordinatorTest.java | 479 +++++++++-----
.../internals/DelayedTaskQueueTest.java | 89 +++
.../clients/consumer/internals/FetcherTest.java | 37 +-
.../consumer/internals/HeartbeatTest.java | 15 +
.../consumer/internals/RequestFutureTest.java | 57 ++
.../integration/kafka/api/ConsumerTest.scala | 81 ++-
33 files changed, 2350 insertions(+), 984 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index fd98740..252b759 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -64,11 +64,21 @@ public interface Consumer<K, V> extends Closeable {
public void commit(CommitType commitType);
/**
+ * @see KafkaConsumer#commit(CommitType, ConsumerCommitCallback)
+ */
+ public void commit(CommitType commitType, ConsumerCommitCallback callback);
+
+ /**
* @see KafkaConsumer#commit(Map, CommitType)
*/
public void commit(Map<TopicPartition, Long> offsets, CommitType commitType);
/**
+ * @see KafkaConsumer#commit(Map, CommitType, ConsumerCommitCallback)
+ */
+ public void commit(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback);
+
+ /**
* @see KafkaConsumer#seek(TopicPartition, long)
*/
public void seek(TopicPartition partition, long offset);
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
new file mode 100644
index 0000000..f084385
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
+/**
+ * A callback interface that the user can implement to trigger custom actions when a commit request completes. The callback
+ * may be executed in any thread calling {@link Consumer#poll(long) poll()}.
+ */
+public interface ConsumerCommitCallback {
+
+ /**
+ * A callback method the user can implement to provide asynchronous handling of commit request completion.
+ * This method will be called when the commit request sent to the server has been acknowledged.
+ *
+ * @param offsets A map of the offsets that this callback applies to
+ * @param exception The exception thrown during processing of the request, or null if the commit completed successfully
+ */
+ void onComplete(Map<TopicPartition, Long> offsets, Exception exception);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
index 74dfdba..ff3f50f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
@@ -73,7 +73,8 @@ public interface ConsumerRebalanceCallback {
* It is guaranteed that all the processes in a consumer group will execute their
* {@link #onPartitionsRevoked(Consumer, Collection)} callback before any instance executes its
* {@link #onPartitionsAssigned(Consumer, Collection)} callback.
- *
+ *
+ * @param consumer Reference to the consumer for convenience
* @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
* assigned to the consumer)
*/
@@ -86,7 +87,8 @@ public interface ConsumerRebalanceCallback {
* custom offset store to prevent duplicate data
* <p>
* For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
- *
+ *
+ * @param consumer Reference to the consumer for convenience
* @param partitions The list of partitions that were assigned to the consumer on the last rebalance
*/
public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index eb75d2e..16a8357 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -105,6 +105,10 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
}
}
+ public boolean isEmpty() {
+ return records.isEmpty();
+ }
+
@SuppressWarnings("unchecked")
public static <K, V> ConsumerRecords<K, V> empty() {
return (ConsumerRecords<K, V>) EMPTY;
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b4e8f7f..9f64255 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -15,9 +15,10 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Coordinator;
+import org.apache.kafka.clients.consumer.internals.DelayedTask;
import org.apache.kafka.clients.consumer.internals.Fetcher;
-import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
@@ -49,7 +50,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -394,8 +394,6 @@ import static org.apache.kafka.common.utils.Utils.min;
public class KafkaConsumer<K, V> implements Consumer<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
- private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
- private static final long LATEST_OFFSET_TIMESTAMP = -1L;
private static final long NO_CURRENT_THREAD = -1L;
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
@@ -405,17 +403,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final Fetcher<K, V> fetcher;
private final Time time;
- private final NetworkClient client;
+ private final ConsumerNetworkClient client;
private final Metrics metrics;
private final SubscriptionState subscriptions;
private final Metadata metadata;
private final long retryBackoffMs;
private final boolean autoCommit;
private final long autoCommitIntervalMs;
- private final ConsumerRebalanceCallback rebalanceCallback;
- private long lastCommitAttemptMs;
private boolean closed = false;
- private final AtomicBoolean wakeup = new AtomicBoolean(false);
// currentThread holds the threadId of the current thread accessing KafkaConsumer
// and is used to prevent multi-threaded access
@@ -507,14 +502,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
try {
log.debug("Starting the Kafka consumer");
if (callback == null)
- this.rebalanceCallback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
+ callback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
ConsumerRebalanceCallback.class);
- else
- this.rebalanceCallback = callback;
this.time = new SystemTime();
this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
- this.lastCommitAttemptMs = time.milliseconds();
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
@@ -535,7 +527,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
String metricGrpPrefix = "consumer";
Map<String, String> metricsTags = new LinkedHashMap<String, String>();
metricsTags.put("client-id", clientId);
- this.client = new NetworkClient(
+ NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags),
this.metadata,
clientId,
@@ -543,6 +535,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
+ this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
this.subscriptions = new SubscriptionState(offsetResetStrategy);
this.coordinator = new Coordinator(this.client,
@@ -553,8 +546,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
metrics,
metricGrpPrefix,
metricsTags,
- this.time);
-
+ this.time,
+ requestTimeoutMs,
+ retryBackoffMs,
+ wrapRebalanceCallback(callback));
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
@@ -581,10 +576,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
metrics,
metricGrpPrefix,
metricsTags,
- this.time);
+ this.time,
+ this.retryBackoffMs);
config.logUnused();
+ if (autoCommit)
+ scheduleAutoCommitTask(autoCommitIntervalMs);
+
log.debug("Kafka consumer created");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
@@ -719,27 +718,25 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
- // Poll for new data until the timeout expires
+ // poll for new data until the timeout expires
long remaining = timeout;
while (remaining >= 0) {
long start = time.milliseconds();
- long pollTimeout = min(remaining, timeToNextCommit(start), coordinator.timeToNextHeartbeat(start));
-
- Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(pollTimeout, start);
+ Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
long end = time.milliseconds();
if (!records.isEmpty()) {
- // If data is available, then return it, but first send off the
+ // if data is available, then return it, but first send off the
// next round of fetches to enable pipelining while the user is
// handling the fetched records.
- fetcher.initFetches(metadata.fetch(), end);
- pollClient(0, end);
+ fetcher.initFetches(metadata.fetch());
+ client.poll(0);
return new ConsumerRecords<K, V>(records);
}
remaining -= end - start;
- // Nothing was available, so we should backoff before retrying
+ // nothing was available, so we should backoff before retrying
if (remaining > 0) {
Utils.sleep(min(remaining, retryBackoffMs));
remaining -= time.milliseconds() - end;
@@ -752,46 +749,42 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
}
-
/**
* Do one round of polling. In addition to checking for new data, this does any needed
* heart-beating, auto-commits, and offset updates.
* @param timeout The maximum time to block in the underlying poll
- * @param now Current time in millis
* @return The fetched records (may be empty)
*/
- private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout, long now) {
- Cluster cluster = this.metadata.fetch();
-
+ private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
// TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
+ coordinator.ensureCoordinatorKnown();
- if (subscriptions.partitionsAutoAssigned()) {
- if (subscriptions.partitionAssignmentNeeded()) {
- // rebalance to get partition assignment
- reassignPartitions(now);
- } else {
- // try to heartbeat with the coordinator if needed
- coordinator.maybeHeartbeat(now);
- }
- }
+ // ensure we have partitions assigned if we expect to
+ if (subscriptions.partitionsAutoAssigned())
+ coordinator.ensurePartitionAssignment();
// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
- // maybe autocommit position
- if (shouldAutoCommit(now))
- commit(CommitType.ASYNC);
-
- // Init any new fetches (won't resend pending fetches)
- fetcher.initFetches(cluster, now);
-
- pollClient(timeout, now);
-
+ // init any new fetches (won't resend pending fetches)
+ Cluster cluster = this.metadata.fetch();
+ fetcher.initFetches(cluster);
+ client.poll(timeout);
return fetcher.fetchedRecords();
}
+ private void scheduleAutoCommitTask(final long interval) {
+ DelayedTask task = new DelayedTask() {
+ public void run(long now) {
+ commit(CommitType.ASYNC);
+ client.schedule(this, now + interval);
+ }
+ };
+ client.schedule(task, time.milliseconds() + interval);
+ }
+
/**
* Commits the specified offsets for the specified list of topics and partitions to Kafka.
* <p>
@@ -799,25 +792,42 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used.
* <p>
- * A non-blocking commit will attempt to commit offsets asynchronously. No error will be thrown if the commit fails.
- * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until
- * the commit succeeds.
- *
+ * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+ * commit are silently discarded. If you need to determine the result of an asynchronous commit, you should use
+ * {@link #commit(Map, CommitType, ConsumerCommitCallback)}. Synchronous commits (i.e. {@link CommitType#SYNC})
+ * block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown
+ * to the caller).
+ *
* @param offsets The list of offsets per partition that should be committed to Kafka.
* @param commitType Control whether the commit is blocking
*/
@Override
public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) {
+ commit(offsets, commitType, null);
+ }
+
+ /**
+ * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+ * <p>
+ * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
+ * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+ * should not be used.
+ * <p>
+ * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+ * commit are either passed to the callback (if provided) or silently discarded. Synchronous commits (i.e.
+ * {@link CommitType#SYNC}) block until either the commit succeeds or an unrecoverable error is encountered. In
+ * this case, the error is either passed to the callback (if provided) or thrown to the caller.
+ *
+ * @param offsets The list of offsets per partition that should be committed to Kafka.
+ * @param commitType Control whether the commit is blocking
+ * @param callback Callback to invoke when the commit completes
+ */
+ @Override
+ public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
acquire();
try {
log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets);
-
- this.lastCommitAttemptMs = time.milliseconds();
-
- // commit the offsets with the coordinator
- if (commitType == CommitType.ASYNC)
- this.subscriptions.needRefreshCommits();
- commitOffsets(offsets, commitType);
+ coordinator.commitOffsets(offsets, commitType, callback);
} finally {
release();
}
@@ -829,22 +839,48 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
* every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used.
- *
+ * <p>
+ * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+ * commit are either passed to the callback (if provided) or silently discarded. Synchronous commits (i.e.
+ * {@link CommitType#SYNC}) block until either the commit succeeds or an unrecoverable error is encountered. In
+ * this case, the error is either passed to the callback (if provided) or thrown to the caller.
+ *
* @param commitType Whether or not the commit should block until it is acknowledged.
+ * @param callback Callback to invoke when the commit completes
*/
@Override
- public void commit(CommitType commitType) {
+ public void commit(CommitType commitType, ConsumerCommitCallback callback) {
acquire();
try {
- // Need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance)
+ // need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance)
Map<TopicPartition, Long> allConsumed = new HashMap<TopicPartition, Long>(this.subscriptions.allConsumed());
- commit(allConsumed, commitType);
+ commit(allConsumed, commitType, callback);
} finally {
release();
}
}
/**
+ * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+ * <p>
+ * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
+ * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+ * should not be used.
+ * <p>
+ * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+ * commit are silently discarded. If you need to determine the result of an asynchronous commit, you should use
+ * {@link #commit(CommitType, ConsumerCommitCallback)}. Synchronous commits (i.e. {@link CommitType#SYNC})
+ * block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown
+ * to the caller).
+ *
+ * @param commitType Whether or not the commit should block until it is acknowledged.
+ */
+ @Override
+ public void commit(CommitType commitType) {
+ commit(commitType, null);
+ }
+
+ /**
* Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
* is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
* you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
@@ -868,8 +904,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
try {
Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
: Arrays.asList(partitions);
- for (TopicPartition tp : parts)
+ for (TopicPartition tp : parts) {
+ log.debug("Seeking to beginning of partition {}", tp);
subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
+ }
} finally {
release();
}
@@ -883,8 +921,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
try {
Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
: Arrays.asList(partitions);
- for (TopicPartition tp : parts)
+ for (TopicPartition tp : parts) {
+ log.debug("Seeking to end of partition {}", tp);
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+ }
} finally {
release();
}
@@ -931,19 +971,21 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public long committed(TopicPartition partition) {
acquire();
try {
- Set<TopicPartition> partitionsToFetch;
+ Long committed;
if (subscriptions.assignedPartitions().contains(partition)) {
- Long committed = this.subscriptions.committed(partition);
- if (committed != null)
- return committed;
- partitionsToFetch = subscriptions.assignedPartitions();
+ committed = this.subscriptions.committed(partition);
+ if (committed == null) {
+ coordinator.refreshCommittedOffsetsIfNeeded();
+ committed = this.subscriptions.committed(partition);
+ }
} else {
- partitionsToFetch = Collections.singleton(partition);
+ Map<TopicPartition, Long> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition));
+ committed = offsets.get(partition);
}
- refreshCommittedOffsets(partitionsToFetch);
- Long committed = this.subscriptions.committed(partition);
+
if (committed == null)
throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition);
+
return committed;
} finally {
release();
@@ -973,7 +1015,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
if (parts == null) {
metadata.add(topic);
- awaitMetadataUpdate();
+ client.awaitMetadataUpdate();
parts = metadata.fetch().partitionsForTopic(topic);
}
return parts;
@@ -999,7 +1041,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void wakeup() {
- this.wakeup.set(true);
this.client.wakeup();
}
@@ -1017,55 +1058,18 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
}
+ private Coordinator.RebalanceCallback wrapRebalanceCallback(final ConsumerRebalanceCallback callback) {
+ return new Coordinator.RebalanceCallback() {
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ callback.onPartitionsAssigned(KafkaConsumer.this, partitions);
+ }
- private boolean shouldAutoCommit(long now) {
- return this.autoCommit && this.lastCommitAttemptMs <= now - this.autoCommitIntervalMs;
- }
-
- private long timeToNextCommit(long now) {
- if (!this.autoCommit)
- return Long.MAX_VALUE;
- long timeSinceLastCommit = now - this.lastCommitAttemptMs;
- if (timeSinceLastCommit > this.autoCommitIntervalMs)
- return 0;
- return this.autoCommitIntervalMs - timeSinceLastCommit;
- }
-
- /**
- * Request a metadata update and wait until it has occurred
- */
- private void awaitMetadataUpdate() {
- int version = this.metadata.requestUpdate();
- do {
- long now = time.milliseconds();
- this.pollClient(this.retryBackoffMs, now);
- } while (this.metadata.version() == version);
- }
-
- /**
- * Get partition assignment
- */
- private void reassignPartitions(long now) {
- // execute the user's callback before rebalance
- log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
- try {
- this.rebalanceCallback.onPartitionsRevoked(this, this.subscriptions.assignedPartitions());
- } catch (Exception e) {
- log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
- + " failed on partition revocation: ", e);
- }
-
- // get new assigned partitions from the coordinator
- assignPartitions();
-
- // execute the user's callback after rebalance
- log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
- try {
- this.rebalanceCallback.onPartitionsAssigned(this, this.subscriptions.assignedPartitions());
- } catch (Exception e) {
- log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
- + " failed on partition assignment: ", e);
- }
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ callback.onPartitionsRevoked(KafkaConsumer.this, partitions);
+ }
+ };
}
/**
@@ -1077,267 +1081,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* defined
*/
private void updateFetchPositions(Set<TopicPartition> partitions) {
- // first refresh the committed positions in case they are not up-to-date
- refreshCommittedOffsets(partitions);
-
- // reset the fetch position to the committed position
- for (TopicPartition tp : partitions) {
- // Skip if we already have a fetch position
- if (subscriptions.fetched(tp) != null)
- continue;
-
- // TODO: If there are several offsets to reset, we could submit offset requests in parallel
- if (subscriptions.isOffsetResetNeeded(tp)) {
- resetOffset(tp);
- } else if (subscriptions.committed(tp) == null) {
- // There's no committed position, so we need to reset with the default strategy
- subscriptions.needOffsetReset(tp);
- resetOffset(tp);
- } else {
- log.debug("Resetting offset for partition {} to the committed offset {}",
- tp, subscriptions.committed(tp));
- subscriptions.seek(tp, subscriptions.committed(tp));
- }
- }
- }
+ // refresh commits for all assigned partitions
+ coordinator.refreshCommittedOffsetsIfNeeded();
- /**
- * Reset offsets for the given partition using the offset reset strategy.
- *
- * @param partition The given partition that needs reset offset
- * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
- */
- private void resetOffset(TopicPartition partition) {
- OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
- final long timestamp;
- if (strategy == OffsetResetStrategy.EARLIEST)
- timestamp = EARLIEST_OFFSET_TIMESTAMP;
- else if (strategy == OffsetResetStrategy.LATEST)
- timestamp = LATEST_OFFSET_TIMESTAMP;
- else
- throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
-
- log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
- long offset = listOffset(partition, timestamp);
- this.subscriptions.seek(partition, offset);
- }
-
- /**
- * Fetch a single offset before the given timestamp for the partition.
- *
- * @param partition The partition that needs fetching offset.
- * @param timestamp The timestamp for fetching offset.
- * @return The offset of the message that is published before the given timestamp
- */
- private long listOffset(TopicPartition partition, long timestamp) {
- while (true) {
- RequestFuture<Long> future = fetcher.listOffset(partition, timestamp);
-
- if (!future.isDone())
- pollFuture(future, requestTimeoutMs);
-
- if (future.isDone()) {
- if (future.succeeded())
- return future.value();
- handleRequestFailure(future);
- }
- }
- }
-
- /**
- * Refresh the committed offsets for given set of partitions and update the cache
- */
- private void refreshCommittedOffsets(Set<TopicPartition> partitions) {
- // we only need to fetch latest committed offset from coordinator if there
- // is some commit process in progress, otherwise our current
- // committed cache is up-to-date
- if (subscriptions.refreshCommitsNeeded()) {
- // contact coordinator to fetch committed offsets
- Map<TopicPartition, Long> offsets = fetchCommittedOffsets(partitions);
-
- // update the position with the offsets
- for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
- TopicPartition tp = entry.getKey();
- this.subscriptions.committed(tp, entry.getValue());
- }
- }
- }
-
- /**
- * Block until we have received a partition assignment from the coordinator.
- */
- private void assignPartitions() {
- // Ensure that there are no pending requests to the coordinator. This is important
- // in particular to avoid resending a pending JoinGroup request.
- awaitCoordinatorInFlightRequests();
-
- while (subscriptions.partitionAssignmentNeeded()) {
- RequestFuture<Void> future = coordinator.assignPartitions(time.milliseconds());
-
- // Block indefinitely for the join group request (which can take as long as a session timeout)
- if (!future.isDone())
- pollFuture(future);
-
- if (future.failed())
- handleRequestFailure(future);
- }
- }
-
- /**
- * Block until the coordinator for this group is known.
- */
- private void ensureCoordinatorKnown() {
- while (coordinator.coordinatorUnknown()) {
- RequestFuture<Void> future = coordinator.discoverConsumerCoordinator();
-
- if (!future.isDone())
- pollFuture(future, requestTimeoutMs);
-
- if (future.failed())
- handleRequestFailure(future);
- }
- }
-
- /**
- * Block until any pending requests to the coordinator have been handled.
- */
- public void awaitCoordinatorInFlightRequests() {
- while (coordinator.hasInFlightRequests()) {
- long now = time.milliseconds();
- pollClient(-1, now);
- }
- }
-
- /**
- * Lookup the committed offsets for a set of partitions. This will block until the coordinator has
- * responded to the offset fetch request.
- * @param partitions List of partitions to get offsets for
- * @return Map from partition to its respective offset
- */
- private Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions) {
- while (true) {
- long now = time.milliseconds();
- RequestFuture<Map<TopicPartition, Long>> future = coordinator.fetchOffsets(partitions, now);
-
- if (!future.isDone())
- pollFuture(future, requestTimeoutMs);
-
- if (future.isDone()) {
- if (future.succeeded())
- return future.value();
- handleRequestFailure(future);
- }
- }
- }
-
- /**
- * Commit offsets. This call blocks (regardless of commitType) until the coordinator
- * can receive the commit request. Once the request has been made, however, only the
- * synchronous commits will wait for a successful response from the coordinator.
- * @param offsets Offsets to commit.
- * @param commitType Commit policy
- */
- private void commitOffsets(Map<TopicPartition, Long> offsets, CommitType commitType) {
- if (commitType == CommitType.ASYNC) {
- commitOffsetsAsync(offsets);
- } else {
- commitOffsetsSync(offsets);
- }
- }
-
- private void commitOffsetsAsync(Map<TopicPartition, Long> offsets) {
- while (true) {
- long now = time.milliseconds();
- RequestFuture<Void> future = coordinator.commitOffsets(offsets, now);
-
- if (!future.isDone() || future.succeeded())
- return;
-
- handleRequestFailure(future);
- }
- }
-
- private void commitOffsetsSync(Map<TopicPartition, Long> offsets) {
- while (true) {
- long now = time.milliseconds();
- RequestFuture<Void> future = coordinator.commitOffsets(offsets, now);
-
- if (!future.isDone())
- pollFuture(future, requestTimeoutMs);
-
- if (future.isDone()) {
- if (future.succeeded())
- return;
- else
- handleRequestFailure(future);
- }
- }
- }
-
- private void handleRequestFailure(RequestFuture<?> future) {
- if (future.hasException())
- throw future.exception();
-
- switch (future.retryAction()) {
- case BACKOFF:
- Utils.sleep(retryBackoffMs);
- break;
- case POLL:
- pollClient(retryBackoffMs, time.milliseconds());
- break;
- case FIND_COORDINATOR:
- ensureCoordinatorKnown();
- break;
- case REFRESH_METADATA:
- awaitMetadataUpdate();
- break;
- case NOOP:
- // Do nothing (retry now)
- }
- }
-
- /**
- * Poll until a result is ready or timeout expires
- * @param future The future to poll for
- * @param timeout The time in milliseconds to wait for the result
- */
- private void pollFuture(RequestFuture<?> future, long timeout) {
- // TODO: Update this code for KAFKA-2120, which adds request timeout to NetworkClient
- // In particular, we must ensure that "timed out" requests will not have their callbacks
- // invoked at a later time.
- long remaining = timeout;
- while (!future.isDone() && remaining >= 0) {
- long start = time.milliseconds();
- pollClient(remaining, start);
- if (future.isDone()) return;
- remaining -= time.milliseconds() - start;
- }
- }
-
- /**
- * Poll indefinitely until the result is ready.
- * @param future The future to poll for.
- */
- private void pollFuture(RequestFuture<?> future) {
- while (!future.isDone()) {
- long now = time.milliseconds();
- pollClient(-1, now);
- }
- }
-
- /**
- * Poll for IO.
- * @param timeout The maximum time to wait for IO to become available
- * @param now The current time in milliseconds
- * @throws ConsumerWakeupException if {@link #wakeup()} is invoked while the poll is active
- */
- private void pollClient(long timeout, long now) {
- this.client.poll(timeout, now);
-
- if (wakeup.get()) {
- wakeup.set(false);
- throw new ConsumerWakeupException();
- }
+ // then do any offset lookups in case some positions are not known
+ fetcher.updateFetchPositions(partitions);
}
/*
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 46e26a6..c14eed1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -106,16 +106,29 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
@Override
- public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
+ public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
ensureNotClosed();
for (Entry<TopicPartition, Long> entry : offsets.entrySet())
subscriptions.committed(entry.getKey(), entry.getValue());
+ if (callback != null) {
+ callback.onComplete(offsets, null);
+ }
}
@Override
- public synchronized void commit(CommitType commitType) {
+ public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
+ commit(offsets, commitType, null);
+ }
+
+ @Override
+ public synchronized void commit(CommitType commitType, ConsumerCommitCallback callback) {
ensureNotClosed();
- commit(this.subscriptions.allConsumed(), commitType);
+ commit(this.subscriptions.allConsumed(), commitType, callback);
+ }
+
+ @Override
+ public synchronized void commit(CommitType commitType) {
+ commit(commitType, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/99c0686b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
new file mode 100644
index 0000000..9517d9d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.utils.Time;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Higher level consumer access to the network layer with basic support for futures and
+ * task scheduling. NOT thread-safe!
+ *
+ * TODO: The current implementation is simplistic in that it provides a facility for queueing requests
+ * prior to delivery, but it makes no effort to retry requests which cannot be sent at the time
+ * {@link #poll(long)} is called. This makes the behavior of the queue predictable and easy to
+ * understand, but there are opportunities to provide timeout or retry capabilities in the future.
+ * How we do this may depend on KAFKA-2120, so for now, we retain the simplistic behavior.
+ */
+public class ConsumerNetworkClient implements Closeable {
+ private final KafkaClient client;
+ private final AtomicBoolean wakeup = new AtomicBoolean(false);
+ private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
+ private final Map<Node, List<ClientRequest>> unsent = new HashMap<Node, List<ClientRequest>>();
+ private final Metadata metadata;
+ private final Time time;
+ private final long retryBackoffMs;
+
+ public ConsumerNetworkClient(KafkaClient client,
+ Metadata metadata,
+ Time time,
+ long retryBackoffMs) {
+ this.client = client;
+ this.metadata = metadata;
+ this.time = time;
+ this.retryBackoffMs = retryBackoffMs;
+ }
+
+ /**
+ * Schedule a new task to be executed at the given time. This is "best-effort" scheduling and
+ * should only be used for coarse synchronization.
+ * @param task The task to be scheduled
+ * @param at The time it should run
+ */
+ public void schedule(DelayedTask task, long at) {
+ delayedTasks.add(task, at);
+ }
+
+ /**
+ * Unschedule a task. This will remove all instances of the task from the task queue.
+ * This is a no-op if the task is not scheduled.
+ * @param task The task to be unscheduled.
+ */
+ public void unschedule(DelayedTask task) {
+ delayedTasks.remove(task);
+ }
+
+ /**
+ * Send a new request. Note that the request is not actually transmitted on the
+ * network until one of the {@link #poll(long)} variants is invoked. At this
+ * point the request will either be transmitted successfully or will fail.
+ * Use the returned future to obtain the result of the send.
+ * @param node The destination of the request
+ * @param api The Kafka API call
+ * @param request The request payload
+ * @return A future which indicates the result of the send.
+ */
+ public RequestFuture<ClientResponse> send(Node node,
+ ApiKeys api,
+ AbstractRequest request) {
+ long now = time.milliseconds();
+ RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
+ RequestHeader header = client.nextRequestHeader(api);
+ RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
+ put(node, new ClientRequest(now, true, send, future));
+ return future;
+ }
+
+ private void put(Node node, ClientRequest request) {
+ List<ClientRequest> nodeUnsent = unsent.get(node);
+ if (nodeUnsent == null) {
+ nodeUnsent = new ArrayList<ClientRequest>();
+ unsent.put(node, nodeUnsent);
+ }
+ nodeUnsent.add(request);
+ }
+
+ public Node leastLoadedNode() {
+ return client.leastLoadedNode(time.milliseconds());
+ }
+
+ /**
+ * Block until the metadata has been refreshed.
+ */
+ public void awaitMetadataUpdate() {
+ int version = this.metadata.requestUpdate();
+ do {
+ poll(Long.MAX_VALUE);
+ } while (this.metadata.version() == version);
+ }
+
+ /**
+ * Wake up an active poll. This will cause the polling thread to throw an exception either
+ * on the current poll if one is active, or the next poll.
+ */
+ public void wakeup() {
+ this.wakeup.set(true);
+ this.client.wakeup();
+ }
+
+ /**
+ * Block indefinitely until the given request future has finished.
+ * @param future The request future to await.
+ * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+ */
+ public void poll(RequestFuture<?> future) {
+ while (!future.isDone())
+ poll(Long.MAX_VALUE);
+ }
+
+ /**
+ * Block until the provided request future has finished or the timeout has expired.
+ * @param future The request future to wait for
+ * @param timeout The maximum duration (in ms) to wait for the request
+ * @return true if the future is done, false otherwise
+ * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+ */
+ public boolean poll(RequestFuture<?> future, long timeout) {
+ long now = time.milliseconds();
+ long deadline = now + timeout;
+ while (!future.isDone() && now < deadline) {
+ poll(deadline - now, now);
+ now = time.milliseconds();
+ }
+ return future.isDone();
+ }
+
+ /**
+ * Poll for any network IO. All queued send requests will either have been transmitted on the
+ * network or have been failed by the time this call completes.
+ * @param timeout The maximum time to wait for an IO event.
+ * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+ */
+ public void poll(long timeout) {
+ poll(timeout, time.milliseconds());
+ }
+
+ private void poll(long timeout, long now) {
+ // send all the requests we can send now
+ pollUnsentRequests(now);
+
+ // ensure we don't poll any longer than the deadline for
+ // the next scheduled task
+ timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
+ clientPoll(timeout, now);
+
+ // execute scheduled tasks
+ now = time.milliseconds();
+ delayedTasks.poll(now);
+
+ // try again to send requests since buffer space may have been
+ // cleared or a connect finished in the poll
+ pollUnsentRequests(now);
+
+ // fail all requests that couldn't be sent
+ clearUnsentRequests(now);
+
+ }
+
+ /**
+ * Block until all pending requests from the given node have finished.
+ * @param node The node to await requests from
+ */
+ public void awaitPendingRequests(Node node) {
+ while (pendingRequestCount(node) > 0)
+ poll(retryBackoffMs);
+ }
+
+ /**
+ * Get the count of pending requests to the given node. This includes both requests that
+ * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
+ * @param node The node in question
+ * @return The number of pending requests
+ */
+ public int pendingRequestCount(Node node) {
+ List<ClientRequest> pending = unsent.get(node);
+ int unsentCount = pending == null ? 0 : pending.size();
+ return unsentCount + client.inFlightRequestCount(node.idString());
+ }
+
+ /**
+ * Get the total count of pending requests from all nodes. This includes both requests that
+ * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
+ * @return The total count of pending requests
+ */
+ public int pendingRequestCount() {
+ int total = 0;
+ for (List<ClientRequest> requests: unsent.values())
+ total += requests.size();
+ return total + client.inFlightRequestCount();
+ }
+
+ private void pollUnsentRequests(long now) {
+ while (trySend(now))
+ clientPoll(0, now);
+ }
+
+ private void clearUnsentRequests(long now) {
+ // clear all unsent requests and fail their corresponding futures
+ for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
+ Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
+ while (iterator.hasNext()) {
+ ClientRequest request = iterator.next();
+ RequestFutureCompletionHandler handler =
+ (RequestFutureCompletionHandler) request.callback();
+ handler.raise(SendFailedException.INSTANCE);
+ iterator.remove();
+ }
+ }
+ unsent.clear();
+ }
+
+ private boolean trySend(long now) {
+ // send any requests that can be sent now
+ boolean requestsSent = false;
+ for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
+ Node node = requestEntry.getKey();
+ Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
+ while (iterator.hasNext()) {
+ ClientRequest request = iterator.next();
+ if (client.ready(node, now)) {
+ client.send(request);
+ iterator.remove();
+ requestsSent = true;
+ } else if (client.connectionFailed(node)) {
+ RequestFutureCompletionHandler handler =
+ (RequestFutureCompletionHandler) request.callback();
+ handler.onComplete(new ClientResponse(request, now, true, null));
+ iterator.remove();
+ }
+ }
+ }
+ return requestsSent;
+ }
+
+ private void clientPoll(long timeout, long now) {
+ client.poll(timeout, now);
+ if (wakeup.get()) {
+ clearUnsentRequests(now);
+ wakeup.set(false);
+ throw new ConsumerWakeupException();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ client.close();
+ }
+
+ public static class RequestFutureCompletionHandler
+ extends RequestFuture<ClientResponse>
+ implements RequestCompletionHandler {
+
+ @Override
+ public void onComplete(ClientResponse response) {
+ complete(response);
+ }
+ }
+}