Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/21 21:08:52 UTC
[8/8] kafka git commit: KAFKA-2464: client-side assignment for new consumer
KAFKA-2464: client-side assignment for new consumer
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Jiangjie Qin, Onur Karaman, Ewen Cheslack-Postava, Guozhang Wang
Closes #165 from hachikuji/KAFKA-2464
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86eb74d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86eb74d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86eb74d9
Branch: refs/heads/trunk
Commit: 86eb74d9236c586af5889fe79f4b9e066c9c2af3
Parents: 6e747d4
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Oct 21 12:13:42 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 21 12:13:42 2015 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/ConsumerConfig.java | 9 +-
.../kafka/clients/consumer/KafkaConsumer.java | 47 +-
.../kafka/clients/consumer/RangeAssignor.java | 97 ++
.../clients/consumer/RoundRobinAssignor.java | 114 +++
.../consumer/internals/AbstractCoordinator.java | 638 +++++++++++++
.../internals/AbstractPartitionAssignor.java | 90 ++
.../consumer/internals/ConsumerCoordinator.java | 595 ++++++++++++
.../internals/ConsumerNetworkClient.java | 9 +
.../consumer/internals/ConsumerProtocol.java | 162 ++++
.../clients/consumer/internals/Coordinator.java | 848 -----------------
.../clients/consumer/internals/Fetcher.java | 31 +-
.../consumer/internals/PartitionAssignor.java | 117 +++
.../consumer/internals/RequestFuture.java | 16 +-
.../consumer/internals/SubscriptionState.java | 44 +-
.../java/org/apache/kafka/common/Cluster.java | 10 +
...onsumerCoordinatorNotAvailableException.java | 40 -
.../GroupCoordinatorNotAvailableException.java | 40 +
.../NotCoordinatorForConsumerException.java | 40 -
.../errors/NotCoordinatorForGroupException.java | 40 +
.../errors/UnknownConsumerIdException.java | 33 -
.../common/errors/UnknownMemberIdException.java | 33 +
.../apache/kafka/common/protocol/ApiKeys.java | 5 +-
.../apache/kafka/common/protocol/Errors.java | 22 +-
.../apache/kafka/common/protocol/Protocol.java | 97 +-
.../kafka/common/requests/AbstractRequest.java | 6 +-
.../requests/ConsumerMetadataRequest.java | 65 --
.../requests/ConsumerMetadataResponse.java | 70 --
.../common/requests/GroupMetadataRequest.java | 65 ++
.../common/requests/GroupMetadataResponse.java | 70 ++
.../kafka/common/requests/HeartbeatRequest.java | 16 +-
.../common/requests/HeartbeatResponse.java | 6 +-
.../kafka/common/requests/JoinGroupRequest.java | 96 +-
.../common/requests/JoinGroupResponse.java | 110 ++-
.../common/requests/OffsetCommitRequest.java | 34 +-
.../common/requests/OffsetCommitResponse.java | 6 +-
.../common/requests/OffsetFetchResponse.java | 4 +-
.../kafka/common/requests/SyncGroupRequest.java | 118 +++
.../common/requests/SyncGroupResponse.java | 71 ++
.../org/apache/kafka/common/utils/Utils.java | 28 +
.../org/apache/kafka/clients/MetadataTest.java | 1 -
.../clients/consumer/RangeAssignorTest.java | 217 +++++
.../consumer/RoundRobinAssignorTest.java | 209 +++++
.../internals/ConsumerCoordinatorTest.java | 749 +++++++++++++++
.../internals/ConsumerNetworkClientTest.java | 2 +-
.../internals/ConsumerProtocolTest.java | 118 +++
.../consumer/internals/CoordinatorTest.java | 635 -------------
.../clients/consumer/internals/FetcherTest.java | 4 +-
.../internals/MockPartitionAssignor.java | 49 +
.../common/requests/RequestResponseTest.java | 16 +-
.../kafka/copycat/util/KafkaBasedLogTest.java | 2 +-
.../src/main/scala/kafka/admin/AclCommand.scala | 22 +-
.../main/scala/kafka/admin/TopicCommand.scala | 7 +-
.../kafka/api/ConsumerMetadataRequest.scala | 80 --
.../kafka/api/ConsumerMetadataResponse.scala | 58 --
.../scala/kafka/api/GroupMetadataRequest.scala | 80 ++
.../scala/kafka/api/GroupMetadataResponse.scala | 58 ++
.../scala/kafka/api/OffsetCommitRequest.scala | 16 +-
core/src/main/scala/kafka/api/RequestKeys.scala | 5 +-
.../main/scala/kafka/client/ClientUtils.scala | 4 +-
.../kafka/common/OffsetMetadataAndError.scala | 5 +-
core/src/main/scala/kafka/common/Topic.scala | 4 +-
.../scala/kafka/consumer/SimpleConsumer.scala | 4 +-
.../kafka/coordinator/ConsumerCoordinator.scala | 535 -----------
.../coordinator/ConsumerGroupMetadata.scala | 133 ---
.../kafka/coordinator/ConsumerMetadata.scala | 50 -
.../kafka/coordinator/CoordinatorMetadata.scala | 160 +---
.../kafka/coordinator/DelayedHeartbeat.scala | 12 +-
.../scala/kafka/coordinator/DelayedJoin.scala | 40 +
.../kafka/coordinator/DelayedRebalance.scala | 40 -
.../kafka/coordinator/GroupCoordinator.scala | 632 +++++++++++++
.../scala/kafka/coordinator/GroupMetadata.scala | 209 +++++
.../kafka/coordinator/MemberMetadata.scala | 99 ++
.../kafka/coordinator/PartitionAssignor.scala | 125 ---
.../javaapi/ConsumerMetadataResponse.scala | 47 -
.../kafka/javaapi/GroupMetadataResponse.scala | 47 +
.../kafka/security/auth/ResourceType.scala | 6 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 144 +--
.../main/scala/kafka/server/KafkaConfig.scala | 16 +-
.../main/scala/kafka/server/KafkaServer.scala | 6 +-
.../main/scala/kafka/server/OffsetManager.scala | 16 +-
.../kafka/api/BaseConsumerTest.scala | 25 +-
.../kafka/api/ConsumerBounceTest.scala | 10 +-
.../kafka/api/IntegrationTestHarness.scala | 12 +-
.../integration/kafka/api/QuotasTest.scala | 1 -
.../integration/kafka/api/SslConsumerTest.scala | 22 -
.../scala/other/kafka/TestOffsetManager.scala | 4 +-
.../scala/unit/kafka/admin/AclCommandTest.scala | 8 +-
.../unit/kafka/admin/TopicCommandTest.scala | 8 +-
.../api/RequestResponseSerializationTest.scala | 10 +-
.../unit/kafka/consumer/TopicFilterTest.scala | 10 +-
.../ConsumerCoordinatorResponseTest.scala | 447 ---------
.../coordinator/ConsumerGroupMetadataTest.scala | 172 ----
.../coordinator/CoordinatorMetadataTest.scala | 160 +---
.../GroupCoordinatorResponseTest.scala | 907 +++++++++++++++++++
.../kafka/coordinator/GroupMetadataTest.scala | 249 +++++
.../kafka/coordinator/MemberMetadataTest.scala | 90 ++
.../coordinator/PartitionAssignorTest.scala | 305 -------
.../security/auth/SimpleAclAuthorizerTest.scala | 4 +-
.../unit/kafka/server/KafkaConfigTest.scala | 4 +-
.../unit/kafka/server/OffsetCommitTest.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 6 +-
101 files changed, 6634 insertions(+), 4428 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 1894822..5cc0419 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -17,9 +17,9 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.config.SSLConfigs;
import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.serialization.Deserializer;
import java.util.HashMap;
import java.util.Map;
@@ -78,7 +78,7 @@ public class ConsumerConfig extends AbstractConfig {
* <code>partition.assignment.strategy</code>
*/
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
- private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The friendly name of the partition assignment strategy that the server will use to distribute partition ownership amongst consumer instances when group management is used";
+ private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used";
/**
* <code>auto.offset.reset</code>
@@ -182,9 +182,8 @@ public class ConsumerConfig extends AbstractConfig {
Importance.HIGH,
HEARTBEAT_INTERVAL_MS_DOC)
.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
- Type.STRING,
- "range",
- in("range", "roundrobin"),
+ Type.LIST,
+ RangeAssignor.class.getName(),
Importance.MEDIUM,
PARTITION_ASSIGNMENT_STRATEGY_DOC)
.define(METADATA_MAX_AGE_CONFIG,
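With this patch, partition.assignment.strategy is a list of PartitionAssignor class names instantiated on the client, rather than a server-validated short name. A minimal configuration sketch (broker address, group id, and deserializers are placeholder choices, not part of this patch):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class AssignorConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
        // Type.LIST: one or more assignor class names, in order of preference
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                RoundRobinAssignor.class.getName());
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props,
                new ByteArrayDeserializer(), new ByteArrayDeserializer());
        consumer.close();
    }
}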
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2f7f153..cd166f0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -16,9 +16,10 @@ import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.clients.consumer.internals.Coordinator;
+import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
@@ -43,7 +44,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -403,7 +403,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private static final String JMX_PREFIX = "kafka.consumer";
private String clientId;
- private final Coordinator coordinator;
+ private final ConsumerCoordinator coordinator;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
private final Fetcher<K, V> fetcher;
@@ -416,7 +416,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final long retryBackoffMs;
private long requestTimeoutMs;
private boolean closed = false;
- private Metadata.Listener metadataListener;
// currentThread holds the threadId of the current thread accessing KafkaConsumer
// and is used to prevent multi-threaded access
@@ -531,11 +530,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
this.subscriptions = new SubscriptionState(offsetResetStrategy);
- this.coordinator = new Coordinator(this.client,
+ List<PartitionAssignor> assignors = config.getConfiguredInstances(
+ ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+ PartitionAssignor.class);
+ this.coordinator = new ConsumerCoordinator(this.client,
config.getString(ConsumerConfig.GROUP_ID_CONFIG),
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
- config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+ assignors,
+ this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
@@ -543,7 +546,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.time,
requestTimeoutMs,
retryBackoffMs,
- new Coordinator.DefaultOffsetCommitCallback(),
+ new ConsumerCoordinator.DefaultOffsetCommitCallback(),
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
if (keyDeserializer == null) {
@@ -652,7 +655,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
try {
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
this.subscriptions.subscribe(topics, listener);
- metadata.setTopics(topics);
+ metadata.setTopics(subscriptions.groupSubscription());
} finally {
release();
}
@@ -699,22 +702,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
acquire();
try {
log.debug("Subscribed to pattern: {}", pattern);
- metadataListener = new Metadata.Listener() {
- @Override
- public void onMetadataUpdate(Cluster cluster) {
- final List<String> topicsToSubscribe = new ArrayList<>();
-
- for (String topic : cluster.topics())
- if (subscriptions.getSubscribedPattern().matcher(topic).matches())
- topicsToSubscribe.add(topic);
-
- subscriptions.changeSubscription(topicsToSubscribe);
- metadata.setTopics(topicsToSubscribe);
- }
- };
this.subscriptions.subscribe(pattern, listener);
this.metadata.needMetadataForAllTopics(true);
- this.metadata.addListener(metadataListener);
} finally {
release();
}
@@ -729,7 +718,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.subscriptions.unsubscribe();
this.coordinator.resetGeneration();
this.metadata.needMetadataForAllTopics(false);
- this.metadata.removeListener(metadataListener);
} finally {
release();
}
@@ -1079,12 +1067,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
try {
Cluster cluster = this.metadata.fetch();
List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
- if (parts == null) {
- metadata.add(topic);
- client.awaitMetadataUpdate();
- parts = metadata.fetch().partitionsForTopic(topic);
- }
- return parts;
+ if (parts != null)
+ return parts;
+
+ Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topic), requestTimeoutMs);
+ return topicMetadata.get(topic);
} finally {
release();
}
@@ -1101,7 +1088,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public Map<String, List<PartitionInfo>> listTopics() {
acquire();
try {
- return fetcher.getAllTopics(requestTimeoutMs);
+ return fetcher.getAllTopicMetadata(requestTimeoutMs);
} finally {
release();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
new file mode 100644
index 0000000..f23151c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
+ * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
+ * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
+ * divide, then the first few consumers will have one extra partition.
+ *
+ * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
+ * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
+ *
+ * The assignment will be:
+ * C0: [t0p0, t0p1, t1p0, t1p1]
+ * C1: [t0p2, t1p2]
+ */
+public class RangeAssignor extends AbstractPartitionAssignor {
+
+ @Override
+ public String name() {
+ return "range";
+ }
+
+ private List<TopicPartition> partitions(String topic,
+ int numPartitions) {
+ List<TopicPartition> partitions = new ArrayList<>();
+ for (int i = 0; i < numPartitions; i++)
+ partitions.add(new TopicPartition(topic, i));
+ return partitions;
+ }
+
+ private Map<String, List<String>> consumersPerTopic(Map<String, List<String>> consumerMetadata) {
+ Map<String, List<String>> res = new HashMap<>();
+ for (Map.Entry<String, List<String>> subscriptionEntry : consumerMetadata.entrySet()) {
+ String consumerId = subscriptionEntry.getKey();
+ for (String topic : subscriptionEntry.getValue())
+ put(res, topic, consumerId);
+ }
+ return res;
+ }
+
+ @Override
+ public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+ Map<String, List<String>> subscriptions) {
+ Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
+ Map<String, List<TopicPartition>> assignment = new HashMap<>();
+ for (String memberId : subscriptions.keySet())
+ assignment.put(memberId, new ArrayList<TopicPartition>());
+
+ for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
+ String topic = topicEntry.getKey();
+ List<String> consumersForTopic = topicEntry.getValue();
+
+ Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
+ if (numPartitionsForTopic == null)
+ continue;
+
+ Collections.sort(consumersForTopic);
+
+ int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
+ int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+
+ List<TopicPartition> partitions = partitions(topic, numPartitionsForTopic);
+ for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
+ int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
+ int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
+ assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
+ }
+ }
+ return assignment;
+ }
+
+}
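The worked example in the javadoc above can be checked directly, since the simplified assign() overload takes plain partition counts and subscriptions. A minimal sketch using the same topic and consumer names:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.TopicPartition;

public class RangeAssignorExample {
    public static void main(String[] args) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("t0", 3);
        partitionsPerTopic.put("t1", 3);

        Map<String, List<String>> subscriptions = new HashMap<>();
        subscriptions.put("C0", Arrays.asList("t0", "t1"));
        subscriptions.put("C1", Arrays.asList("t0", "t1"));

        Map<String, List<TopicPartition>> assignment =
                new RangeAssignor().assign(partitionsPerTopic, subscriptions);
        // per the javadoc: C0 -> [t0p0, t0p1, t1p0, t1p1], C1 -> [t0p2, t1p2]
        System.out.println(assignment);
    }
}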
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
new file mode 100644
index 0000000..c5ea2bb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * The roundrobin assignor lays out all the available partitions and all the available consumers. It
+ * then proceeds to do a roundrobin assignment from partition to consumer. If the subscriptions of all consumer
+ * instances are identical, then the partitions will be uniformly distributed (i.e., the partition ownership counts
+ * will differ by at most one across all consumers).
+ *
+ * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
+ * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
+ *
+ * The assignment will be:
+ * C0: [t0p0, t0p2, t1p1]
+ * C1: [t0p1, t1p0, t1p2]
+ */
+public class RoundRobinAssignor extends AbstractPartitionAssignor {
+
+ @Override
+ public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+ Map<String, List<String>> subscriptions) {
+ Map<String, List<TopicPartition>> assignment = new HashMap<>();
+ for (String memberId : subscriptions.keySet())
+ assignment.put(memberId, new ArrayList<TopicPartition>());
+
+ CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
+ for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
+ final String topic = partition.topic();
+ while (!subscriptions.get(assigner.peek()).contains(topic))
+ assigner.next();
+ assignment.get(assigner.next()).add(partition);
+ }
+ return assignment;
+ }
+
+
+ public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
+ Map<String, List<String>> subscriptions) {
+ SortedSet<String> topics = new TreeSet<>();
+ for (List<String> subscription : subscriptions.values())
+ topics.addAll(subscription);
+
+ List<TopicPartition> allPartitions = new ArrayList<>();
+ for (String topic : topics) {
+ Integer partitions = partitionsPerTopic.get(topic);
+ for (int partition = 0; partition < partitions; partition++) {
+ allPartitions.add(new TopicPartition(topic, partition));
+ }
+ }
+ return allPartitions;
+ }
+
+ @Override
+ public String name() {
+ return "roundrobin";
+ }
+
+ private static class CircularIterator<T> implements Iterator<T> {
+ int i = 0;
+ private List<T> list;
+
+ public CircularIterator(List<T> list) {
+ if (list.isEmpty()) {
+ throw new IllegalArgumentException("CircularIterator can only be used on non-empty lists");
+ }
+ this.list = list;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public T next() {
+ T next = list.get(i);
+ i = (i + 1) % list.size();
+ return next;
+ }
+
+ public T peek() {
+ return list.get(i);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+}
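Because the assignment loop skips members that are not subscribed to a partition's topic, the round-robin assignor also copes with asymmetric subscriptions. A minimal sketch with hypothetical members where only C1 subscribes to t1:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.TopicPartition;

public class RoundRobinAssignorExample {
    public static void main(String[] args) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("t0", 3);
        partitionsPerTopic.put("t1", 3);

        Map<String, List<String>> subscriptions = new HashMap<>();
        subscriptions.put("C0", Arrays.asList("t0"));       // C0 only wants t0
        subscriptions.put("C1", Arrays.asList("t0", "t1"));

        Map<String, List<TopicPartition>> assignment =
                new RoundRobinAssignor().assign(partitionsPerTopic, subscriptions);
        // t0 alternates between C0 and C1; all of t1 falls to C1:
        // C0 -> [t0p0, t0p2], C1 -> [t0p1, t1p0, t1p1, t1p2]
        System.out.println(assignment);
    }
}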
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
new file mode 100644
index 0000000..1ffd2bb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -0,0 +1,638 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.requests.GroupMetadataRequest;
+import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.HeartbeatRequest;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.SyncGroupRequest;
+import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * AbstractCoordinator implements group management for a single group member by interacting with
+ * a designated Kafka broker (the coordinator). Group semantics are provided by extending this class.
+ * See {@link ConsumerCoordinator} for example usage.
+ *
+ * From a high level, Kafka's group management protocol consists of the following sequence of actions:
+ *
+ * <ol>
+ * <li>Group Registration: Group members register with the coordinator providing their own metadata
+ * (such as the set of topics they are interested in).</li>
+ * <li>Group/Leader Selection: The coordinator selects the members of the group and chooses one member
+ * as the leader.</li>
+ * <li>State Assignment: The leader collects the metadata from all the members of the group and
+ * assigns state.</li>
+ * <li>Group Stabilization: Each member receives the state assigned by the leader and begins
+ * processing.</li>
+ * </ol>
+ *
+ * To leverage this protocol, an implementation must define the format of metadata provided by each
+ * member for group registration in {@link #metadata()} and the format of the state assignment provided
+ * by the leader in {@link #doSync(String, String, Map)}, which becomes available to members in
+ * {@link #onJoin(int, String, String, ByteBuffer)}.
+ *
+ */
+public abstract class AbstractCoordinator {
+
+ private static final Logger log = LoggerFactory.getLogger(AbstractCoordinator.class);
+
+ private final Heartbeat heartbeat;
+ private final HeartbeatTask heartbeatTask;
+ private final int sessionTimeoutMs;
+ private final GroupCoordinatorMetrics sensors;
+ protected final String groupId;
+ protected final ConsumerNetworkClient client;
+ protected final Time time;
+ protected final long retryBackoffMs;
+ protected final long requestTimeoutMs;
+
+ private boolean rejoinNeeded = true;
+ protected Node coordinator;
+ protected String memberId;
+ protected String protocol;
+ protected int generation;
+
+ /**
+ * Initialize the coordination manager.
+ */
+ public AbstractCoordinator(ConsumerNetworkClient client,
+ String groupId,
+ int sessionTimeoutMs,
+ int heartbeatIntervalMs,
+ Metrics metrics,
+ String metricGrpPrefix,
+ Map<String, String> metricTags,
+ Time time,
+ long requestTimeoutMs,
+ long retryBackoffMs) {
+ this.client = client;
+ this.time = time;
+ this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
+ this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+ this.groupId = groupId;
+ this.coordinator = null;
+ this.sessionTimeoutMs = sessionTimeoutMs;
+ this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
+ this.heartbeatTask = new HeartbeatTask();
+ this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+ this.requestTimeoutMs = requestTimeoutMs;
+ this.retryBackoffMs = retryBackoffMs;
+ }
+
+ /**
+ * Unique identifier for the class of protocols implemented (e.g. "consumer" or "copycat").
+ * @return Non-null protocol type name
+ */
+ protected abstract String protocolType();
+
+ /**
+ * Get the current list of protocols and their associated metadata supported
+ * by the local member. The order of the protocols in the map indicates the preference
+ * of the protocol (the first entry is the most preferred). The coordinator takes this
+ * preference into account when selecting the generation protocol (generally more preferred
+ * protocols will be selected as long as all members support them and there is no disagreement
+ * on the preference).
+ * @return Non-empty map of supported protocols and metadata
+ */
+ protected abstract LinkedHashMap<String, ByteBuffer> metadata();
+
+ /**
+ * Invoked when a group member has successfully joined a group.
+ * @param generation The generation that was joined
+ * @param memberId The identifier for the local member in the group
+ * @param protocol The protocol selected by the coordinator
+ * @param memberAssignment The assignment propagated from the group leader
+ */
+ protected abstract void onJoin(int generation,
+ String memberId,
+ String protocol,
+ ByteBuffer memberAssignment);
+
+ /**
+ * Perform synchronization for the group. This is used by the leader to push state to all the members
+ * of the group (e.g. to push partition assignments in the case of the new consumer)
+ * @param leaderId The id of the leader (which is this member)
+ * @param protocol The protocol selected by the coordinator
+ * @param allMemberMetadata Metadata from all members of the group
+ * @return A map from each member to their state assignment
+ */
+ protected abstract Map<String, ByteBuffer> doSync(String leaderId,
+ String protocol,
+ Map<String, ByteBuffer> allMemberMetadata);
+
+ /**
+ * Invoked when the group is left (whether because of shutdown, metadata change, stale generation, etc.)
+ * @param generation The generation that was left
+ * @param memberId The identifier of the local member in the group
+ */
+ protected abstract void onLeave(int generation, String memberId);
+
+
+ /**
+ * Block until the coordinator for this group is known.
+ */
+ public void ensureCoordinatorKnown() {
+ while (coordinatorUnknown()) {
+ RequestFuture<Void> future = sendGroupMetadataRequest();
+ client.poll(future, requestTimeoutMs);
+
+ if (future.failed())
+ client.awaitMetadataUpdate();
+ }
+ }
+
+ /**
+ * Check whether the group should be rejoined (e.g. if metadata changes)
+ * @return true if it should, false otherwise
+ */
+ protected boolean needRejoin() {
+ return rejoinNeeded;
+ }
+
+ /**
+ * Reset the generation/memberId tracked by this member
+ */
+ public void resetGeneration() {
+ this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
+ this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+ rejoinNeeded = true;
+ }
+
+ /**
+ * Ensure that the group is active (i.e. joined and synced)
+ */
+ public void ensureActiveGroup() {
+ if (!needRejoin())
+ return;
+
+ // onLeave only invoked if we have a valid current generation
+ onLeave(generation, memberId);
+
+ while (needRejoin()) {
+ ensureCoordinatorKnown();
+
+ // ensure that there are no pending requests to the coordinator. This is important
+ // in particular to avoid resending a pending JoinGroup request.
+ if (client.pendingRequestCount(this.coordinator) > 0) {
+ client.awaitPendingRequests(this.coordinator);
+ continue;
+ }
+
+ RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
+ client.poll(future);
+
+ if (future.succeeded()) {
+ onJoin(generation, memberId, protocol, future.value());
+ heartbeatTask.reset();
+ } else {
+ if (future.exception() instanceof UnknownMemberIdException)
+ continue;
+ else if (!future.isRetriable())
+ throw future.exception();
+ Utils.sleep(retryBackoffMs);
+ }
+ }
+ }
+
+ private class HeartbeatTask implements DelayedTask {
+
+ public void reset() {
+ // start or restart the heartbeat task to be executed at the next chance
+ long now = time.milliseconds();
+ heartbeat.resetSessionTimeout(now);
+ client.unschedule(this);
+ client.schedule(this, now);
+ }
+
+ @Override
+ public void run(final long now) {
+ if (generation < 0 || needRejoin() || coordinatorUnknown()) {
+ // no need to send the heartbeat if we're not using auto-assignment or if we are
+ // awaiting a rebalance
+ return;
+ }
+
+ if (heartbeat.sessionTimeoutExpired(now)) {
+ // we haven't received a successful heartbeat in one session interval
+ // so mark the coordinator dead
+ coordinatorDead();
+ return;
+ }
+
+ if (!heartbeat.shouldHeartbeat(now)) {
+ // we don't need to heartbeat now, so reschedule for when we do
+ client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
+ } else {
+ heartbeat.sentHeartbeat(now);
+ RequestFuture<Void> future = sendHeartbeatRequest();
+ future.addListener(new RequestFutureListener<Void>() {
+ @Override
+ public void onSuccess(Void value) {
+ long now = time.milliseconds();
+ heartbeat.receiveHeartbeat(now);
+ long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
+ client.schedule(HeartbeatTask.this, nextHeartbeatTime);
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
+ }
+ });
+ }
+ }
+ }
+
+ /**
+ * Send a request to get a new partition assignment. This is a non-blocking call which sends
+ * a JoinGroup request to the coordinator (if it is available). The returned future must
+ * be polled to see if the request completed successfully.
+ * @return A request future whose completion indicates the result of the JoinGroup request.
+ */
+ private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
+ if (coordinatorUnknown())
+ return RequestFuture.coordinatorNotAvailable();
+
+ // send a join group request to the coordinator
+ log.debug("(Re-)joining group {}", groupId);
+
+ List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>();
+ for (Map.Entry<String, ByteBuffer> metadataEntry : metadata().entrySet())
+ protocols.add(new JoinGroupRequest.GroupProtocol(metadataEntry.getKey(), metadataEntry.getValue()));
+
+ JoinGroupRequest request = new JoinGroupRequest(
+ groupId,
+ this.sessionTimeoutMs,
+ this.memberId,
+ protocolType(),
+ protocols);
+
+ // create the request for the coordinator
+ log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.coordinator.id());
+ return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
+ .compose(new JoinGroupResponseHandler());
+ }
+
+
+ private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
+
+ @Override
+ public JoinGroupResponse parse(ClientResponse response) {
+ return new JoinGroupResponse(response.responseBody());
+ }
+
+ @Override
+ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
+ // process the response
+ short errorCode = joinResponse.errorCode();
+ if (errorCode == Errors.NONE.code()) {
+ log.debug("Joined group: {}", joinResponse.toStruct());
+ AbstractCoordinator.this.memberId = joinResponse.memberId();
+ AbstractCoordinator.this.generation = joinResponse.generationId();
+ AbstractCoordinator.this.rejoinNeeded = false;
+ AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
+ sensors.joinLatency.record(response.requestLatencyMs());
+ performSync(joinResponse).chain(future);
+ } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
+ // reset the member id and retry immediately
+ AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+ log.info("Attempt to join group {} failed due to unknown member id, resetting and retrying.",
+ groupId);
+ future.raise(Errors.UNKNOWN_MEMBER_ID);
+ } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
+ || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+ // re-discover the coordinator and retry with backoff
+ coordinatorDead();
+ log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
+ groupId);
+ future.raise(Errors.forCode(errorCode));
+ } else if (errorCode == Errors.INCONSISTENT_GROUP_PROTOCOL.code()
+ || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) {
+ // log the error and re-throw the exception
+ Errors error = Errors.forCode(errorCode);
+ log.error("Attempt to join group {} failed due to: {}",
+ groupId, error.exception().getMessage());
+ future.raise(error);
+ } else {
+ // unexpected error, throw the exception
+ future.raise(new KafkaException("Unexpected error in join group response: "
+ + Errors.forCode(joinResponse.errorCode()).exception().getMessage()));
+ }
+ }
+ }
+
+ private RequestFuture<ByteBuffer> performSync(JoinGroupResponse joinResponse) {
+ if (joinResponse.isLeader()) {
+ try {
+ // perform the leader synchronization and send back the assignment for the group
+ Map<String, ByteBuffer> groupAssignment = doSync(joinResponse.leaderId(), joinResponse.groupProtocol(),
+ joinResponse.members());
+
+ SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
+ log.debug("Issuing leader SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
+ return sendSyncGroupRequest(request);
+ } catch (RuntimeException e) {
+ return RequestFuture.failure(e);
+ }
+ } else {
+ // send follower's sync group with an empty assignment
+ SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
+ memberId, Collections.<String, ByteBuffer>emptyMap());
+ log.debug("Issuing follower SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
+ return sendSyncGroupRequest(request);
+ }
+ }
+
+ private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest request) {
+ if (coordinatorUnknown())
+ return RequestFuture.coordinatorNotAvailable();
+ return client.send(coordinator, ApiKeys.SYNC_GROUP, request)
+ .compose(new SyncGroupRequestHandler());
+ }
+
+ private class SyncGroupRequestHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
+
+ @Override
+ public SyncGroupResponse parse(ClientResponse response) {
+ return new SyncGroupResponse(response.responseBody());
+ }
+
+ @Override
+ public void handle(SyncGroupResponse syncResponse,
+ RequestFuture<ByteBuffer> future) {
+ short errorCode = syncResponse.errorCode();
+ if (errorCode == Errors.NONE.code()) {
+ try {
+ future.complete(syncResponse.memberAssignment());
+ sensors.syncLatency.record(response.requestLatencyMs());
+ } catch (SchemaException e) {
+ future.raise(e);
+ }
+ } else {
+ AbstractCoordinator.this.rejoinNeeded = true;
+ future.raise(Errors.forCode(errorCode));
+ }
+ }
+ }
+
+ /**
+ * Discover the current coordinator for the group. Sends a GroupMetadata request to
+ * one of the brokers. The returned future should be polled to get the result of the request.
+ * @return A request future which indicates the completion of the metadata request
+ */
+ private RequestFuture<Void> sendGroupMetadataRequest() {
+ // initiate the group metadata request
+ // find a node to ask about the coordinator
+ Node node = this.client.leastLoadedNode();
+ if (node == null) {
+ // TODO: If there are no brokers left, perhaps we should use the bootstrap set
+ // from configuration?
+ return RequestFuture.noBrokersAvailable();
+ } else {
+ // create a group metadata request
+ log.debug("Issuing group metadata request to broker {}", node.id());
+ GroupMetadataRequest metadataRequest = new GroupMetadataRequest(this.groupId);
+ return client.send(node, ApiKeys.GROUP_METADATA, metadataRequest)
+ .compose(new RequestFutureAdapter<ClientResponse, Void>() {
+ @Override
+ public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
+ handleGroupMetadataResponse(response, future);
+ }
+ });
+ }
+ }
+
+ private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
+ log.debug("Group metadata response {}", resp);
+
+ // parse the response to get the coordinator info if it is not disconnected,
+ // otherwise we need to request metadata update
+ if (resp.wasDisconnected()) {
+ future.raise(new DisconnectException());
+ } else if (!coordinatorUnknown()) {
+ // We already found the coordinator, so ignore the request
+ future.complete(null);
+ } else {
+ GroupMetadataResponse groupMetadataResponse = new GroupMetadataResponse(resp.responseBody());
+ // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
+ // for the coordinator in the underlying network client layer
+ // TODO: this needs to be better handled in KAFKA-1935
+ if (groupMetadataResponse.errorCode() == Errors.NONE.code()) {
+ this.coordinator = new Node(Integer.MAX_VALUE - groupMetadataResponse.node().id(),
+ groupMetadataResponse.node().host(),
+ groupMetadataResponse.node().port());
+
+ // start sending heartbeats only if we have a valid generation
+ if (generation > 0)
+ heartbeatTask.reset();
+ future.complete(null);
+ } else {
+ future.raise(Errors.forCode(groupMetadataResponse.errorCode()));
+ }
+ }
+ }
+
+ /**
+ * Check if we know who the coordinator is.
+ * @return true if the coordinator is unknown
+ */
+ public boolean coordinatorUnknown() {
+ return this.coordinator == null;
+ }
+
+
+ /**
+ * Mark the current coordinator as dead.
+ */
+ protected void coordinatorDead() {
+ if (this.coordinator != null) {
+ log.info("Marking the coordinator {} dead.", this.coordinator.id());
+ this.coordinator = null;
+ }
+ }
+
+ /**
+ * Send a heartbeat request now (visible only for testing).
+ */
+ public RequestFuture<Void> sendHeartbeatRequest() {
+ HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.memberId);
+ return client.send(coordinator, ApiKeys.HEARTBEAT, req)
+ .compose(new HeartbeatCompletionHandler());
+ }
+
+ private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
+ @Override
+ public HeartbeatResponse parse(ClientResponse response) {
+ return new HeartbeatResponse(response.responseBody());
+ }
+
+ @Override
+ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
+ sensors.heartbeatLatency.record(response.requestLatencyMs());
+ short error = heartbeatResponse.errorCode();
+ if (error == Errors.NONE.code()) {
+ log.debug("Received successful heartbeat response.");
+ future.complete(null);
+ } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
+ || error == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+ log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
+ coordinatorDead();
+ future.raise(Errors.forCode(error));
+ } else if (error == Errors.REBALANCE_IN_PROGRESS.code()) {
+ log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group.");
+ AbstractCoordinator.this.rejoinNeeded = true;
+ future.raise(Errors.REBALANCE_IN_PROGRESS);
+ } else if (error == Errors.ILLEGAL_GENERATION.code()) {
+ log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
+ AbstractCoordinator.this.rejoinNeeded = true;
+ future.raise(Errors.ILLEGAL_GENERATION);
+ } else if (error == Errors.UNKNOWN_MEMBER_ID.code()) {
+ log.info("Attempt to heart beat failed since member id is not valid, reset it and try to re-join group.");
+ memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+ AbstractCoordinator.this.rejoinNeeded = true;
+ future.raise(Errors.UNKNOWN_MEMBER_ID);
+ } else {
+ future.raise(new KafkaException("Unexpected error in heartbeat response: "
+ + Errors.forCode(error).exception().getMessage()));
+ }
+ }
+ }
+
+ protected abstract class CoordinatorResponseHandler<R, T>
+ extends RequestFutureAdapter<ClientResponse, T> {
+ protected ClientResponse response;
+
+ public abstract R parse(ClientResponse response);
+
+ public abstract void handle(R response, RequestFuture<T> future);
+
+ @Override
+ public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
+ this.response = clientResponse;
+
+ if (clientResponse.wasDisconnected()) {
+ int correlation = response.request().request().header().correlationId();
+ log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
+ response.request(),
+ correlation,
+ response.request().request().destination());
+
+ // mark the coordinator as dead
+ coordinatorDead();
+ future.raise(new DisconnectException());
+ return;
+ }
+
+ R response = parse(clientResponse);
+ handle(response, future);
+ }
+
+ }
+
+ private class GroupCoordinatorMetrics {
+ public final Metrics metrics;
+ public final String metricGrpName;
+
+ public final Sensor heartbeatLatency;
+ public final Sensor joinLatency;
+ public final Sensor syncLatency;
+
+ public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+ this.metrics = metrics;
+ this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
+
+ this.heartbeatLatency = metrics.sensor("heartbeat-latency");
+ this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max",
+ this.metricGrpName,
+ "The max time taken to receive a response to a heartbeat request",
+ tags), new Max());
+ this.heartbeatLatency.add(new MetricName("heartbeat-rate",
+ this.metricGrpName,
+ "The average number of heartbeats per second",
+ tags), new Rate(new Count()));
+
+ this.joinLatency = metrics.sensor("join-latency");
+ this.joinLatency.add(new MetricName("join-time-avg",
+ this.metricGrpName,
+ "The average time taken for a group rejoin",
+ tags), new Avg());
+ this.joinLatency.add(new MetricName("join-time-max",
+ this.metricGrpName,
+ "The max time taken for a group rejoin",
+ tags), new Max());
+ this.joinLatency.add(new MetricName("join-rate",
+ this.metricGrpName,
+ "The number of group joins per second",
+ tags), new Rate(new Count()));
+
+ this.syncLatency = metrics.sensor("sync-latency");
+ this.syncLatency.add(new MetricName("sync-time-avg",
+ this.metricGrpName,
+ "The average time taken for a group sync",
+ tags), new Avg());
+ this.syncLatency.add(new MetricName("sync-time-max",
+ this.metricGrpName,
+ "The max time taken for a group sync",
+ tags), new Max());
+ this.syncLatency.add(new MetricName("sync-rate",
+ this.metricGrpName,
+ "The number of group syncs per second",
+ tags), new Rate(new Count()));
+
+ Measurable lastHeartbeat =
+ new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
+ }
+ };
+ metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
+ this.metricGrpName,
+ "The number of seconds since the last controller heartbeat",
+ tags),
+ lastHeartbeat);
+ }
+ }
+
+}
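The four abstract methods line up with the protocol phases listed in the class javadoc: metadata() feeds registration, doSync() is the leader's state assignment, onJoin() is stabilization, and onLeave() runs before rejoining. A hypothetical skeleton subclass (the "echo" protocol type and trivial bodies are illustrative, not part of this patch):

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;

public class EchoCoordinator extends AbstractCoordinator {

    public EchoCoordinator(ConsumerNetworkClient client, String groupId, int sessionTimeoutMs,
                           int heartbeatIntervalMs, Metrics metrics, String metricGrpPrefix,
                           Map<String, String> metricTags, Time time,
                           long requestTimeoutMs, long retryBackoffMs) {
        super(client, groupId, sessionTimeoutMs, heartbeatIntervalMs, metrics, metricGrpPrefix,
                metricTags, time, requestTimeoutMs, retryBackoffMs);
    }

    @Override
    protected String protocolType() {
        return "echo"; // hypothetical protocol type
    }

    @Override
    protected LinkedHashMap<String, ByteBuffer> metadata() {
        // registration: advertise a single protocol version with empty metadata
        LinkedHashMap<String, ByteBuffer> protocols = new LinkedHashMap<>();
        protocols.put("v0", ByteBuffer.allocate(0));
        return protocols;
    }

    @Override
    protected Map<String, ByteBuffer> doSync(String leaderId, String protocol,
                                             Map<String, ByteBuffer> allMemberMetadata) {
        // leader-side state assignment: echo each member's own metadata back as its state
        return new HashMap<>(allMemberMetadata);
    }

    @Override
    protected void onJoin(int generation, String memberId, String protocol,
                          ByteBuffer memberAssignment) {
        // stabilization: begin processing with the state pushed by the leader
    }

    @Override
    protected void onLeave(int generation, String memberId) {
        // clean up state tied to the previous generation before rejoining
    }
}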
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
new file mode 100644
index 0000000..12fa913
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Abstract assignor implementation which does some common grunt work (in particular collecting
+ * partition counts which are always needed in assignors).
+ */
+public abstract class AbstractPartitionAssignor implements PartitionAssignor {
+ private static final Logger log = LoggerFactory.getLogger(AbstractPartitionAssignor.class);
+
+ /**
+ * Perform the group assignment given the partition counts and member subscriptions
+ * @param partitionsPerTopic The number of partitions for each subscribed topic (topics without metadata available are omitted)
+ * @param subscriptions Map from the memberId to their respective topic subscription
+ * @return Map from each member to the list of partitions assigned to them
+ */
+ public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+ Map<String, List<String>> subscriptions);
+
+ @Override
+ public Subscription subscription(Set<String> topics) {
+ return new Subscription(new ArrayList<>(topics));
+ }
+
+ @Override
+ public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
+ Set<String> allSubscribedTopics = new HashSet<>();
+ Map<String, List<String>> topicSubscriptions = new HashMap<>();
+ for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
+ List<String> topics = subscriptionEntry.getValue().topics();
+ allSubscribedTopics.addAll(topics);
+ topicSubscriptions.put(subscriptionEntry.getKey(), topics);
+ }
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ for (String topic : allSubscribedTopics) {
+ Integer numPartitions = metadata.partitionCountForTopic(topic);
+ if (numPartitions != null)
+ partitionsPerTopic.put(topic, numPartitions);
+ else
+ log.debug("Skipping assignment for topic {} since no metadata is available", topic);
+ }
+
+ Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions);
+
+ // this class maintains no user data, so just wrap the results
+ Map<String, Assignment> assignments = new HashMap<>();
+ for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
+ assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
+ return assignments;
+ }
+
+ @Override
+ public void onAssignment(Assignment assignment) {
+ // this assignor maintains no internal state, so nothing to do
+ }
+
+ protected static <K, V> void put(Map<K, List<V>> map, K key, V value) {
+ List<V> list = map.get(key);
+ if (list == null) {
+ list = new ArrayList<>();
+ map.put(key, list);
+ }
+ list.add(value);
+ }
+
+}
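A concrete assignor therefore only supplies name() plus the simplified assign() overload; the base class resolves partition counts from cluster metadata and wraps the results. A hypothetical toy assignor that gives every partition of a topic to the lexicographically first subscriber (illustrative only, not part of this patch):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;

public class FirstSubscriberAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() {
        return "first-subscriber"; // hypothetical protocol name
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        for (Map.Entry<String, Integer> topicEntry : partitionsPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            // pick the lexicographically first member subscribed to this topic
            String first = null;
            for (Map.Entry<String, List<String>> sub : subscriptions.entrySet())
                if (sub.getValue().contains(topic)
                        && (first == null || sub.getKey().compareTo(first) < 0))
                    first = sub.getKey();
            if (first == null)
                continue;
            for (int i = 0; i < topicEntry.getValue(); i++)
                assignment.get(first).add(new TopicPartition(topic, i));
        }
        return assignment;
    }
}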
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
new file mode 100644
index 0000000..fc7e819
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -0,0 +1,595 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class manages the coordination process with the consumer coordinator.
+ */
+public final class ConsumerCoordinator extends AbstractCoordinator implements Closeable {
+
+ private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class);
+
+ private final Map<String, PartitionAssignor> protocolMap;
+ private final org.apache.kafka.clients.Metadata metadata;
+ private final MetadataSnapshot metadataSnapshot;
+ private final ConsumerCoordinatorMetrics sensors;
+ private final SubscriptionState subscriptions;
+ private final OffsetCommitCallback defaultOffsetCommitCallback;
+ private final boolean autoCommitEnabled;
+
+ /**
+ * Initialize the coordination manager.
+ */
+ public ConsumerCoordinator(ConsumerNetworkClient client,
+ String groupId,
+ int sessionTimeoutMs,
+ int heartbeatIntervalMs,
+ List<PartitionAssignor> assignors,
+ Metadata metadata,
+ SubscriptionState subscriptions,
+ Metrics metrics,
+ String metricGrpPrefix,
+ Map<String, String> metricTags,
+ Time time,
+ long requestTimeoutMs,
+ long retryBackoffMs,
+ OffsetCommitCallback defaultOffsetCommitCallback,
+ boolean autoCommitEnabled,
+ long autoCommitIntervalMs) {
+ super(client,
+ groupId,
+ sessionTimeoutMs,
+ heartbeatIntervalMs,
+ metrics,
+ metricGrpPrefix,
+ metricTags,
+ time,
+ requestTimeoutMs,
+ retryBackoffMs);
+ this.metadata = metadata;
+
+ this.metadata.requestUpdate();
+ this.metadataSnapshot = new MetadataSnapshot();
+ this.subscriptions = subscriptions;
+ this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
+ this.autoCommitEnabled = autoCommitEnabled;
+
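+ // index the assignors by protocol name so the strategy chosen by the coordinator can be looked up on join/sync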
+ this.protocolMap = new HashMap<>();
+ for (PartitionAssignor assignor : assignors)
+ this.protocolMap.put(assignor.name(), assignor);
+
+ addMetadataListener();
+
+ if (autoCommitEnabled)
+ scheduleAutoCommitTask(autoCommitIntervalMs);
+
+ this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+ }
+
+ @Override
+ public String protocolType() {
+ return "consumer";
+ }
+
+ @Override
+ public LinkedHashMap<String, ByteBuffer> metadata() {
+ LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
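+ // include one subscription per supported assignor, in preference order; the coordinator
+ // will choose a protocol that all group members support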
+ for (PartitionAssignor assignor : protocolMap.values()) {
+ Subscription subscription = assignor.subscription(subscriptions.subscription());
+ metadata.put(assignor.name(), ConsumerProtocol.serializeSubscription(subscription));
+ }
+ return metadata;
+ }
+
+ private void addMetadataListener() {
+ this.metadata.addListener(new Metadata.Listener() {
+ @Override
+ public void onMetadataUpdate(Cluster cluster) {
+ if (subscriptions.hasPatternSubscription()) {
+ final List<String> topicsToSubscribe = new ArrayList<>();
+
+ for (String topic : cluster.topics())
+ if (subscriptions.getSubscribedPattern().matcher(topic).matches())
+ topicsToSubscribe.add(topic);
+
+ subscriptions.changeSubscription(topicsToSubscribe);
+ metadata.setTopics(subscriptions.groupSubscription());
+ }
+
+ // check if there are any changes to the metadata which should trigger a rebalance
+ if (metadataSnapshot.update(subscriptions, cluster) && subscriptions.partitionsAutoAssigned())
+ subscriptions.needReassignment();
+ }
+ });
+ }
+
+ @Override
+ protected void onJoin(int generation,
+ String memberId,
+ String assignmentStrategy,
+ ByteBuffer assignmentBuffer) {
+ PartitionAssignor assignor = protocolMap.get(assignmentStrategy);
+ if (assignor == null)
+ throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
+
+ Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
+
+ // set the flag to refresh last committed offsets
+ subscriptions.needRefreshCommits();
+
+ // update partition assignment
+ subscriptions.changePartitionAssignment(assignment.partitions());
+
+ // give the assignor a chance to update internal state based on the received assignment
+ assignor.onAssignment(assignment);
+
+ // execute the user's callback after rebalance
+ ConsumerRebalanceListener listener = subscriptions.listener();
+ log.debug("Setting newly assigned partitions {}", subscriptions.assignedPartitions());
+ try {
+ Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
+ listener.onPartitionsAssigned(assigned);
+ } catch (Exception e) {
+ log.error("User provided listener " + listener.getClass().getName()
+ + " failed on partition assignment: ", e);
+ }
+ }
+
+ @Override
+ protected Map<String, ByteBuffer> doSync(String leaderId,
+ String assignmentStrategy,
+ Map<String, ByteBuffer> allSubscriptions) {
+ PartitionAssignor assignor = protocolMap.get(assignmentStrategy);
+ if (assignor == null)
+ throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
+
+ Set<String> allSubscribedTopics = new HashSet<>();
+ Map<String, Subscription> subscriptions = new HashMap<>();
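+ // decode each member's subscription so the assignor can see the topics of the whole group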
+ for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
+ Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
+ subscriptions.put(subscriptionEntry.getKey(), subscription);
+ allSubscribedTopics.addAll(subscription.topics());
+ }
+
+ // the leader will begin watching for changes to any of the topics the group is interested in,
+ // which ensures that all metadata changes will eventually be seen
+ this.subscriptions.groupSubscribe(allSubscribedTopics);
+ metadata.setTopics(this.subscriptions.groupSubscription());
+ client.ensureFreshMetadata();
+
+ log.debug("Performing {} assignment for subscriptions {}", assignor.name(), subscriptions);
+
+ Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
+
+ log.debug("Finished assignment: {}", assignment);
+
+ Map<String, ByteBuffer> groupAssignment = new HashMap<>();
+ for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
+ ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
+ groupAssignment.put(assignmentEntry.getKey(), buffer);
+ }
+
+ return groupAssignment;
+ }
+
+ @Override
+ protected void onLeave(int generation, String memberId) {
+ // commit offsets prior to rebalance if auto-commit enabled
+ maybeAutoCommitOffsetsSync();
+
+ // execute the user's callback before rebalance
+ ConsumerRebalanceListener listener = subscriptions.listener();
+ log.debug("Revoking previously assigned partitions {}", subscriptions.assignedPartitions());
+ try {
+ Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
+ listener.onPartitionsRevoked(revoked);
+ } catch (Exception e) {
+ log.error("User provided listener " + listener.getClass().getName()
+ + " failed on partition revocation: ", e);
+ }
+
+ subscriptions.needReassignment();
+ }
+
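+ // rejoining the group is only relevant when partitions are automatically assigned (i.e. via subscribe())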
+ @Override
+ public boolean needRejoin() {
+ return subscriptions.partitionsAutoAssigned() &&
+ (super.needRejoin() || subscriptions.partitionAssignmentNeeded());
+ }
+
+ /**
+ * Refresh the committed offsets for the currently assigned partitions, if a refresh is needed.
+ */
+ public void refreshCommittedOffsetsIfNeeded() {
+ if (subscriptions.refreshCommitsNeeded()) {
+ Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
+ for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+ TopicPartition tp = entry.getKey();
+ // verify assignment is still active
+ if (subscriptions.isAssigned(tp))
+ this.subscriptions.committed(tp, entry.getValue());
+ }
+ this.subscriptions.commitsRefreshed();
+ }
+ }
+
+ /**
+ * Fetch the current committed offsets from the coordinator for a set of partitions.
+ * @param partitions The partitions to fetch offsets for
+ * @return A map from partition to the committed offset
+ */
+ public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
+ while (true) {
+ ensureCoordinatorKnown();
+
+ // contact coordinator to fetch committed offsets
+ RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
+ client.poll(future);
+
+ if (future.succeeded())
+ return future.value();
+
+ if (!future.isRetriable())
+ throw future.exception();
+
+ Utils.sleep(retryBackoffMs);
+ }
+ }
+
+ /**
+ * Ensure that we have a valid partition assignment from the coordinator.
+ */
+ public void ensurePartitionAssignment() {
+ if (subscriptions.partitionsAutoAssigned())
+ ensureActiveGroup();
+ }
+
+ @Override
+ public void close() {
+ // commit offsets prior to closing if auto-commit enabled
+ while (true) {
+ try {
+ maybeAutoCommitOffsetsSync();
+ return;
+ } catch (ConsumerWakeupException e) {
+ // ignore wakeups while closing to ensure we have a chance to commit
+ continue;
+ }
+ }
+ }
+
+ public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
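+ // mark the committed-offset cache as needing a refresh so it is re-fetched after this commit completes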
+ this.subscriptions.needRefreshCommits();
+ RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
+ final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
+ future.addListener(new RequestFutureListener<Void>() {
+ @Override
+ public void onSuccess(Void value) {
+ cb.onComplete(offsets, null);
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ cb.onComplete(offsets, e);
+ }
+ });
+ }
+
+ public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+ if (offsets.isEmpty())
+ return;
+
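+ // block on the commit, retrying on retriable errors (e.g. coordinator movement) after a backoff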
+ while (true) {
+ ensureCoordinatorKnown();
+
+ RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
+ client.poll(future);
+
+ if (future.succeeded()) {
+ return;
+ }
+
+ if (!future.isRetriable()) {
+ throw future.exception();
+ }
+
+ Utils.sleep(retryBackoffMs);
+ }
+ }
+
+ private void scheduleAutoCommitTask(final long interval) {
+ DelayedTask task = new DelayedTask() {
+ public void run(long now) {
+ commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
+ @Override
+ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+ if (exception != null)
+ log.error("Auto offset commit failed.", exception);
+ }
+ });
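+ // reschedule this task so that auto-commit runs again after the configured interval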
+ client.schedule(this, now + interval);
+ }
+ };
+ client.schedule(task, time.milliseconds() + interval);
+ }
+
+ private void maybeAutoCommitOffsetsSync() {
+ if (autoCommitEnabled) {
+ try {
+ commitOffsetsSync(subscriptions.allConsumed());
+ } catch (ConsumerWakeupException e) {
+ // rethrow wakeups since they are triggered by the user
+ throw e;
+ } catch (Exception e) {
+ // consistent with async auto-commit failures, we do not propagate the exception
+ log.error("Auto offset commit failed.", e);
+ }
+ }
+ }
+
+ /**
+ * Commit offsets for the specified topics and partitions. This is a non-blocking call which
+ * returns a request future that can be polled for a synchronous commit or ignored for an
+ * asynchronous one.
+ *
+ * @param offsets The offsets to commit, keyed by partition
+ * @return A request future whose value indicates whether the commit was successful or not
+ */
+ private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+ if (coordinatorUnknown())
+ return RequestFuture.coordinatorNotAvailable();
+
+ if (offsets.isEmpty())
+ return RequestFuture.voidSuccess();
+
+ // create the offset commit request
+ Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
+ for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+ OffsetAndMetadata offsetAndMetadata = entry.getValue();
+ offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
+ offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
+ }
+
+ OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
+ this.generation,
+ this.memberId,
+ OffsetCommitRequest.DEFAULT_RETENTION_TIME,
+ offsetData);
+
+ return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
+ .compose(new OffsetCommitResponseHandler(offsets));
+ }
+
+ public static class DefaultOffsetCommitCallback implements OffsetCommitCallback {
+ @Override
+ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+ if (exception != null)
+ log.error("Offset commit failed.", exception);
+ }
+ }
+
+ private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
+
+ private final Map<TopicPartition, OffsetAndMetadata> offsets;
+
+ public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
+ this.offsets = offsets;
+ }
+
+ @Override
+ public OffsetCommitResponse parse(ClientResponse response) {
+ return new OffsetCommitResponse(response.responseBody());
+ }
+
+ @Override
+ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
+ sensors.commitLatency.record(response.requestLatencyMs());
+ for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
+ TopicPartition tp = entry.getKey();
+ OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
+ long offset = offsetAndMetadata.offset();
+
+ short errorCode = entry.getValue();
+ if (errorCode == Errors.NONE.code()) {
+ log.debug("Committed offset {} for partition {}", offset, tp);
+ if (subscriptions.isAssigned(tp))
+ // update the local cache only if the partition is still assigned
+ subscriptions.committed(tp, offsetAndMetadata);
+ } else {
+ if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
+ || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+ coordinatorDead();
+ } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()
+ || errorCode == Errors.ILLEGAL_GENERATION.code()) {
+ // need to re-join group
+ subscriptions.needReassignment();
+ }
+
+ log.error("Error committing partition {} at offset {}: {}",
+ tp,
+ offset,
+ Errors.forCode(errorCode).exception().getMessage());
+
+ future.raise(Errors.forCode(errorCode));
+ return;
+ }
+ }
+
+ future.complete(null);
+ }
+ }
+
+ /**
+ * Fetch the committed offsets for a set of partitions. This is a non-blocking call. The
+ * returned future can be polled to get the actual offsets returned from the broker.
+ *
+ * @param partitions The set of partitions to get offsets for.
+ * @return A request future containing the committed offsets.
+ */
+ private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
+ if (coordinatorUnknown())
+ return RequestFuture.coordinatorNotAvailable();
+
+ log.debug("Fetching committed offsets for partitions: {}", Utils.join(partitions, ", "));
+ // construct the request
+ OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
+
+ // send the request with a callback
+ return client.send(coordinator, ApiKeys.OFFSET_FETCH, request)
+ .compose(new OffsetFetchResponseHandler());
+ }
+
+ private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
+
+ @Override
+ public OffsetFetchResponse parse(ClientResponse response) {
+ return new OffsetFetchResponse(response.responseBody());
+ }
+
+ @Override
+ public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
+ for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+ TopicPartition tp = entry.getKey();
+ OffsetFetchResponse.PartitionData data = entry.getValue();
+ if (data.hasError()) {
+ log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
+ .exception()
+ .getMessage());
+ if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
+ // just retry
+ future.raise(Errors.OFFSET_LOAD_IN_PROGRESS);
+ } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+ // re-discover the coordinator and retry
+ coordinatorDead();
+ future.raise(Errors.NOT_COORDINATOR_FOR_GROUP);
+ } else if (data.errorCode == Errors.UNKNOWN_MEMBER_ID.code()
+ || data.errorCode == Errors.ILLEGAL_GENERATION.code()) {
+ // need to re-join group
+ subscriptions.needReassignment();
+ future.raise(Errors.forCode(data.errorCode));
+ } else {
+ future.raise(new KafkaException("Unexpected error in fetch offset response: "
+ + Errors.forCode(data.errorCode).exception().getMessage()));
+ }
+ return;
+ } else if (data.offset >= 0) {
+ // record the committed offset and metadata for this partition (-1 indicates no committed offset)
+ offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
+ } else {
+ log.debug("No committed offset for partition " + tp);
+ }
+ }
+
+ future.complete(offsets);
+ }
+ }
+
+ private class ConsumerCoordinatorMetrics {
+ public final Metrics metrics;
+ public final String metricGrpName;
+
+ public final Sensor commitLatency;
+
+ public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+ this.metrics = metrics;
+ this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
+
+ this.commitLatency = metrics.sensor("commit-latency");
+ this.commitLatency.add(new MetricName("commit-latency-avg",
+ this.metricGrpName,
+ "The average time taken for a commit request",
+ tags), new Avg());
+ this.commitLatency.add(new MetricName("commit-latency-max",
+ this.metricGrpName,
+ "The max time taken for a commit request",
+ tags), new Max());
+ this.commitLatency.add(new MetricName("commit-rate",
+ this.metricGrpName,
+ "The number of commit calls per second",
+ tags), new Rate(new Count()));
+
+ Measurable numParts =
+ new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return subscriptions.assignedPartitions().size();
+ }
+ };
+ metrics.addMetric(new MetricName("assigned-partitions",
+ this.metricGrpName,
+ "The number of partitions currently assigned to this consumer",
+ tags),
+ numParts);
+ }
+ }
+
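+ /**
+ * A snapshot of the partition counts for the group's subscribed topics, used to detect
+ * metadata changes (e.g. partition expansion) which require a rebalance.
+ */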
+ private static class MetadataSnapshot {
+ private Map<String, Integer> partitionsPerTopic = new HashMap<>();
+
+ public boolean update(SubscriptionState subscription, Cluster cluster) {
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ for (String topic : subscription.groupSubscription())
+ partitionsPerTopic.put(topic, cluster.partitionCountForTopic(topic));
+
+ if (!partitionsPerTopic.equals(this.partitionsPerTopic)) {
+ this.partitionsPerTopic = partitionsPerTopic;
+ return true;
+ }
+
+ return false;
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4153eb3..fbfe54a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -127,6 +127,15 @@ public class ConsumerNetworkClient implements Closeable {
}
/**
+ * Ensure our metadata is fresh (if an update is expected, this will block
+ * until it has completed).
+ */
+ public void ensureFreshMetadata() {
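+ // a time-to-next-update of zero means an update has been requested or the metadata has expired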
+ if (this.metadata.timeToNextUpdate(time.milliseconds()) == 0)
+ awaitMetadataUpdate();
+ }
+
+ /**
* Wakeup an active poll. This will cause the polling thread to throw an exception either
* on the current poll if one is active, or the next poll.
*/