You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/03 23:40:25 UTC
[1/3] kafka git commit: KAFKA-2687: Add support for ListGroups and
DescribeGroup APIs
Repository: kafka
Updated Branches:
refs/heads/trunk f413143ed -> 596c203af
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
index 0a5bb3c..88eb9ae 100644
--- a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
@@ -23,15 +23,18 @@ import org.junit.Test
import org.scalatest.junit.JUnitSuite
class MemberMetadataTest extends JUnitSuite {
+ val groupId = "groupId"
+ val clientId = "clientId"
+ val clientHost = "clientHost"
+ val memberId = "memberId"
+ val sessionTimeoutMs = 10000
+
@Test
def testMatchesSupportedProtocols {
- val groupId = "groupId"
- val memberId = "memberId"
- val sessionTimeoutMs = 10000
val protocols = List(("range", Array.empty[Byte]))
- val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols)
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols)
assertTrue(member.matches(protocols))
assertFalse(member.matches(List(("range", Array[Byte](0)))))
assertFalse(member.matches(List(("roundrobin", Array.empty[Byte]))))
@@ -40,48 +43,36 @@ class MemberMetadataTest extends JUnitSuite {
@Test
def testVoteForPreferredProtocol {
- val groupId = "groupId"
- val memberId = "memberId"
- val sessionTimeoutMs = 10000
val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
- val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols)
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols)
assertEquals("range", member.vote(Set("range", "roundrobin")))
assertEquals("roundrobin", member.vote(Set("blah", "roundrobin")))
}
@Test
def testMetadata {
- val groupId = "groupId"
- val memberId = "memberId"
- val sessionTimeoutMs = 10000
val protocols = List(("range", Array[Byte](0)), ("roundrobin", Array[Byte](1)))
- val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols)
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols)
assertTrue(util.Arrays.equals(Array[Byte](0), member.metadata("range")))
assertTrue(util.Arrays.equals(Array[Byte](1), member.metadata("roundrobin")))
}
@Test(expected = classOf[IllegalArgumentException])
def testMetadataRaisesOnUnsupportedProtocol {
- val groupId = "groupId"
- val memberId = "memberId"
- val sessionTimeoutMs = 10000
val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
- val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols)
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols)
member.metadata("blah")
fail()
}
@Test(expected = classOf[IllegalArgumentException])
def testVoteRaisesOnNoSupportedProtocols {
- val groupId = "groupId"
- val memberId = "memberId"
- val sessionTimeoutMs = 10000
val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
- val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols)
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols)
member.vote(Set("blah"))
fail()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 4e5e776..31f743b 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -17,7 +17,7 @@
package kafka.server
-import kafka.api.{GroupMetadataRequest, OffsetCommitRequest, OffsetFetchRequest}
+import kafka.api.{GroupCoordinatorRequest, OffsetCommitRequest, OffsetFetchRequest}
import kafka.consumer.SimpleConsumer
import kafka.common.{OffsetMetadata, OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition}
import kafka.utils._
@@ -56,7 +56,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
time = new MockTime()
server = TestUtils.createServer(KafkaConfig.fromProps(config), time)
simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "test-client")
- val consumerMetadataRequest = GroupMetadataRequest(group)
+ val consumerMetadataRequest = GroupCoordinatorRequest(group)
Stream.continually {
val consumerMetadataResponse = simpleConsumer.send(consumerMetadataRequest)
consumerMetadataResponse.coordinatorOpt.isDefined
[3/3] kafka git commit: KAFKA-2687: Add support for ListGroups and
DescribeGroup APIs
Posted by gu...@apache.org.
KAFKA-2687: Add support for ListGroups and DescribeGroup APIs
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang, Jun Rao
Closes #388 from hachikuji/K2687
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/596c203a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/596c203a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/596c203a
Branch: refs/heads/trunk
Commit: 596c203af1f33360c04f4be7c466310d11343f78
Parents: f413143
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Nov 3 14:46:04 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 3 14:46:04 2015 -0800
----------------------------------------------------------------------
.../kafka/clients/consumer/ConsumerConfig.java | 34 +--
.../consumer/internals/AbstractCoordinator.java | 20 +-
.../consumer/internals/ConsumerCoordinator.java | 2 +-
.../consumer/internals/ConsumerProtocol.java | 2 +
.../kafka/clients/producer/ProducerConfig.java | 70 +++---
.../apache/kafka/common/config/ConfigDef.java | 17 ++
.../apache/kafka/common/config/SaslConfigs.java | 8 +
.../apache/kafka/common/config/SslConfigs.java | 17 ++
.../apache/kafka/common/protocol/ApiKeys.java | 6 +-
.../apache/kafka/common/protocol/Errors.java | 3 +-
.../apache/kafka/common/protocol/Protocol.java | 82 ++++++-
.../kafka/common/protocol/types/Struct.java | 8 +
.../kafka/common/requests/AbstractRequest.java | 8 +-
.../common/requests/DescribeGroupsRequest.java | 68 ++++++
.../common/requests/DescribeGroupsResponse.java | 224 +++++++++++++++++
.../requests/GroupCoordinatorRequest.java | 65 +++++
.../requests/GroupCoordinatorResponse.java | 70 ++++++
.../common/requests/GroupMetadataRequest.java | 65 -----
.../common/requests/GroupMetadataResponse.java | 70 ------
.../common/requests/ListGroupsRequest.java | 57 +++++
.../common/requests/ListGroupsResponse.java | 107 ++++++++
.../internals/ConsumerCoordinatorTest.java | 4 +-
.../common/requests/RequestResponseTest.java | 45 +++-
.../distributed/WorkerCoordinatorTest.java | 4 +-
.../main/scala/kafka/admin/AdminClient.scala | 242 +++++++++++++++++++
.../kafka/api/GroupCoordinatorRequest.scala | 80 ++++++
.../kafka/api/GroupCoordinatorResponse.scala | 58 +++++
.../scala/kafka/api/GroupMetadataRequest.scala | 80 ------
.../scala/kafka/api/GroupMetadataResponse.scala | 58 -----
core/src/main/scala/kafka/api/RequestKeys.scala | 10 +-
.../main/scala/kafka/client/ClientUtils.scala | 4 +-
.../main/scala/kafka/common/ErrorMapping.scala | 1 +
.../scala/kafka/consumer/SimpleConsumer.scala | 4 +-
.../kafka/coordinator/GroupCoordinator.scala | 104 ++++++--
.../scala/kafka/coordinator/GroupMetadata.scala | 36 ++-
.../coordinator/GroupMetadataManager.scala | 38 +--
.../kafka/coordinator/MemberMetadata.scala | 24 +-
.../javaapi/GroupCoordinatorResponse.scala | 47 ++++
.../kafka/javaapi/GroupMetadataResponse.scala | 47 ----
.../scala/kafka/network/RequestChannel.scala | 3 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 90 +++++--
.../integration/kafka/api/AdminClientTest.scala | 114 +++++++++
.../kafka/api/AuthorizerIntegrationTest.scala | 8 +-
.../scala/other/kafka/TestOffsetManager.scala | 4 +-
.../api/RequestResponseSerializationTest.scala | 10 +-
.../GroupCoordinatorResponseTest.scala | 144 +++++++++--
.../kafka/coordinator/GroupMetadataTest.scala | 23 +-
.../kafka/coordinator/MemberMetadataTest.scala | 31 +--
.../unit/kafka/server/OffsetCommitTest.scala | 4 +-
49 files changed, 1765 insertions(+), 555 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index b366efd..4131352 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -17,8 +17,6 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.HashMap;
@@ -286,27 +284,6 @@ public class ConsumerConfig extends AbstractConfig {
Type.CLASS,
Importance.HIGH,
VALUE_DESERIALIZER_CLASS_DOC)
- .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
- .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
- .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
- .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
- .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
- .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
- .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
- .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
- .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
- .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
- .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
- .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
- .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
- .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
- .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
- .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
- .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
- .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
- .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
- .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
- .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
40 * 1000,
@@ -318,7 +295,16 @@ public class ConsumerConfig extends AbstractConfig {
Type.LONG,
9 * 60 * 1000,
Importance.MEDIUM,
- CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC);
+ CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
+
+ // security support
+ .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+ Type.STRING,
+ CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+ Importance.MEDIUM,
+ CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+ .withClientSslSupport()
+ .withClientSaslSupport();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d8f3c25..9cf825c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -30,8 +30,8 @@ import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.GroupMetadataRequest;
-import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.GroupCoordinatorRequest;
+import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -450,8 +450,8 @@ public abstract class AbstractCoordinator {
} else {
// create a group metadata request
log.debug("Issuing group metadata request to broker {}", node.id());
- GroupMetadataRequest metadataRequest = new GroupMetadataRequest(this.groupId);
- return client.send(node, ApiKeys.GROUP_METADATA, metadataRequest)
+ GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
+ return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
.compose(new RequestFutureAdapter<ClientResponse, Void>() {
@Override
public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
@@ -472,14 +472,14 @@ public abstract class AbstractCoordinator {
// We already found the coordinator, so ignore the request
future.complete(null);
} else {
- GroupMetadataResponse groupMetadataResponse = new GroupMetadataResponse(resp.responseBody());
+ GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
// use MAX_VALUE - node.id as the coordinator id to mimic separate connections
// for the coordinator in the underlying network client layer
// TODO: this needs to be better handled in KAFKA-1935
- if (groupMetadataResponse.errorCode() == Errors.NONE.code()) {
- this.coordinator = new Node(Integer.MAX_VALUE - groupMetadataResponse.node().id(),
- groupMetadataResponse.node().host(),
- groupMetadataResponse.node().port());
+ if (groupCoordinatorResponse.errorCode() == Errors.NONE.code()) {
+ this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
+ groupCoordinatorResponse.node().host(),
+ groupCoordinatorResponse.node().port());
client.tryConnect(coordinator);
@@ -488,7 +488,7 @@ public abstract class AbstractCoordinator {
heartbeatTask.reset();
future.complete(null);
} else {
- future.raise(Errors.forCode(groupMetadataResponse.errorCode()));
+ future.raise(Errors.forCode(groupCoordinatorResponse.errorCode()));
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 97d25c3..5b5d6ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -119,7 +119,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
@Override
public String protocolType() {
- return "consumer";
+ return ConsumerProtocol.PROTOCOL_TYPE;
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index 0020993..4728a50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -54,6 +54,8 @@ import java.util.Map;
*/
public class ConsumerProtocol {
+ public static final String PROTOCOL_TYPE = "consumer";
+
public static final String VERSION_KEY_NAME = "version";
public static final String TOPICS_KEY_NAME = "topics";
public static final String TOPIC_KEY_NAME = "topic";
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index e9d9fad..3eaea09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -12,23 +12,22 @@
*/
package org.apache.kafka.clients.producer;
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-import static org.apache.kafka.common.config.ConfigDef.Range.between;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.serialization.Serializer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
/**
* Configuration for the Kafka Producer. Documentation for these configurations can be found in the <a
* href="http://kafka.apache.org/documentation.html#newproducerconfigs">Kafka documentation</a>
@@ -262,32 +261,33 @@ public class ProducerConfig extends AbstractConfig {
atLeast(1),
Importance.LOW,
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
- .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
- .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
- .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
- .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
- .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
- .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
- .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
- .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
- .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
- .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
- .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
- .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
- .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
- .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
- .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
- .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
- .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
- .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
- .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
- .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
- .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
- .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
- .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
+ .define(KEY_SERIALIZER_CLASS_CONFIG,
+ Type.CLASS,
+ Importance.HIGH,
+ KEY_SERIALIZER_CLASS_DOC)
+ .define(VALUE_SERIALIZER_CLASS_CONFIG,
+ Type.CLASS,
+ Importance.HIGH,
+ VALUE_SERIALIZER_CLASS_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
- .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
- .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC);
+ .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
+ Type.LONG,
+ 9 * 60 * 1000,
+ Importance.MEDIUM,
+ CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
+ .define(PARTITIONER_CLASS_CONFIG,
+ Type.CLASS,
+ DefaultPartitioner.class.getName(),
+ Importance.MEDIUM, PARTITIONER_CLASS_DOC)
+
+ // security support
+ .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+ Type.STRING,
+ CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+ Importance.MEDIUM,
+ CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+ .withClientSslSupport()
+ .withClientSaslSupport();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 2e820dd..f01ed28 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -151,6 +151,23 @@ public class ConfigDef {
return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, required);
}
+ /**
+ * Add standard SSL client configuration options.
+ * @return this
+ */
+ public ConfigDef withClientSslSupport() {
+ SslConfigs.addClientSslSupport(this);
+ return this;
+ }
+
+ /**
+ * Add standard SASL client configuration options.
+ * @return this
+ */
+ public ConfigDef withClientSaslSupport() {
+ SaslConfigs.addClientSaslSupport(this);
+ return this;
+ }
/**
* Parse and validate configs against this configuration definition. The input is a map of configs. It is expected
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
index 657c6d3..0046868 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -49,4 +49,12 @@ public class SaslConfigs {
"By default, principal names of the form <username>/<hostname>@<REALM> are mapped to <username>.";
public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList("DEFAULT");
+ public static void addClientSaslSupport(ConfigDef config) {
+ config.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
+ .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
+ .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
+ .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
+ .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index 60e1eb3..8f93301 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -96,4 +96,21 @@ public class SslConfigs {
+ " unlike requested , if this option is set client can choose not to provide authentication information about itself"
+ " <li><code>ssl.client.auth=none</code> This means client authentication is not needed.";
+ public static void addClientSslSupport(ConfigDef config) {
+ config.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+ .define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
+ .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
+ .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
+ .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+ .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
+ .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+ .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+ .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
+ .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+ .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
+ .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
+ .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+ .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+ .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index af7b266..5bd3c96 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -30,11 +30,13 @@ public enum ApiKeys {
CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
OFFSET_COMMIT(8, "OffsetCommit"),
OFFSET_FETCH(9, "OffsetFetch"),
- GROUP_METADATA(10, "GroupMetadata"),
+ GROUP_COORDINATOR(10, "GroupCoordinator"),
JOIN_GROUP(11, "JoinGroup"),
HEARTBEAT(12, "Heartbeat"),
LEAVE_GROUP(13, "LeaveGroup"),
- SYNC_GROUP(14, "SyncGroup");
+ SYNC_GROUP(14, "SyncGroup"),
+ DESCRIBE_GROUPS(15, "DescribeGroups"),
+ LIST_GROUPS(16, "ListGroups");
private static ApiKeys[] codeToType;
public static final int MAX_API_KEY;
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 2c9cb20..d4eb1f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -86,7 +86,8 @@ public enum Errors {
new ApiException("The session timeout is not within an acceptable range.")),
INVALID_COMMIT_OFFSET_SIZE(28,
new ApiException("The committing offset data size is not valid")),
- AUTHORIZATION_FAILED(29, new ApiException("Request is not authorized.")),
+ AUTHORIZATION_FAILED(29,
+ new ApiException("Request is not authorized.")),
REBALANCE_IN_PROGRESS(30,
new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed."));
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 36094b0..00560db 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -388,18 +388,71 @@ public class Protocol {
public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1};
public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1};
- /* Consumer metadata api */
- public static final Schema GROUP_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The unique group id."));
+ /* List groups api */
+ public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
- public static final Schema GROUP_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
- new Field("coordinator",
- BROKER,
- "Host and port information for the coordinator for a consumer group."));
+ public static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(new Field("group_id", STRING),
+ new Field("protocol_type", STRING));
+ public static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+ new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
- public static final Schema[] GROUP_METADATA_REQUEST = new Schema[] {GROUP_METADATA_REQUEST_V0};
- public static final Schema[] GROUP_METADATA_RESPONSE = new Schema[] {GROUP_METADATA_RESPONSE_V0};
+ public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0};
+ public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0};
+
+ /* Describe group api */
+ public static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(new Field("group_ids",
+ new ArrayOf(STRING),
+ "List of groupIds to request metadata for (an empty groupId array will return empty group metadata)."));
+
+ public static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id",
+ STRING,
+ "The memberId assigned by the coordinator"),
+ new Field("client_id",
+ STRING,
+ "The client id used in the member's latest join group request"),
+ new Field("client_host",
+ STRING,
+ "The client host used in the request session corresponding to the member's join group."),
+ new Field("member_metadata",
+ BYTES,
+ "The metadata corresponding to the current group protocol in use (will only be present if the group is stable)."),
+ new Field("member_assignment",
+ BYTES,
+ "The current assignment provided by the group leader (will only be present if the group is stable)."));
+
+ public static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema(new Field("error_code", INT16),
+ new Field("group_id",
+ STRING),
+ new Field("state",
+ STRING,
+ "The current state of the group (one of: Dead, Stable, AwaitingSync, or PreparingRebalance, or empty if there is no active group)"),
+ new Field("protocol_type",
+ STRING,
+ "The current group protocol type (will be empty if the there is no active group)"),
+ new Field("protocol",
+ STRING,
+ "The current group protocol (only provided if the group is Stable)"),
+ new Field("members",
+ new ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0),
+ "Current group members (only provided if the group is not Dead)"));
+
+ public static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
+
+ public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0};
+ public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0};
+
+ /* Group coordinator api */
+ public static final Schema GROUP_COORDINATOR_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The unique group id."));
+
+ public static final Schema GROUP_COORDINATOR_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+ new Field("coordinator",
+ BROKER,
+ "Host and port information for the coordinator for a consumer group."));
+
+ public static final Schema[] GROUP_COORDINATOR_REQUEST = new Schema[] {GROUP_COORDINATOR_REQUEST_V0};
+ public static final Schema[] GROUP_COORDINATOR_RESPONSE = new Schema[] {GROUP_COORDINATOR_RESPONSE_V0};
/* Controlled shutdown api */
public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id",
@@ -616,12 +669,13 @@ public class Protocol {
REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_REQUEST;
REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
- REQUESTS[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_REQUEST;
+ REQUESTS[ApiKeys.GROUP_COORDINATOR.id] = GROUP_COORDINATOR_REQUEST;
REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
REQUESTS[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_REQUEST;
REQUESTS[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_REQUEST;
-
+ REQUESTS[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_REQUEST;
+ REQUESTS[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_REQUEST;
RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -633,11 +687,13 @@ public class Protocol {
RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_RESPONSE;
RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
- RESPONSES[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_RESPONSE;
+ RESPONSES[ApiKeys.GROUP_COORDINATOR.id] = GROUP_COORDINATOR_RESPONSE;
RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
RESPONSES[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_RESPONSE;
RESPONSES[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_RESPONSE;
+ RESPONSES[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_RESPONSE;
+ RESPONSES[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_RESPONSE;
/* set the maximum version of each api */
for (ApiKeys api : ApiKeys.values())
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index ef2525e..54c3deb 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -98,6 +98,14 @@ public class Struct {
return (Struct) get(name);
}
+ public Byte getByte(Field field) {
+ return (Byte) get(field);
+ }
+
+ public byte getByte(String name) {
+ return (Byte) get(name);
+ }
+
public Short getShort(Field field) {
return (Short) get(field);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 03e77a5..8dfa3f6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -49,8 +49,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return OffsetCommitRequest.parse(buffer, versionId);
case OFFSET_FETCH:
return OffsetFetchRequest.parse(buffer, versionId);
- case GROUP_METADATA:
- return GroupMetadataRequest.parse(buffer, versionId);
+ case GROUP_COORDINATOR:
+ return GroupCoordinatorRequest.parse(buffer, versionId);
case JOIN_GROUP:
return JoinGroupRequest.parse(buffer, versionId);
case HEARTBEAT:
@@ -67,6 +67,10 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return UpdateMetadataRequest.parse(buffer, versionId);
case LEADER_AND_ISR:
return LeaderAndIsrRequest.parse(buffer, versionId);
+ case DESCRIBE_GROUPS:
+ return DescribeGroupsRequest.parse(buffer, versionId);
+ case LIST_GROUPS:
+ return ListGroupsRequest.parse(buffer, versionId);
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
new file mode 100644
index 0000000..a545cca
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DescribeGroupsRequest extends AbstractRequest {
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.DESCRIBE_GROUPS.id);
+ private static final String GROUP_IDS_KEY_NAME = "group_ids";
+
+ private final List<String> groupIds;
+
+ public DescribeGroupsRequest(List<String> groupIds) {
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(GROUP_IDS_KEY_NAME, groupIds.toArray());
+ this.groupIds = groupIds;
+ }
+
+ public DescribeGroupsRequest(Struct struct) {
+ super(struct);
+ this.groupIds = new ArrayList<>();
+ for (Object groupId : struct.getArray(GROUP_IDS_KEY_NAME))
+ this.groupIds.add((String) groupId);
+ }
+
+ public List<String> groupIds() {
+ return groupIds;
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch (versionId) {
+ case 0:
+ return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds);
+
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.DESCRIBE_GROUPS.id)));
+ }
+ }
+
+ public static DescribeGroupsRequest parse(ByteBuffer buffer, int versionId) {
+ return new DescribeGroupsRequest(ProtoUtils.parseRequest(ApiKeys.DESCRIBE_GROUPS.id, versionId, buffer));
+ }
+
+ public static DescribeGroupsRequest parse(ByteBuffer buffer) {
+ return new DescribeGroupsRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
new file mode 100644
index 0000000..c71e7d2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DescribeGroupsResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.DESCRIBE_GROUPS.id);
+
+ private static final String GROUPS_KEY_NAME = "groups";
+
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private static final String GROUP_ID_KEY_NAME = "group_id";
+ private static final String GROUP_STATE_KEY_NAME = "state";
+ private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
+ private static final String PROTOCOL_KEY_NAME = "protocol";
+
+ private static final String MEMBERS_KEY_NAME = "members";
+ private static final String MEMBER_ID_KEY_NAME = "member_id";
+ private static final String CLIENT_ID_KEY_NAME = "client_id";
+ private static final String CLIENT_HOST_KEY_NAME = "client_host";
+ private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
+ private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
+
+ public static final String UNKNOWN_STATE = "";
+ public static final String UNKNOWN_PROTOCOL_TYPE = "";
+ public static final String UNKNOWN_PROTOCOL = "";
+
+ /**
+ * Possible per-group error codes:
+ *
+ * GROUP_LOAD_IN_PROGRESS (14)
+ * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+ * NOT_COORDINATOR_FOR_GROUP (16)
+ * AUTHORIZATION_FAILED (29)
+ */
+
+ private final Map<String, GroupMetadata> groups;
+
+ public DescribeGroupsResponse(Map<String, GroupMetadata> groups) {
+ super(new Struct(CURRENT_SCHEMA));
+
+ List<Struct> groupStructs = new ArrayList<>();
+ for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) {
+ Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
+ GroupMetadata group = groupEntry.getValue();
+ groupStruct.set(GROUP_ID_KEY_NAME, groupEntry.getKey());
+ groupStruct.set(ERROR_CODE_KEY_NAME, group.errorCode);
+ groupStruct.set(GROUP_STATE_KEY_NAME, group.state);
+ groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
+ groupStruct.set(PROTOCOL_KEY_NAME, group.protocol);
+ List<Struct> membersList = new ArrayList<>();
+ for (GroupMember member : group.members) {
+ Struct memberStruct = groupStruct.instance(MEMBERS_KEY_NAME);
+ memberStruct.set(MEMBER_ID_KEY_NAME, member.memberId);
+ memberStruct.set(CLIENT_ID_KEY_NAME, member.clientId);
+ memberStruct.set(CLIENT_HOST_KEY_NAME, member.clientHost);
+ memberStruct.set(MEMBER_METADATA_KEY_NAME, member.memberMetadata);
+ memberStruct.set(MEMBER_ASSIGNMENT_KEY_NAME, member.memberAssignment);
+ membersList.add(memberStruct);
+ }
+ groupStruct.set(MEMBERS_KEY_NAME, membersList.toArray());
+ groupStructs.add(groupStruct);
+ }
+ struct.set(GROUPS_KEY_NAME, groupStructs.toArray());
+ this.groups = groups;
+ }
+
+ public DescribeGroupsResponse(Struct struct) {
+ super(struct);
+ this.groups = new HashMap<>();
+ for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
+ Struct groupStruct = (Struct) groupObj;
+
+ String groupId = groupStruct.getString(GROUP_ID_KEY_NAME);
+ short errorCode = groupStruct.getShort(ERROR_CODE_KEY_NAME);
+ String state = groupStruct.getString(GROUP_STATE_KEY_NAME);
+ String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME);
+ String protocol = groupStruct.getString(PROTOCOL_KEY_NAME);
+
+ List<GroupMember> members = new ArrayList<>();
+ for (Object memberObj : groupStruct.getArray(MEMBERS_KEY_NAME)) {
+ Struct memberStruct = (Struct) memberObj;
+ String memberId = memberStruct.getString(MEMBER_ID_KEY_NAME);
+ String clientId = memberStruct.getString(CLIENT_ID_KEY_NAME);
+ String clientHost = memberStruct.getString(CLIENT_HOST_KEY_NAME);
+ ByteBuffer memberMetadata = memberStruct.getBytes(MEMBER_METADATA_KEY_NAME);
+ ByteBuffer memberAssignment = memberStruct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
+ members.add(new GroupMember(memberId, clientId, clientHost,
+ memberMetadata, memberAssignment));
+ }
+ this.groups.put(groupId, new GroupMetadata(errorCode, state, protocolType, protocol, members));
+ }
+ }
+
+ public Map<String, GroupMetadata> groups() {
+ return groups;
+ }
+
+
+ public static class GroupMetadata {
+ private final short errorCode;
+ private final String state;
+ private final String protocolType;
+ private final String protocol;
+ private final List<GroupMember> members;
+
+ public GroupMetadata(short errorCode,
+ String state,
+ String protocolType,
+ String protocol,
+ List<GroupMember> members) {
+ this.errorCode = errorCode;
+ this.state = state;
+ this.protocolType = protocolType;
+ this.protocol = protocol;
+ this.members = members;
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public String state() {
+ return state;
+ }
+
+ public String protocolType() {
+ return protocolType;
+ }
+
+ public String protocol() {
+ return protocol;
+ }
+
+ public List<GroupMember> members() {
+ return members;
+ }
+
+ public static GroupMetadata forError(Errors error) {
+ return new DescribeGroupsResponse.GroupMetadata(
+ error.code(),
+ DescribeGroupsResponse.UNKNOWN_STATE,
+ DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
+ DescribeGroupsResponse.UNKNOWN_PROTOCOL,
+ Collections.<DescribeGroupsResponse.GroupMember>emptyList());
+ }
+ }
+
+ public static class GroupMember {
+ private final String memberId;
+ private final String clientId;
+ private final String clientHost;
+ private final ByteBuffer memberMetadata;
+ private final ByteBuffer memberAssignment;
+
+ public GroupMember(String memberId,
+ String clientId,
+ String clientHost,
+ ByteBuffer memberMetadata,
+ ByteBuffer memberAssignment) {
+ this.memberId = memberId;
+ this.clientId = clientId;
+ this.clientHost = clientHost;
+ this.memberMetadata = memberMetadata;
+ this.memberAssignment = memberAssignment;
+ }
+
+ public String memberId() {
+ return memberId;
+ }
+
+ public String clientId() {
+ return clientId;
+ }
+
+ public String clientHost() {
+ return clientHost;
+ }
+
+ public ByteBuffer memberMetadata() {
+ return memberMetadata;
+ }
+
+ public ByteBuffer memberAssignment() {
+ return memberAssignment;
+ }
+ }
+
+ public static DescribeGroupsResponse parse(ByteBuffer buffer) {
+ return new DescribeGroupsResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+
+ public static DescribeGroupsResponse fromError(Errors error, List<String> groupIds) {
+ GroupMetadata errorMetadata = GroupMetadata.forError(error);
+ Map<String, GroupMetadata> groups = new HashMap<>();
+ for (String groupId : groupIds)
+ groups.put(groupId, errorMetadata);
+ return new DescribeGroupsResponse(groups);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
new file mode 100644
index 0000000..8c56e7f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class GroupCoordinatorRequest extends AbstractRequest {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.GROUP_COORDINATOR.id);
+ private static final String GROUP_ID_KEY_NAME = "group_id";
+
+ private final String groupId;
+
+ public GroupCoordinatorRequest(String groupId) {
+ super(new Struct(CURRENT_SCHEMA));
+
+ struct.set(GROUP_ID_KEY_NAME, groupId);
+ this.groupId = groupId;
+ }
+
+ public GroupCoordinatorRequest(Struct struct) {
+ super(struct);
+ groupId = struct.getString(GROUP_ID_KEY_NAME);
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch (versionId) {
+ case 0:
+ return new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.GROUP_COORDINATOR.id)));
+ }
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public static GroupCoordinatorRequest parse(ByteBuffer buffer, int versionId) {
+ return new GroupCoordinatorRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_COORDINATOR.id, versionId, buffer));
+ }
+
+ public static GroupCoordinatorRequest parse(ByteBuffer buffer) {
+ return new GroupCoordinatorRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
new file mode 100644
index 0000000..c28de70
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class GroupCoordinatorResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_COORDINATOR.id);
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private static final String COORDINATOR_KEY_NAME = "coordinator";
+
+ // coordinator level field names
+ private static final String NODE_ID_KEY_NAME = "node_id";
+ private static final String HOST_KEY_NAME = "host";
+ private static final String PORT_KEY_NAME = "port";
+
+ private final short errorCode;
+ private final Node node;
+
+ public GroupCoordinatorResponse(short errorCode, Node node) {
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+ Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
+ coordinator.set(NODE_ID_KEY_NAME, node.id());
+ coordinator.set(HOST_KEY_NAME, node.host());
+ coordinator.set(PORT_KEY_NAME, node.port());
+ struct.set(COORDINATOR_KEY_NAME, coordinator);
+ this.errorCode = errorCode;
+ this.node = node;
+ }
+
+ public GroupCoordinatorResponse(Struct struct) {
+ super(struct);
+ errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
+ int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+ String host = broker.getString(HOST_KEY_NAME);
+ int port = broker.getInt(PORT_KEY_NAME);
+ node = new Node(nodeId, host, port);
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public Node node() {
+ return node;
+ }
+
+ public static GroupCoordinatorResponse parse(ByteBuffer buffer) {
+ return new GroupCoordinatorResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
deleted file mode 100644
index fd54c5a..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class GroupMetadataRequest extends AbstractRequest {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.GROUP_METADATA.id);
- private static final String GROUP_ID_KEY_NAME = "group_id";
-
- private final String groupId;
-
- public GroupMetadataRequest(String groupId) {
- super(new Struct(CURRENT_SCHEMA));
-
- struct.set(GROUP_ID_KEY_NAME, groupId);
- this.groupId = groupId;
- }
-
- public GroupMetadataRequest(Struct struct) {
- super(struct);
- groupId = struct.getString(GROUP_ID_KEY_NAME);
- }
-
- @Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
- switch (versionId) {
- case 0:
- return new GroupMetadataResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
- default:
- throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.GROUP_METADATA.id)));
- }
- }
-
- public String groupId() {
- return groupId;
- }
-
- public static GroupMetadataRequest parse(ByteBuffer buffer, int versionId) {
- return new GroupMetadataRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_METADATA.id, versionId, buffer));
- }
-
- public static GroupMetadataRequest parse(ByteBuffer buffer) {
- return new GroupMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
deleted file mode 100644
index a5ef478..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class GroupMetadataResponse extends AbstractRequestResponse {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_METADATA.id);
- private static final String ERROR_CODE_KEY_NAME = "error_code";
- private static final String COORDINATOR_KEY_NAME = "coordinator";
-
- // coordinator level field names
- private static final String NODE_ID_KEY_NAME = "node_id";
- private static final String HOST_KEY_NAME = "host";
- private static final String PORT_KEY_NAME = "port";
-
- private final short errorCode;
- private final Node node;
-
- public GroupMetadataResponse(short errorCode, Node node) {
- super(new Struct(CURRENT_SCHEMA));
- struct.set(ERROR_CODE_KEY_NAME, errorCode);
- Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
- coordinator.set(NODE_ID_KEY_NAME, node.id());
- coordinator.set(HOST_KEY_NAME, node.host());
- coordinator.set(PORT_KEY_NAME, node.port());
- struct.set(COORDINATOR_KEY_NAME, coordinator);
- this.errorCode = errorCode;
- this.node = node;
- }
-
- public GroupMetadataResponse(Struct struct) {
- super(struct);
- errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
- Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
- int nodeId = broker.getInt(NODE_ID_KEY_NAME);
- String host = broker.getString(HOST_KEY_NAME);
- int port = broker.getInt(PORT_KEY_NAME);
- node = new Node(nodeId, host, port);
- }
-
- public short errorCode() {
- return errorCode;
- }
-
- public Node node() {
- return node;
- }
-
- public static GroupMetadataResponse parse(ByteBuffer buffer) {
- return new GroupMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
new file mode 100644
index 0000000..439720f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+public class ListGroupsRequest extends AbstractRequest {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_GROUPS.id);
+
+ public ListGroupsRequest() {
+ super(new Struct(CURRENT_SCHEMA));
+ }
+
+ public ListGroupsRequest(Struct struct) {
+ super(struct);
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch (versionId) {
+ case 0:
+ short errorCode = Errors.forException(e).code();
+ return new ListGroupsResponse(errorCode, Collections.<ListGroupsResponse.Group>emptyList());
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_GROUPS.id)));
+ }
+ }
+
+ public static ListGroupsRequest parse(ByteBuffer buffer, int versionId) {
+ return new ListGroupsRequest(ProtoUtils.parseRequest(ApiKeys.LIST_GROUPS.id, versionId, buffer));
+ }
+
+ public static ListGroupsRequest parse(ByteBuffer buffer) {
+ return new ListGroupsRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
new file mode 100644
index 0000000..d07f0d1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class ListGroupsResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_GROUPS.id);
+
+ public static final String ERROR_CODE_KEY_NAME = "error_code";
+ public static final String GROUPS_KEY_NAME = "groups";
+ public static final String GROUP_ID_KEY_NAME = "group_id";
+ public static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
+
+ /**
+ * Possible error codes:
+ *
+ * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+ * AUTHORIZATION_FAILED (29)
+ */
+
+ private final short errorCode;
+ private final List<Group> groups;
+
+ public ListGroupsResponse(short errorCode, List<Group> groups) {
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+ List<Struct> groupList = new ArrayList<>();
+ for (Group group : groups) {
+ Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
+ groupStruct.set(GROUP_ID_KEY_NAME, group.groupId);
+ groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
+ groupList.add(groupStruct);
+ }
+ struct.set(GROUPS_KEY_NAME, groupList.toArray());
+ this.errorCode = errorCode;
+ this.groups = groups;
+ }
+
+ public ListGroupsResponse(Struct struct) {
+ super(struct);
+ this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ this.groups = new ArrayList<>();
+ for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
+ Struct groupStruct = (Struct) groupObj;
+ String groupId = groupStruct.getString(GROUP_ID_KEY_NAME);
+ String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME);
+ this.groups.add(new Group(groupId, protocolType));
+ }
+ }
+
+ public List<Group> groups() {
+ return groups;
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public static class Group {
+ private final String groupId;
+ private final String protocolType;
+
+ public Group(String groupId, String protocolType) {
+ this.groupId = groupId;
+ this.protocolType = protocolType;
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public String protocolType() {
+ return protocolType;
+ }
+
+ }
+
+ public static ListGroupsResponse parse(ByteBuffer buffer) {
+ return new ListGroupsResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+
+ public static ListGroupsResponse fromError(Errors error) {
+ return new ListGroupsResponse(error.code(), Collections.<Group>emptyList());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 2029e92..8667f22 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
@@ -729,7 +729,7 @@ public class ConsumerCoordinatorTest {
}
private Struct consumerMetadataResponse(Node node, short error) {
- GroupMetadataResponse response = new GroupMetadataResponse(error, node);
+ GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
return response.toStruct();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index fb21802..761b9db 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -28,6 +28,7 @@ import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -43,9 +44,9 @@ public class RequestResponseTest {
List<AbstractRequestResponse> requestResponseList = Arrays.asList(
createRequestHeader(),
createResponseHeader(),
- createConsumerMetadataRequest(),
- createConsumerMetadataRequest().getErrorResponse(0, new UnknownServerException()),
- createConsumerMetadataResponse(),
+ createGroupCoordinatorRequest(),
+ createGroupCoordinatorRequest().getErrorResponse(0, new UnknownServerException()),
+ createGroupCoordinatorResponse(),
createControlledShutdownRequest(),
createControlledShutdownResponse(),
createControlledShutdownRequest().getErrorResponse(1, new UnknownServerException()),
@@ -61,6 +62,12 @@ public class RequestResponseTest {
createLeaveGroupRequest(),
createLeaveGroupRequest().getErrorResponse(0, new UnknownServerException()),
createLeaveGroupResponse(),
+ createListGroupsRequest(),
+ createListGroupsRequest().getErrorResponse(0, new UnknownServerException()),
+ createListGroupsResponse(),
+ createDescribeGroupRequest(),
+ createDescribeGroupRequest().getErrorResponse(0, new UnknownServerException()),
+ createDescribeGroupResponse(),
createListOffsetRequest(),
createListOffsetRequest().getErrorResponse(0, new UnknownServerException()),
createListOffsetResponse(),
@@ -150,12 +157,12 @@ public class RequestResponseTest {
return new ResponseHeader(10);
}
- private AbstractRequest createConsumerMetadataRequest() {
- return new GroupMetadataRequest("test-group");
+ private AbstractRequest createGroupCoordinatorRequest() {
+ return new GroupCoordinatorRequest("test-group");
}
- private AbstractRequestResponse createConsumerMetadataResponse() {
- return new GroupMetadataResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
+ private AbstractRequestResponse createGroupCoordinatorResponse() {
+ return new GroupCoordinatorResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
}
private AbstractRequest createFetchRequest() {
@@ -193,6 +200,30 @@ public class RequestResponseTest {
return new JoinGroupResponse(Errors.NONE.code(), 1, "range", "consumer1", "leader", members);
}
+ private AbstractRequest createListGroupsRequest() {
+ return new ListGroupsRequest();
+ }
+
+ private AbstractRequestResponse createListGroupsResponse() {
+ List<ListGroupsResponse.Group> groups = Arrays.asList(new ListGroupsResponse.Group("test-group", "consumer"));
+ return new ListGroupsResponse(Errors.NONE.code(), groups);
+ }
+
+ private AbstractRequest createDescribeGroupRequest() {
+ return new DescribeGroupsRequest(Collections.singletonList("test-group"));
+ }
+
+ private AbstractRequestResponse createDescribeGroupResponse() {
+ String clientId = "consumer-1";
+ String clientHost = "localhost";
+ ByteBuffer empty = ByteBuffer.allocate(0);
+ DescribeGroupsResponse.GroupMember member = new DescribeGroupsResponse.GroupMember("memberId",
+ clientId, clientHost, empty, empty);
+ DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(Errors.NONE.code(),
+ "STABLE", "consumer", "roundrobin", Arrays.asList(member));
+ return new DescribeGroupsResponse(Collections.singletonMap("test-group", metadata));
+ }
+
private AbstractRequest createLeaveGroupRequest() {
return new LeaveGroupRequest("group1", "consumer1");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
index ca53674..ac9df44 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -386,7 +386,7 @@ public class WorkerCoordinatorTest {
private Struct groupMetadataResponse(Node node, short error) {
- GroupMetadataResponse response = new GroupMetadataResponse(error, node);
+ GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
return response.toStruct();
}
[2/3] kafka git commit: KAFKA-2687: Add support for ListGroups and
DescribeGroup APIs
Posted by gu...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
new file mode 100644
index 0000000..ddd3114
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.admin
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+import kafka.common.KafkaException
+import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary}
+import kafka.utils.Logging
+import org.apache.kafka.clients._
+import org.apache.kafka.clients.consumer.internals.{SendFailedException, ConsumerProtocol, ConsumerNetworkClient, RequestFuture}
+import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SaslConfigs, SslConfigs}
+import org.apache.kafka.common.errors.DisconnectException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.Selector
+import org.apache.kafka.common.protocol.types.Struct
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
+import org.apache.kafka.common.{TopicPartition, Cluster, Node}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+class AdminClient(val time: Time,
+ val requestTimeoutMs: Int,
+ val client: ConsumerNetworkClient,
+ val bootstrapBrokers: List[Node]) extends Logging {
+
+ private def send(target: Node,
+ api: ApiKeys,
+ request: AbstractRequest): Struct = {
+ var now = time.milliseconds()
+ val deadline = now + requestTimeoutMs
+ var future: RequestFuture[ClientResponse] = null
+
+ do {
+ future = client.send(target, api, request)
+ client.poll(future)
+
+ if (future.succeeded())
+ return if (future.value().wasDisconnected()) {
+ throw new DisconnectException()
+ } else {
+ future.value().responseBody()
+ }
+
+ now = time.milliseconds()
+ } while (now < deadline && future.exception().isInstanceOf[SendFailedException])
+
+ throw future.exception()
+ }
+
+ private def sendAnyNode(api: ApiKeys, request: AbstractRequest): Struct = {
+ bootstrapBrokers.foreach {
+ case broker =>
+ try {
+ return send(broker, api, request)
+ } catch {
+ case e: Exception =>
+ debug(s"Request ${api} failed against node ${broker}", e)
+ }
+ }
+ throw new RuntimeException(s"Request ${api} failed on brokers ${bootstrapBrokers}")
+ }
+
+ private def findCoordinator(groupId: String): Node = {
+ val request = new GroupCoordinatorRequest(groupId)
+ val responseBody = sendAnyNode(ApiKeys.GROUP_COORDINATOR, request)
+ val response = new GroupCoordinatorResponse(responseBody)
+ Errors.forCode(response.errorCode()).maybeThrow()
+ response.node()
+ }
+
+ def listGroups(node: Node): List[GroupOverview] = {
+ val responseBody = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest())
+ val response = new ListGroupsResponse(responseBody)
+ Errors.forCode(response.errorCode()).maybeThrow()
+ response.groups().map(group => GroupOverview(group.groupId(), group.protocolType())).toList
+ }
+
+ private def findAllBrokers(): List[Node] = {
+ val request = new MetadataRequest(List[String]())
+ val responseBody = sendAnyNode(ApiKeys.METADATA, request)
+ val response = new MetadataResponse(responseBody)
+ if (!response.errors().isEmpty)
+ debug(s"Metadata request contained errors: ${response.errors()}")
+ response.cluster().nodes().asScala.toList
+ }
+
+ def listAllGroups(): Map[Node, List[GroupOverview]] = {
+ findAllBrokers.map {
+ case broker =>
+ broker -> {
+ try {
+ listGroups(broker)
+ } catch {
+ case e: Exception =>
+ debug(s"Failed to find groups from broker ${broker}", e)
+ List[GroupOverview]()
+ }
+ }
+ }.toMap
+ }
+
+ def listAllConsumerGroups(): Map[Node, List[GroupOverview]] = {
+ listAllGroups().mapValues { groups =>
+ groups.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+ }
+ }
+
+ def listAllGroupsFlattened(): List[GroupOverview] = {
+ listAllGroups.values.flatten.toList
+ }
+
+ def listAllConsumerGroupsFlattened(): List[GroupOverview] = {
+ listAllGroupsFlattened.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+ }
+
+ def describeGroup(groupId: String): GroupSummary = {
+ val coordinator = findCoordinator(groupId)
+ val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(List(groupId).asJava))
+ val response = new DescribeGroupsResponse(responseBody)
+ val metadata = response.groups().get(groupId)
+ if (metadata == null)
+ throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}")
+
+ Errors.forCode(metadata.errorCode()).maybeThrow()
+ val members = metadata.members().map {
+ case member =>
+ val metadata = Utils.readBytes(member.memberMetadata())
+ val assignment = Utils.readBytes(member.memberAssignment())
+ MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment)
+ }.toList
+ GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
+ }
+
+ def describeConsumerGroup(groupId: String): Map[String, List[TopicPartition]] = {
+ val group = describeGroup(groupId)
+ if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
+ throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group")
+
+ group.members.map {
+ case member =>
+ val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
+ member.memberId -> assignment.partitions().asScala.toList
+ }.toMap
+ }
+
+}
+
+object AdminClient {
+ val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
+ val DefaultRequestTimeoutMs = 5000
+ val DefaultMaxInFlightRequestsPerConnection = 100
+ val DefaultReconnectBackoffMs = 50
+ val DefaultSendBufferBytes = 128 * 1024
+ val DefaultReceiveBufferBytes = 32 * 1024
+ val DefaultRetryBackoffMs = 100
+ val AdminClientIdSequence = new AtomicInteger(1)
+ val AdminConfigDef = {
+ val config = new ConfigDef()
+ .define(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+ Type.LIST,
+ Importance.HIGH,
+ CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+ .define(
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+ ConfigDef.Type.STRING,
+ CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+ ConfigDef.Importance.MEDIUM,
+ CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+ .withClientSslSupport()
+ .withClientSaslSupport()
+ config
+ }
+
+ class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals, false)
+
+ def createSimplePlaintext(brokerUrl: String): AdminClient = {
+ val config = Map(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> brokerUrl)
+ create(new AdminConfig(config))
+ }
+
+ def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
+
+ def create(config: AdminConfig): AdminClient = {
+ val time = new SystemTime
+ val metrics = new Metrics(time)
+ val metadata = new Metadata
+ val channelBuilder = ClientUtils.createChannelBuilder(config.values())
+
+ val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+ val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)
+ val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
+ metadata.update(bootstrapCluster, 0)
+
+ val selector = new Selector(
+ DefaultConnectionMaxIdleMs,
+ metrics,
+ time,
+ "admin",
+ Map[String, String](),
+ channelBuilder)
+
+ val networkClient = new NetworkClient(
+ selector,
+ metadata,
+ "admin-" + AdminClientIdSequence.getAndIncrement(),
+ DefaultMaxInFlightRequestsPerConnection,
+ DefaultReconnectBackoffMs,
+ DefaultSendBufferBytes,
+ DefaultReceiveBufferBytes,
+ DefaultRequestTimeoutMs,
+ time)
+
+ val highLevelClient = new ConsumerNetworkClient(
+ networkClient,
+ metadata,
+ time,
+ DefaultRetryBackoffMs)
+
+ new AdminClient(
+ time,
+ DefaultRequestTimeoutMs,
+ highLevelClient,
+ bootstrapCluster.nodes().asScala.toList)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
new file mode 100644
index 0000000..43e78f5
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.common.ErrorMapping
+import kafka.network.{RequestOrResponseSend, RequestChannel}
+import kafka.network.RequestChannel.Response
+
+object GroupCoordinatorRequest {
+ val CurrentVersion = 0.shortValue
+ val DefaultClientId = ""
+
+ def readFrom(buffer: ByteBuffer) = {
+ // envelope
+ val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val clientId = ApiUtils.readShortString(buffer)
+
+ // request
+ val group = ApiUtils.readShortString(buffer)
+ GroupCoordinatorRequest(group, versionId, correlationId, clientId)
+ }
+
+}
+
+case class GroupCoordinatorRequest(group: String,
+ versionId: Short = GroupCoordinatorRequest.CurrentVersion,
+ correlationId: Int = 0,
+ clientId: String = GroupCoordinatorRequest.DefaultClientId)
+ extends RequestOrResponse(Some(RequestKeys.GroupCoordinatorKey)) {
+
+ def sizeInBytes =
+ 2 + /* versionId */
+ 4 + /* correlationId */
+ ApiUtils.shortStringLength(clientId) +
+ ApiUtils.shortStringLength(group)
+
+ def writeTo(buffer: ByteBuffer) {
+ // envelope
+ buffer.putShort(versionId)
+ buffer.putInt(correlationId)
+ ApiUtils.writeShortString(buffer, clientId)
+
+ // consumer metadata request
+ ApiUtils.writeShortString(buffer, group)
+ }
+
+ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+ // return ConsumerCoordinatorNotAvailable for all uncaught errors
+ val errorResponse = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+ }
+
+ def describe(details: Boolean) = {
+ val consumerMetadataRequest = new StringBuilder
+ consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)
+ consumerMetadataRequest.append("; Version: " + versionId)
+ consumerMetadataRequest.append("; CorrelationId: " + correlationId)
+ consumerMetadataRequest.append("; ClientId: " + clientId)
+ consumerMetadataRequest.append("; Group: " + group)
+ consumerMetadataRequest.toString()
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
new file mode 100644
index 0000000..4cd7db8
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.cluster.BrokerEndPoint
+import kafka.common.ErrorMapping
+
+object GroupCoordinatorResponse {
+ val CurrentVersion = 0
+
+ private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1))
+
+ def readFrom(buffer: ByteBuffer) = {
+ val correlationId = buffer.getInt
+ val errorCode = buffer.getShort
+ val broker = BrokerEndPoint.readFrom(buffer)
+ val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
+ Some(broker)
+ else
+ None
+
+ GroupCoordinatorResponse(coordinatorOpt, errorCode, correlationId)
+ }
+
+}
+
+case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
+ extends RequestOrResponse() {
+
+ def sizeInBytes =
+ 4 + /* correlationId */
+ 2 + /* error code */
+ coordinatorOpt.orElse(GroupCoordinatorResponse.NoBrokerEndpointOpt).get.sizeInBytes
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putInt(correlationId)
+ buffer.putShort(errorCode)
+ coordinatorOpt.orElse(GroupCoordinatorResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer))
+ }
+
+ def describe(details: Boolean) = toString
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala b/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
deleted file mode 100644
index 075ddb5..0000000
--- a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-
-import kafka.common.ErrorMapping
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
-
-object GroupMetadataRequest {
- val CurrentVersion = 0.shortValue
- val DefaultClientId = ""
-
- def readFrom(buffer: ByteBuffer) = {
- // envelope
- val versionId = buffer.getShort
- val correlationId = buffer.getInt
- val clientId = ApiUtils.readShortString(buffer)
-
- // request
- val group = ApiUtils.readShortString(buffer)
- GroupMetadataRequest(group, versionId, correlationId, clientId)
- }
-
-}
-
-case class GroupMetadataRequest(group: String,
- versionId: Short = GroupMetadataRequest.CurrentVersion,
- correlationId: Int = 0,
- clientId: String = GroupMetadataRequest.DefaultClientId)
- extends RequestOrResponse(Some(RequestKeys.GroupMetadataKey)) {
-
- def sizeInBytes =
- 2 + /* versionId */
- 4 + /* correlationId */
- ApiUtils.shortStringLength(clientId) +
- ApiUtils.shortStringLength(group)
-
- def writeTo(buffer: ByteBuffer) {
- // envelope
- buffer.putShort(versionId)
- buffer.putInt(correlationId)
- ApiUtils.writeShortString(buffer, clientId)
-
- // consumer metadata request
- ApiUtils.writeShortString(buffer, group)
- }
-
- override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- // return ConsumerCoordinatorNotAvailable for all uncaught errors
- val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
- }
-
- def describe(details: Boolean) = {
- val consumerMetadataRequest = new StringBuilder
- consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)
- consumerMetadataRequest.append("; Version: " + versionId)
- consumerMetadataRequest.append("; CorrelationId: " + correlationId)
- consumerMetadataRequest.append("; ClientId: " + clientId)
- consumerMetadataRequest.append("; Group: " + group)
- consumerMetadataRequest.toString()
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala b/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
deleted file mode 100644
index 2d65917..0000000
--- a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import kafka.cluster.BrokerEndPoint
-import kafka.common.ErrorMapping
-
-object GroupMetadataResponse {
- val CurrentVersion = 0
-
- private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1))
-
- def readFrom(buffer: ByteBuffer) = {
- val correlationId = buffer.getInt
- val errorCode = buffer.getShort
- val broker = BrokerEndPoint.readFrom(buffer)
- val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
- Some(broker)
- else
- None
-
- GroupMetadataResponse(coordinatorOpt, errorCode, correlationId)
- }
-
-}
-
-case class GroupMetadataResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
- extends RequestOrResponse() {
-
- def sizeInBytes =
- 4 + /* correlationId */
- 2 + /* error code */
- coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).get.sizeInBytes
-
- def writeTo(buffer: ByteBuffer) {
- buffer.putInt(correlationId)
- buffer.putShort(errorCode)
- coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer))
- }
-
- def describe(details: Boolean) = toString
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index 669b63a..2363099 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -33,12 +33,17 @@ object RequestKeys {
val ControlledShutdownKey: Short = 7
val OffsetCommitKey: Short = 8
val OffsetFetchKey: Short = 9
- val GroupMetadataKey: Short = 10
+ val GroupCoordinatorKey: Short = 10
val JoinGroupKey: Short = 11
val HeartbeatKey: Short = 12
val LeaveGroupKey: Short = 13
val SyncGroupKey: Short = 14
+ val DescribeGroupsKey: Short = 15
+ val ListGroupsKey: Short = 16
+ // NOTE: this map only includes the server-side request/response handlers. Newer
+ // request types should only use the client-side versions which are parsed with
+ // o.a.k.common.requests.AbstractRequest.getRequest()
val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
FetchKey -> ("Fetch", FetchRequest.readFrom),
@@ -49,8 +54,7 @@ object RequestKeys {
UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom),
OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
- OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
- GroupMetadataKey -> ("GroupMetadata", GroupMetadataRequest.readFrom)
+ OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom)
)
def nameForKey(key: Short): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 36b5b3b..2f836c0 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -151,9 +151,9 @@ object ClientUtils extends Logging{
if (!queryChannel.isConnected)
queryChannel = channelToAnyBroker(zkUtils)
debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
- queryChannel.send(GroupMetadataRequest(group))
+ queryChannel.send(GroupCoordinatorRequest(group))
val response = queryChannel.receive()
- val consumerMetadataResponse = GroupMetadataResponse.readFrom(response.payload())
+ val consumerMetadataResponse = GroupCoordinatorResponse.readFrom(response.payload())
debug("Consumer metadata response: " + consumerMetadataResponse.toString)
if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
coordinatorOpt = consumerMetadataResponse.coordinatorOpt
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 81cb51b..6f53fac 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -60,6 +60,7 @@ object ErrorMapping {
// 27: COMMITTING_PARTITIONS_NOT_ASSIGNED
// 28: INVALID_COMMIT_OFFSET_SIZE
val AuthorizationCode: Short = 29
+ // 30: REBALANCE_IN_PROGRESS
private val exceptionToCode =
Map[Class[Throwable], Short](
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 5b1aead..e15aca4 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -112,9 +112,9 @@ class SimpleConsumer(val host: String,
TopicMetadataResponse.readFrom(response.payload())
}
- def send(request: GroupMetadataRequest): GroupMetadataResponse = {
+ def send(request: GroupCoordinatorRequest): GroupCoordinatorResponse = {
val response = sendRequest(request)
- GroupMetadataResponse.readFrom(response.payload())
+ GroupCoordinatorResponse.readFrom(response.payload())
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 97ce22b..2015371 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -107,6 +107,7 @@ class GroupCoordinator(val brokerId: Int,
def handleJoinGroup(groupId: String,
memberId: String,
clientId: String,
+ clientHost: String,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
@@ -132,10 +133,10 @@ class GroupCoordinator(val brokerId: Int,
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
} else {
group = groupManager.addGroup(groupId, protocolType)
- doJoinGroup(group, memberId, clientId, sessionTimeoutMs, protocolType, protocols, responseCallback)
+ doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
}
} else {
- doJoinGroup(group, memberId, clientId, sessionTimeoutMs, protocolType, protocols, responseCallback)
+ doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
}
}
}
@@ -143,6 +144,7 @@ class GroupCoordinator(val brokerId: Int,
private def doJoinGroup(group: GroupMetadata,
memberId: String,
clientId: String,
+ clientHost: String,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
@@ -166,7 +168,7 @@ class GroupCoordinator(val brokerId: Int,
case PreparingRebalance =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
- addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
+ addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
} else {
val member = group.get(memberId)
updateMemberAndRebalance(group, member, protocols, responseCallback)
@@ -174,7 +176,7 @@ class GroupCoordinator(val brokerId: Int,
case AwaitingSync =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
- addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
+ addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
} else {
val member = group.get(memberId)
if (member.matches(protocols)) {
@@ -201,7 +203,7 @@ class GroupCoordinator(val brokerId: Int,
case Stable =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
// if the member id is unknown, register the member to the group
- addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
+ addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
} else {
val member = group.get(memberId)
if (memberId == group.leaderId || !member.matches(protocols)) {
@@ -269,13 +271,30 @@ class GroupCoordinator(val brokerId: Int,
group.get(memberId).awaitingSyncCallback = responseCallback
completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
- // if this is the leader, then we can transition to stable and
- // propagate the assignment to any awaiting members
+ // if this is the leader, then we can attempt to persist state and transition to stable
if (memberId == group.leaderId) {
- group.transitionTo(Stable)
- // persist the group metadata and upon finish propagate the assignment
- groupManager.storeGroup(group, groupAssignment)
+ // fill any missing members with an empty assignment
+ val missing = group.allMembers -- groupAssignment.keySet
+ val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
+
+ // persist the group metadata and upon finish transition to stable and propagate the assignment
+ groupManager.storeGroup(group, assignment, (errorCode: Short) => {
+ group synchronized {
+ // another member may have joined the group while we were awaiting this callback,
+ // so we must ensure we are still in the AwaitingSync state when it gets invoked.
+ // if we have transitioned to another state, then we shouldn't do anything
+ if (group.is(AwaitingSync)) {
+ if (errorCode != Errors.NONE.code) {
+ resetAndPropagateAssignmentError(group, errorCode)
+ maybePrepareRebalance(group)
+ } else if (group.is(AwaitingSync)) {
+ setAndPropagateAssignment(group, assignment)
+ group.transitionTo(Stable)
+ }
+ }
+ }
+ })
}
case Stable =>
@@ -413,6 +432,34 @@ class GroupCoordinator(val brokerId: Int,
}
}
+ def handleListGroups(): (Errors, List[GroupOverview]) = {
+ if (!isActive.get) {
+ (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
+ } else {
+ val errorCode = if (groupManager.isLoading()) Errors.GROUP_LOAD_IN_PROGRESS else Errors.NONE
+ (errorCode, groupManager.currentGroups.map(_.overview).toList)
+ }
+ }
+
+ def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
+ if (!isActive.get) {
+ (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
+ } else if (!isCoordinatorForGroup(groupId)) {
+ (Errors.NOT_COORDINATOR_FOR_GROUP, GroupCoordinator.EmptyGroup)
+ } else if (isCoordinatorLoadingInProgress(groupId)) {
+ (Errors.GROUP_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
+ } else {
+ val group = groupManager.getGroup(groupId)
+ if (group == null) {
+ (Errors.NONE, GroupCoordinator.DeadGroup)
+ } else {
+ group synchronized {
+ (Errors.NONE, group.summary)
+ }
+ }
+ }
+ }
+
def handleGroupImmigration(offsetTopicPartitionId: Int) = {
groupManager.loadGroupsForPartition(offsetTopicPartitionId)
}
@@ -421,6 +468,27 @@ class GroupCoordinator(val brokerId: Int,
groupManager.removeGroupsForPartition(offsetTopicPartitionId)
}
+ private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
+ assert(group.is(AwaitingSync))
+ group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
+ propagateAssignment(group, Errors.NONE.code)
+ }
+
+ private def resetAndPropagateAssignmentError(group: GroupMetadata, errorCode: Short) {
+ assert(group.is(AwaitingSync))
+ group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
+ propagateAssignment(group, errorCode)
+ }
+
+ private def propagateAssignment(group: GroupMetadata, errorCode: Short) {
+ for (member <- group.allMemberMetadata) {
+ if (member.awaitingSyncCallback != null) {
+ member.awaitingSyncCallback(member.assignment, errorCode)
+ member.awaitingSyncCallback = null
+ }
+ }
+ }
+
private def validGroupId(groupId: String): Boolean = {
groupId != null && !groupId.isEmpty
}
@@ -458,12 +526,13 @@ class GroupCoordinator(val brokerId: Int,
private def addMemberAndRebalance(sessionTimeoutMs: Int,
clientId: String,
+ clientHost: String,
protocols: List[(String, Array[Byte])],
group: GroupMetadata,
callback: JoinCallback) = {
// use the client-id with a random id suffix as the member-id
val memberId = clientId + "-" + group.generateMemberIdSuffix
- val member = new MemberMetadata(memberId, group.groupId, sessionTimeoutMs, protocols)
+ val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocols)
member.awaitingJoinCallback = callback
group.add(member.memberId, member)
maybePrepareRebalance(group)
@@ -488,11 +557,9 @@ class GroupCoordinator(val brokerId: Int,
private def prepareRebalance(group: GroupMetadata) {
// if any members are awaiting sync, cancel their request and have them rejoin
- if (group.is(AwaitingSync)) {
- groupManager.propagateAssignment(group, Errors.REBALANCE_IN_PROGRESS.code)
- }
+ if (group.is(AwaitingSync))
+ resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)
- group.allMembers.foreach(_.assignment = null)
group.transitionTo(PreparingRebalance)
info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
@@ -544,7 +611,7 @@ class GroupCoordinator(val brokerId: Int,
info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
// trigger the awaiting join group response callback for all the members after rebalancing
- for (member <- group.allMembers) {
+ for (member <- group.allMemberMetadata) {
assert(member.awaitingJoinCallback != null)
val joinResult = JoinGroupResult(
members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
@@ -595,6 +662,11 @@ class GroupCoordinator(val brokerId: Int,
object GroupCoordinator {
+ val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
+ val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
+ val NoMembers = List[MemberSummary]()
+ val NoState = ""
+ val NoProtocolType = ""
val NoProtocol = ""
val NoLeader = ""
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
index 652a3a4..ece9ce0 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -93,6 +93,20 @@ private object GroupMetadata {
}
/**
+ * Case class used to represent group metadata for the ListGroups API
+ */
+case class GroupOverview(groupId: String,
+ protocolType: String)
+
+/**
+ * Case class used to represent group metadata for the DescribeGroup API
+ */
+case class GroupSummary(state: String,
+ protocolType: String,
+ protocol: String,
+ members: List[MemberSummary])
+
+/**
* Group contains the following metadata:
*
* Membership metadata:
@@ -144,7 +158,9 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
- def allMembers = members.values.toList
+ def allMembers = members.keySet
+
+ def allMemberMetadata = members.values.toList
def rebalanceTimeout = members.values.foldLeft(0) {(timeout, member) =>
timeout.max(member.sessionTimeoutMs)
@@ -168,7 +184,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
val candidates = candidateProtocols
// let each member vote for one of the protocols and choose the one with the most votes
- val votes: List[(String, Int)] = allMembers
+ val votes: List[(String, Int)] = allMemberMetadata
.map(_.vote(candidates))
.groupBy(identity)
.mapValues(_.size)
@@ -179,7 +195,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
private def candidateProtocols = {
// get the set of protocols that are commonly supported by all members
- allMembers
+ allMemberMetadata
.map(_.protocols)
.reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
}
@@ -201,6 +217,20 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol))}.toMap
}
+ def summary: GroupSummary = {
+ if (is(Stable)) {
+ val members = this.members.values.map{ member => member.summary(protocol) }.toList
+ GroupSummary(state.toString, protocolType, protocol, members)
+ } else {
+ val members = this.members.values.map{ member => member.summaryNoMetadata() }.toList
+ GroupSummary(state.toString, protocolType, GroupCoordinator.NoProtocol, members)
+ }
+ }
+
+ def overview: GroupOverview = {
+ GroupOverview(groupId, protocolType)
+ }
+
private def assertValidTransition(targetState: GroupState) {
if (!GroupMetadata.validPreviousStates(targetState).contains(state))
throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 81ed548..0052b5d 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -95,12 +95,16 @@ class GroupMetadataManager(val brokerId: Int,
}
)
+ def currentGroups(): Iterable[GroupMetadata] = groupsCache.values
+
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
def isGroupLocal(groupId: String): Boolean = loadingPartitions synchronized ownedPartitions.contains(partitionFor(groupId))
def isGroupLoading(groupId: String): Boolean = loadingPartitions synchronized loadingPartitions.contains(partitionFor(groupId))
+ def isLoading(): Boolean = loadingPartitions synchronized !loadingPartitions.isEmpty
+
/**
* Get the group associated with the given groupId, or null if not found
*/
@@ -158,7 +162,8 @@ class GroupMetadataManager(val brokerId: Int,
}
def storeGroup(group: GroupMetadata,
- groupAssignment: Map[String, Array[Byte]]) {
+ groupAssignment: Map[String, Array[Byte]],
+ responseCallback: Short => Unit) {
// construct the message to append
val message = new Message(
key = GroupMetadataManager.groupMetadataKey(group.groupId),
@@ -208,12 +213,7 @@ class GroupMetadataManager(val brokerId: Int,
}
}
- for (member <- group.allMembers) {
- member.assignment = groupAssignment.getOrElse(member.memberId, Array.empty[Byte])
- }
-
- // propagate the assignments
- propagateAssignment(group, responseCode)
+ responseCallback(responseCode)
}
// call replica manager to append the group message
@@ -225,16 +225,7 @@ class GroupMetadataManager(val brokerId: Int,
putCacheCallback)
}
- def propagateAssignment(group: GroupMetadata,
- errorCode: Short) {
- val hasError = errorCode != Errors.NONE.code
- for (member <- group.allMembers) {
- if (member.awaitingSyncCallback != null) {
- member.awaitingSyncCallback(if (hasError) Array.empty else member.assignment, errorCode)
- member.awaitingSyncCallback = null
- }
- }
- }
+
/**
* Store offsets by appending it to the replicated log and then inserting to cache
@@ -657,10 +648,14 @@ object GroupMetadataManager {
private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
private val MEMBER_METADATA_V0 = new Schema(new Field("member_id", STRING),
+ new Field("client_id", STRING),
+ new Field("client_host", STRING),
new Field("session_timeout", INT32),
new Field("subscription", BYTES),
new Field("assignment", BYTES))
private val MEMBER_METADATA_MEMBER_ID_V0 = MEMBER_METADATA_V0.get("member_id")
+ private val MEMBER_METADATA_CLIENT_ID_V0 = MEMBER_METADATA_V0.get("client_id")
+ private val MEMBER_METADATA_CLIENT_HOST_V0 = MEMBER_METADATA_V0.get("client_host")
private val MEMBER_METADATA_SESSION_TIMEOUT_V0 = MEMBER_METADATA_V0.get("session_timeout")
private val MEMBER_METADATA_SUBSCRIPTION_V0 = MEMBER_METADATA_V0.get("subscription")
private val MEMBER_METADATA_ASSIGNMENT_V0 = MEMBER_METADATA_V0.get("assignment")
@@ -787,10 +782,12 @@ object GroupMetadataManager {
value.set(GROUP_METADATA_PROTOCOL_V0, groupMetadata.protocol)
value.set(GROUP_METADATA_LEADER_V0, groupMetadata.leaderId)
- val memberArray = groupMetadata.allMembers.map {
+ val memberArray = groupMetadata.allMemberMetadata.map {
case memberMetadata =>
val memberStruct = value.instance(GROUP_METADATA_MEMBERS_V0)
memberStruct.set(MEMBER_METADATA_MEMBER_ID_V0, memberMetadata.memberId)
+ memberStruct.set(MEMBER_METADATA_CLIENT_ID_V0, memberMetadata.clientId)
+ memberStruct.set(MEMBER_METADATA_CLIENT_HOST_V0, memberMetadata.clientHost)
memberStruct.set(MEMBER_METADATA_SESSION_TIMEOUT_V0, memberMetadata.sessionTimeoutMs)
val metadata = memberMetadata.metadata(groupMetadata.protocol)
@@ -901,10 +898,13 @@ object GroupMetadataManager {
case memberMetadataObj =>
val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
val memberId = memberMetadata.get(MEMBER_METADATA_MEMBER_ID_V0).asInstanceOf[String]
+ val clientId = memberMetadata.get(MEMBER_METADATA_CLIENT_ID_V0).asInstanceOf[String]
+ val clientHost = memberMetadata.get(MEMBER_METADATA_CLIENT_HOST_V0).asInstanceOf[String]
val sessionTimeout = memberMetadata.get(MEMBER_METADATA_SESSION_TIMEOUT_V0).asInstanceOf[Int]
val subscription = Utils.toArray(memberMetadata.get(MEMBER_METADATA_SUBSCRIPTION_V0).asInstanceOf[ByteBuffer])
- val member = new MemberMetadata(memberId, groupId, sessionTimeout, List((group.protocol, subscription)))
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeout,
+ List((group.protocol, subscription)))
member.assignment = Utils.toArray(memberMetadata.get(MEMBER_METADATA_ASSIGNMENT_V0).asInstanceOf[ByteBuffer])
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
index 6a76241..80782c8 100644
--- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -23,6 +23,13 @@ import kafka.utils.nonthreadsafe
import scala.collection.Map
+
+case class MemberSummary(memberId: String,
+ clientId: String,
+ clientHost: String,
+ metadata: Array[Byte],
+ assignment: Array[Byte])
+
/**
* Member metadata contains the following metadata:
*
@@ -46,15 +53,14 @@ import scala.collection.Map
@nonthreadsafe
private[coordinator] class MemberMetadata(val memberId: String,
val groupId: String,
+ val clientId: String,
+ val clientHost: String,
val sessionTimeoutMs: Int,
var supportedProtocols: List[(String, Array[Byte])]) {
- // NOTE: we need to add memory barrier to assignment and awaitingSyncCallback
- // since they can be accessed in the append callback thread that does not
- // hold on the group object lock
- @volatile var assignment: Array[Byte] = null
+ var assignment: Array[Byte] = Array.empty[Byte]
var awaitingJoinCallback: JoinGroupResult => Unit = null
- @volatile var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
+ var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
var latestHeartbeat: Long = -1
var isLeaving: Boolean = false
@@ -87,6 +93,14 @@ private[coordinator] class MemberMetadata(val memberId: String,
return true
}
+ def summary(protocol: String): MemberSummary = {
+ MemberSummary(memberId, clientId, clientHost, metadata(protocol), assignment)
+ }
+
+ def summaryNoMetadata(): MemberSummary = {
+ MemberSummary(memberId, clientId, clientHost, Array.empty[Byte], Array.empty[Byte])
+ }
+
/**
* Vote for one of the potential group protocols. This takes into account the protocol preference as
* indicated by the order of supported protocols and returns the first one also contained in the set
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
new file mode 100644
index 0000000..0e14758
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import java.nio.ByteBuffer
+import kafka.cluster.BrokerEndPoint
+
+class GroupCoordinatorResponse(private val underlying: kafka.api.GroupCoordinatorResponse) {
+
+ def errorCode = underlying.errorCode
+
+ def coordinator: BrokerEndPoint = {
+ import kafka.javaapi.Implicits._
+ underlying.coordinatorOpt
+ }
+
+ override def equals(other: Any) = canEqual(other) && {
+ val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.GroupCoordinatorResponse]
+ this.underlying.equals(otherConsumerMetadataResponse.underlying)
+ }
+
+ def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupCoordinatorResponse]
+
+ override def hashCode = underlying.hashCode
+
+ override def toString = underlying.toString
+
+}
+
+object GroupCoordinatorResponse {
+ def readFrom(buffer: ByteBuffer) = new GroupCoordinatorResponse(kafka.api.GroupCoordinatorResponse.readFrom(buffer))
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
deleted file mode 100644
index b94aa01..0000000
--- a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.javaapi
-
-import java.nio.ByteBuffer
-import kafka.cluster.BrokerEndPoint
-
-class GroupMetadataResponse(private val underlying: kafka.api.GroupMetadataResponse) {
-
- def errorCode = underlying.errorCode
-
- def coordinator: BrokerEndPoint = {
- import kafka.javaapi.Implicits._
- underlying.coordinatorOpt
- }
-
- override def equals(other: Any) = canEqual(other) && {
- val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.GroupMetadataResponse]
- this.underlying.equals(otherConsumerMetadataResponse.underlying)
- }
-
- def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupMetadataResponse]
-
- override def hashCode = underlying.hashCode
-
- override def toString = underlying.toString
-
-}
-
-object GroupMetadataResponse {
- def readFrom(buffer: ByteBuffer) = new GroupMetadataResponse(kafka.api.GroupMetadataResponse.readFrom(buffer))
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 9fce77e..9ea4079 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -65,7 +65,8 @@ object RequestChannel extends Logging {
RequestKeys.deserializerForKey(requestId)(buffer)
else
null
- // for client-side request / response format
+ // if we failed to find a server-side mapping, then try using the
+ // client-side request / response format
val header: RequestHeader =
if (requestObj == null) {
buffer.rewind
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 35c5956..0a2e0b9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,6 +18,7 @@
package kafka.server
import java.nio.ByteBuffer
+import java.util
import kafka.admin.AdminUtils
import kafka.api._
@@ -31,13 +32,12 @@ import kafka.network.RequestChannel.{Session, Response}
import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.requests.{HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.requests.{GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.Node
import scala.collection._
-
-
/**
* Logic to handle the various Kafka requests
*/
@@ -74,11 +74,13 @@ class KafkaApis(val requestChannel: RequestChannel,
case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
- case RequestKeys.GroupMetadataKey => handleGroupMetadataRequest(request)
+ case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
+ case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
+ case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
@@ -676,34 +678,73 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
}
- /*
- * Handle a consumer metadata request
- */
- def handleGroupMetadataRequest(request: RequestChannel.Request) {
- val groupMetadataRequest = request.requestObj.asInstanceOf[GroupMetadataRequest]
+ def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
+ val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
+ val responseHeader = new ResponseHeader(request.header.correlationId)
- if (!authorize(request.session, Read, new Resource(Group, groupMetadataRequest.group))) {
- val response = GroupMetadataResponse(None, ErrorMapping.AuthorizationCode, groupMetadataRequest.correlationId)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
+ if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
+ val responseBody = new GroupCoordinatorResponse(Errors.AUTHORIZATION_FAILED.code, Node.noNode)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
} else {
- val partition = coordinator.partitionFor(groupMetadataRequest.group)
+ val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
// get metadata (and create the topic if necessary)
val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), request.securityProtocol).head
+ val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).flatMap {
+ partitionMetadata => partitionMetadata.leader
+ }
- val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, groupMetadataRequest.correlationId)
-
- val response =
- offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata =>
- partitionMetadata.leader.map { leader =>
- GroupMetadataResponse(Some(leader), ErrorMapping.NoError, groupMetadataRequest.correlationId)
- }.getOrElse(errorResponse)
- }.getOrElse(errorResponse)
+ val responseBody = coordinatorEndpoint match {
+ case None =>
+ new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode())
+ case Some(endpoint) =>
+ new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
+ }
trace("Sending consumer metadata %s for correlation id %d to client %s."
- .format(response, groupMetadataRequest.correlationId, groupMetadataRequest.clientId))
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
+ .format(responseBody, request.header.correlationId, request.header.clientId))
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+ }
+ }
+
+ def handleDescribeGroupRequest(request: RequestChannel.Request) {
+ import JavaConverters._
+
+ val describeRequest = request.body.asInstanceOf[DescribeGroupsRequest]
+ val responseHeader = new ResponseHeader(request.header.correlationId)
+
+ val groups = describeRequest.groupIds().asScala.map {
+ case groupId =>
+ if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
+ groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.AUTHORIZATION_FAILED)
+ } else {
+ val (error, summary) = coordinator.handleDescribeGroup(groupId)
+ val members = summary.members.map { member =>
+ val metadata = ByteBuffer.wrap(member.metadata)
+ val assignment = ByteBuffer.wrap(member.assignment)
+ new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment)
+ }
+ groupId -> new DescribeGroupsResponse.GroupMetadata(error.code, summary.state, summary.protocolType,
+ summary.protocol, members.asJava)
+ }
+ }.toMap
+
+ val responseBody = new DescribeGroupsResponse(groups.asJava)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+ }
+
+ def handleListGroupsRequest(request: RequestChannel.Request) {
+ import JavaConverters._
+
+ val responseHeader = new ResponseHeader(request.header.correlationId)
+ val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
+ ListGroupsResponse.fromError(Errors.AUTHORIZATION_FAILED)
+ } else {
+ val (error, groups) = coordinator.handleListGroups()
+ val allGroups = groups.map{ group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
+ new ListGroupsResponse(error.code, allGroups.asJava)
}
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}
def handleJoinGroupRequest(request: RequestChannel.Request) {
@@ -740,6 +781,7 @@ class KafkaApis(val requestChannel: RequestChannel,
joinGroupRequest.groupId(),
joinGroupRequest.memberId(),
request.header.clientId(),
+ request.session.host,
joinGroupRequest.sessionTimeout(),
joinGroupRequest.protocolType(),
protocols,
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
new file mode 100644
index 0000000..97b49dd
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.api
+
+import kafka.admin.AdminClient
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestUtils, Logging}
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.TopicPartition
+import org.junit.{Before, Test}
+import org.junit.Assert._
+import scala.collection.JavaConversions._
+
+class AdminClientTest extends IntegrationTestHarness with Logging {
+
+ val producerCount = 1
+ val consumerCount = 2
+ val serverCount = 3
+ val groupId = "my-test"
+ val clientId = "consumer-498"
+
+ val topic = "topic"
+ val part = 0
+ val tp = new TopicPartition(topic, part)
+ val part2 = 1
+ val tp2 = new TopicPartition(topic, part2)
+
+ var client: AdminClient = null
+
+ // configure the servers and clients
+ this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+ this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+ this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+ this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
+ this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+ this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+ this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
+ this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
+
+ @Before
+ override def setUp() {
+ super.setUp
+ client = AdminClient.createSimplePlaintext(this.brokerList)
+ TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
+ }
+
+ @Test
+ def testListGroups() {
+ consumers(0).subscribe(List(topic))
+ TestUtils.waitUntilTrue(() => {
+ consumers(0).poll(0)
+ !consumers(0).assignment().isEmpty
+ }, "Expected non-empty assignment")
+
+ val groups = client.listAllGroupsFlattened
+ assertFalse(groups.isEmpty)
+ val group = groups(0)
+ assertEquals(groupId, group.groupId)
+ assertEquals("consumer", group.protocolType)
+ }
+
+ @Test
+ def testDescribeGroup() {
+ consumers(0).subscribe(List(topic))
+ TestUtils.waitUntilTrue(() => {
+ consumers(0).poll(0)
+ !consumers(0).assignment().isEmpty
+ }, "Expected non-empty assignment")
+
+ val group= client.describeGroup(groupId)
+ assertEquals("consumer", group.protocolType)
+ assertEquals("range", group.protocol)
+ assertEquals("Stable", group.state)
+ assertFalse(group.members.isEmpty)
+
+ val member = group.members(0)
+ assertEquals(clientId, member.clientId)
+ assertFalse(member.clientHost.isEmpty)
+ assertFalse(member.memberId.isEmpty)
+ }
+
+ @Test
+ def testDescribeConsumerGroup() {
+ consumers(0).subscribe(List(topic))
+ TestUtils.waitUntilTrue(() => {
+ consumers(0).poll(0)
+ !consumers(0).assignment().isEmpty
+ }, "Expected non-empty assignment")
+
+ val assignment = client.describeConsumerGroup(groupId)
+ assertEquals(1, assignment.size)
+ for (partitions <- assignment.values)
+ assertEquals(Set(tp, tp2), partitions.toSet)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 3d484b8..e363e27 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -85,7 +85,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
RequestKeys.OffsetsKey -> classOf[ListOffsetResponse],
RequestKeys.OffsetCommitKey -> classOf[OffsetCommitResponse],
RequestKeys.OffsetFetchKey -> classOf[OffsetFetchResponse],
- RequestKeys.GroupMetadataKey -> classOf[GroupMetadataResponse],
+ RequestKeys.GroupCoordinatorKey -> classOf[GroupCoordinatorResponse],
RequestKeys.UpdateMetadataKey -> classOf[UpdateMetadataResponse],
RequestKeys.JoinGroupKey -> classOf[JoinGroupResponse],
RequestKeys.SyncGroupKey -> classOf[SyncGroupResponse],
@@ -103,7 +103,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
RequestKeys.OffsetsKey -> ((resp: ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
RequestKeys.OffsetCommitKey -> ((resp: OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
RequestKeys.OffsetFetchKey -> ((resp: OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
- RequestKeys.GroupMetadataKey -> ((resp: GroupMetadataResponse) => resp.errorCode()),
+ RequestKeys.GroupCoordinatorKey -> ((resp: GroupCoordinatorResponse) => resp.errorCode()),
RequestKeys.UpdateMetadataKey -> ((resp: UpdateMetadataResponse) => resp.errorCode()),
RequestKeys.JoinGroupKey -> ((resp: JoinGroupResponse) => resp.errorCode()),
RequestKeys.SyncGroupKey -> ((resp: SyncGroupResponse) => resp.errorCode()),
@@ -121,7 +121,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
RequestKeys.OffsetsKey -> TopicDescribeAcl,
RequestKeys.OffsetCommitKey -> (TopicReadAcl ++ GroupReadAcl),
RequestKeys.OffsetFetchKey -> (TopicReadAcl ++ GroupReadAcl),
- RequestKeys.GroupMetadataKey -> (TopicReadAcl ++ GroupReadAcl),
+ RequestKeys.GroupCoordinatorKey -> (TopicReadAcl ++ GroupReadAcl),
RequestKeys.UpdateMetadataKey -> ClusterAcl,
RequestKeys.JoinGroupKey -> GroupReadAcl,
RequestKeys.SyncGroupKey -> GroupReadAcl,
@@ -174,7 +174,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
RequestKeys.FetchKey -> new FetchRequest(5000, 100, Map(tp -> new PartitionData(0, 100)).asJava),
RequestKeys.OffsetsKey -> new ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava),
RequestKeys.OffsetFetchKey -> new OffsetFetchRequest(group, List(tp).asJava),
- RequestKeys.GroupMetadataKey -> new GroupMetadataRequest(group),
+ RequestKeys.GroupCoordinatorKey -> new GroupCoordinatorRequest(group),
RequestKeys.UpdateMetadataKey -> new UpdateMetadataRequest(brokerId, Int.MaxValue,
Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
Set(new UpdateMetadataRequest.Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava),
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index a77979a..86e6877 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -135,8 +135,8 @@ object TestOffsetManager {
val id = random.nextInt().abs % numGroups
val group = "group-" + id
try {
- metadataChannel.send(GroupMetadataRequest(group))
- val coordinatorId = GroupMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1)
+ metadataChannel.send(GroupCoordinatorRequest(group))
+ val coordinatorId = GroupCoordinatorResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1)
val channel = if (channels.contains(coordinatorId))
channels(coordinatorId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 09e9ce3..90f629a 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -231,12 +231,12 @@ object SerializationTestUtils {
))
}
- def createConsumerMetadataRequest: GroupMetadataRequest = {
- GroupMetadataRequest("group 1", clientId = "client 1")
+ def createConsumerMetadataRequest: GroupCoordinatorRequest = {
+ GroupCoordinatorRequest("group 1", clientId = "client 1")
}
- def createConsumerMetadataResponse: GroupMetadataResponse = {
- GroupMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
+ def createConsumerMetadataResponse: GroupCoordinatorResponse = {
+ GroupCoordinatorResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
}
def createUpdateMetadataRequest(versionId: Short): UpdateMetadataRequest = {
@@ -276,7 +276,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
- private val consumerMetadataResponseNoCoordinator = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
+ private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
private val updateMetadataRequestV0 = SerializationTestUtils.createUpdateMetadataRequest(0)
private val updateMetadataRequestV1 = SerializationTestUtils.createUpdateMetadataRequest(1)
private val updateMetdataResponse = SerializationTestUtils.createUpdateMetadataResponse
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 5e6bd03..c1278e4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -51,10 +51,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
type LeaveGroupCallbackParams = Short
type LeaveGroupCallback = Short => Unit
+ val ClientId = "consumer-test"
+ val ClientHost = "localhost"
val ConsumerMinSessionTimeout = 10
val ConsumerMaxSessionTimeout = 1000
val DefaultSessionTimeout = 500
- var consumerCoordinator: GroupCoordinator = null
+ var groupCoordinator: GroupCoordinator = null
var replicaManager: ReplicaManager = null
var scheduler: KafkaScheduler = null
var zkUtils: ZkUtils = null
@@ -85,26 +87,25 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(GroupCoordinator.GroupMetadataTopicName))).andReturn(ret)
EasyMock.replay(zkUtils)
- consumerCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new MockScheduler(new MockTime()))
- consumerCoordinator.startup()
+ groupCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new MockScheduler(new MockTime()))
+ groupCoordinator.startup()
// add the partition into the owned partition list
- groupPartitionId = consumerCoordinator.partitionFor(groupId)
- consumerCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
+ groupPartitionId = groupCoordinator.partitionFor(groupId)
+ groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
}
@After
def tearDown() {
EasyMock.reset(replicaManager)
- consumerCoordinator.shutdown()
+ groupCoordinator.shutdown()
}
@Test
def testJoinGroupWrongCoordinator() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(otherGroupId, memberId, DefaultSessionTimeout, protocolType,
- protocols)
+ val joinGroupResult = joinGroup(otherGroupId, memberId, DefaultSessionTimeout, protocolType, protocols)
val joinGroupErrorCode = joinGroupResult.errorCode
assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, joinGroupErrorCode)
}
@@ -139,8 +140,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
val groupId = ""
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType,
- protocols)
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
assertEquals(Errors.INVALID_GROUP_ID.code, joinGroupResult.errorCode)
}
@@ -164,8 +164,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
EasyMock.reset(replicaManager)
- val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat",
- protocols)
+ val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat", protocols)
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode)
}
@@ -285,6 +284,27 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
}
@Test
+ def testSyncGroupEmptyAssignment() {
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+ val assignedConsumerId = joinGroupResult.memberId
+ val generationId = joinGroupResult.generationId
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(replicaManager)
+ val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map())
+ val syncGroupErrorCode = syncGroupResult._2
+ assertEquals(Errors.NONE.code, syncGroupErrorCode)
+ assertTrue(syncGroupResult._1.isEmpty)
+
+ EasyMock.reset(replicaManager)
+ val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+ assertEquals(Errors.NONE.code, heartbeatResult)
+ }
+
+ @Test
def testSyncGroupNotCoordinator() {
val generation = 1
@@ -668,6 +688,92 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
assertEquals(Errors.NONE.code, leaveGroupResult)
}
+ @Test
+ def testListGroupsIncludesStableGroups() {
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+ val assignedMemberId = joinGroupResult.memberId
+ val generationId = joinGroupResult.generationId
+ assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+ EasyMock.reset(replicaManager)
+ val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+ val syncGroupErrorCode = syncGroupResult._2
+ assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+ val (error, groups) = groupCoordinator.handleListGroups()
+ assertEquals(Errors.NONE, error)
+ assertEquals(1, groups.size)
+ assertEquals(GroupOverview("groupId", "consumer"), groups(0))
+ }
+
+ @Test
+ def testListGroupsIncludesRebalancingGroups() {
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+ assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+ val (error, groups) = groupCoordinator.handleListGroups()
+ assertEquals(Errors.NONE, error)
+ assertEquals(1, groups.size)
+ assertEquals(GroupOverview("groupId", "consumer"), groups(0))
+ }
+
+ @Test
+ def testDescribeGroupWrongCoordinator() {
+ EasyMock.reset(replicaManager)
+ val (error, _) = groupCoordinator.handleDescribeGroup(otherGroupId)
+ assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, error)
+ }
+
+ @Test
+ def testDescribeGroupInactiveGroup() {
+ EasyMock.reset(replicaManager)
+ val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+ assertEquals(Errors.NONE, error)
+ assertEquals(GroupCoordinator.DeadGroup, summary)
+ }
+
+ @Test
+ def testDescribeGroupStable() {
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+ val assignedMemberId = joinGroupResult.memberId
+ val generationId = joinGroupResult.generationId
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(replicaManager)
+ val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+ val syncGroupErrorCode = syncGroupResult._2
+ assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+ EasyMock.reset(replicaManager)
+ val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+ assertEquals(Errors.NONE, error)
+ assertEquals(protocolType, summary.protocolType)
+ assertEquals("range", summary.protocol)
+ assertEquals(List(assignedMemberId), summary.members.map(_.memberId))
+ }
+
+ @Test
+ def testDescribeGroupRebalancing() {
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(replicaManager)
+ val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+ assertEquals(Errors.NONE, error)
+ assertEquals(protocolType, summary.protocolType)
+ assertEquals(GroupCoordinator.NoProtocol, summary.protocol)
+ assertEquals(AwaitingSync.toString, summary.state)
+ assertTrue(summary.members.map(_.memberId).contains(joinGroupResult.memberId))
+ assertTrue(summary.members.forall(_.metadata.isEmpty))
+ assertTrue(summary.members.forall(_.assignment.isEmpty))
+ }
+
private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = {
val responsePromise = Promise[JoinGroupResult]
val responseFuture = responsePromise.future
@@ -706,7 +812,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.replay(replicaManager)
- consumerCoordinator.handleJoinGroup(groupId, memberId, "clientId", sessionTimeout, protocolType, protocols, responseCallback)
+ groupCoordinator.handleJoinGroup(groupId, memberId, "clientId", "clientHost", sessionTimeout,
+ protocolType, protocols, responseCallback)
responseFuture
}
@@ -731,7 +838,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
)})
EasyMock.replay(replicaManager)
- consumerCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
+ groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
responseFuture
}
@@ -742,7 +849,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.replay(replicaManager)
- consumerCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], responseCallback)
+ groupCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], responseCallback)
responseFuture
}
@@ -779,7 +886,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.replay(replicaManager)
- consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
+ groupCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
}
@@ -807,7 +914,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
)})
EasyMock.replay(replicaManager)
- consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
+ groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
}
@@ -817,7 +924,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.expect(replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
EasyMock.replay(replicaManager)
- consumerCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
+ groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
}
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
index 021aea6..2846622 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
@@ -146,18 +146,19 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testSelectProtocol() {
val groupId = "groupId"
-
+ val clientId = "clientId"
+ val clientHost = "clientHost"
val sessionTimeoutMs = 10000
val memberId = "memberId"
- val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
group.add(memberId, member)
assertEquals("range", group.selectProtocol)
val otherMemberId = "otherMemberId"
- val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs,
+ val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
group.add(otherMemberId, otherMember)
@@ -165,7 +166,7 @@ class GroupMetadataTest extends JUnitSuite {
assertTrue(Set("range", "roundrobin")(group.selectProtocol))
val lastMemberId = "lastMemberId"
- val lastMember = new MemberMetadata(lastMemberId, groupId, sessionTimeoutMs,
+ val lastMember = new MemberMetadata(lastMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
group.add(lastMemberId, lastMember)
@@ -182,15 +183,16 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testSelectProtocolChoosesCompatibleProtocol() {
val groupId = "groupId"
-
+ val clientId = "clientId"
+ val clientHost = "clientHost"
val sessionTimeoutMs = 10000
val memberId = "memberId"
- val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
val otherMemberId = "otherMemberId"
- val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs,
+ val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
group.add(memberId, member)
@@ -201,14 +203,15 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testSupportsProtocols() {
val groupId = "groupId"
-
+ val clientId = "clientId"
+ val clientHost = "clientHost"
val sessionTimeoutMs = 10000
// by default, the group supports everything
assertTrue(group.supportsProtocols(Set("roundrobin", "range")))
val memberId = "memberId"
- val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
group.add(memberId, member)
@@ -217,7 +220,7 @@ class GroupMetadataTest extends JUnitSuite {
assertFalse(group.supportsProtocols(Set("foo", "bar")))
val otherMemberId = "otherMemberId"
- val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs,
+ val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
group.add(otherMemberId, otherMember)