You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/18 05:51:22 UTC
[1/3] kafka git commit: KAFKA-3267;
Describe and Alter Configs Admin APIs (KIP-133)
Repository: kafka
Updated Branches:
refs/heads/trunk e1abf1770 -> 972b75453
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 9eb1275..a362577 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
-import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
import CreateTopicsRequest.TopicDetails
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{Node, TopicPartition, requests}
@@ -40,6 +40,7 @@ import scala.collection.mutable
import scala.collection.mutable.Buffer
import org.apache.kafka.common.KafkaException
import kafka.admin.AdminUtils
+import kafka.log.LogConfig
import kafka.network.SocketServer
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
@@ -65,13 +66,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val deleteTopicResource = new Resource(Topic, deleteTopic)
val producerTransactionalIdResource = new Resource(ProducerTransactionalId, transactionalId)
- val GroupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
- val ClusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
- val ClusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
- val TopicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
- val TopicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
- val TopicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
- val TopicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
+ val groupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
+ val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
+ val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
+ val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
+ val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
+ val topicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
+ val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
+ val topicDescribeConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs)))
+ val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs)))
val producerTransactionalIdWriteAcl = Map(producerTransactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
@@ -92,7 +95,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
}
- val RequestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] =
+ val requestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] =
Map(ApiKeys.METADATA -> classOf[requests.MetadataResponse],
ApiKeys.PRODUCE -> classOf[requests.ProduceResponse],
ApiKeys.FETCH -> classOf[requests.FetchResponse],
@@ -110,15 +113,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.CONTROLLED_SHUTDOWN_KEY -> classOf[requests.ControlledShutdownResponse],
ApiKeys.CREATE_TOPICS -> classOf[CreateTopicsResponse],
ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse],
- ApiKeys.OFFSET_FOR_LEADER_EPOCH -> classOf[OffsetsForLeaderEpochResponse]
+ ApiKeys.OFFSET_FOR_LEADER_EPOCH -> classOf[OffsetsForLeaderEpochResponse],
+ ApiKeys.DESCRIBE_CONFIGS -> classOf[DescribeConfigsResponse],
+ ApiKeys.ALTER_CONFIGS -> classOf[AlterConfigsResponse]
)
- val RequestKeyToError = Map[ApiKeys, (Nothing) => Errors](
- ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
- ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.error),
- ApiKeys.FETCH -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.error),
- ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.error),
- ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
+ val requestKeyToError = Map[ApiKeys, Nothing => Errors](
+ ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
+ ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error),
+ ApiKeys.FETCH -> ((resp: requests.FetchResponse) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
+ ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
+ ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData.asScala.find(_._1 == tp).get._2),
ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error),
ApiKeys.FIND_COORDINATOR -> ((resp: FindCoordinatorResponse) => resp.error),
ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => resp.error),
@@ -126,33 +131,39 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error),
ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error),
ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
- ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
- ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
+ ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
+ ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ((resp: requests.ControlledShutdownResponse) => resp.error),
- ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors().asScala.find(_._1 == createTopic).get._2.error),
- ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors().asScala.find(_._1 == deleteTopic).get._2),
- ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.asScala.get(tp).get.error())
+ ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors.asScala.find(_._1 == createTopic).get._2.error),
+ ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors.asScala.find(_._1 == deleteTopic).get._2),
+ ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.get(tp).error),
+ ApiKeys.DESCRIBE_CONFIGS -> ((resp: DescribeConfigsResponse) =>
+ resp.configs.get(new RResource(RResourceType.TOPIC, tp.topic)).error.error),
+ ApiKeys.ALTER_CONFIGS -> ((resp: AlterConfigsResponse) =>
+ resp.errors.get(new RResource(RResourceType.TOPIC, tp.topic)).error)
)
- val RequestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
- ApiKeys.METADATA -> TopicDescribeAcl,
- ApiKeys.PRODUCE -> TopicWriteAcl,
- ApiKeys.FETCH -> TopicReadAcl,
- ApiKeys.LIST_OFFSETS -> TopicDescribeAcl,
- ApiKeys.OFFSET_COMMIT -> (TopicReadAcl ++ GroupReadAcl),
- ApiKeys.OFFSET_FETCH -> (TopicReadAcl ++ GroupReadAcl),
- ApiKeys.FIND_COORDINATOR -> (TopicReadAcl ++ GroupReadAcl),
- ApiKeys.UPDATE_METADATA_KEY -> ClusterAcl,
- ApiKeys.JOIN_GROUP -> GroupReadAcl,
- ApiKeys.SYNC_GROUP -> GroupReadAcl,
- ApiKeys.HEARTBEAT -> GroupReadAcl,
- ApiKeys.LEAVE_GROUP -> GroupReadAcl,
- ApiKeys.LEADER_AND_ISR -> ClusterAcl,
- ApiKeys.STOP_REPLICA -> ClusterAcl,
- ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ClusterAcl,
- ApiKeys.CREATE_TOPICS -> ClusterCreateAcl,
- ApiKeys.DELETE_TOPICS -> TopicDeleteAcl,
- ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ClusterAcl
+ val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
+ ApiKeys.METADATA -> topicDescribeAcl,
+ ApiKeys.PRODUCE -> topicWriteAcl,
+ ApiKeys.FETCH -> topicReadAcl,
+ ApiKeys.LIST_OFFSETS -> topicDescribeAcl,
+ ApiKeys.OFFSET_COMMIT -> (topicReadAcl ++ groupReadAcl),
+ ApiKeys.OFFSET_FETCH -> (topicReadAcl ++ groupReadAcl),
+ ApiKeys.FIND_COORDINATOR -> (topicReadAcl ++ groupReadAcl),
+ ApiKeys.UPDATE_METADATA_KEY -> clusterAcl,
+ ApiKeys.JOIN_GROUP -> groupReadAcl,
+ ApiKeys.SYNC_GROUP -> groupReadAcl,
+ ApiKeys.HEARTBEAT -> groupReadAcl,
+ ApiKeys.LEAVE_GROUP -> groupReadAcl,
+ ApiKeys.LEADER_AND_ISR -> clusterAcl,
+ ApiKeys.STOP_REPLICA -> clusterAcl,
+ ApiKeys.CONTROLLED_SHUTDOWN_KEY -> clusterAcl,
+ ApiKeys.CREATE_TOPICS -> clusterCreateAcl,
+ ApiKeys.DELETE_TOPICS -> topicDeleteAcl,
+ ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl,
+ ApiKeys.DESCRIBE_CONFIGS -> topicDescribeConfigsAcl,
+ ApiKeys.ALTER_CONFIGS -> topicAlterConfigsAcl
)
@Before
@@ -221,8 +232,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def offsetsForLeaderEpochRequest = {
- new OffsetsForLeaderEpochRequest.Builder().add(tp, 7).build()
-}
+ new OffsetsForLeaderEpochRequest.Builder().add(tp, 7).build()
+ }
private def createOffsetFetchRequest = {
new requests.OffsetFetchRequest.Builder(group, List(tp).asJava).build()
@@ -289,6 +300,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build()
}
+ private def createDescribeConfigsRequest =
+ new DescribeConfigsRequest.Builder(Collections.singleton(new RResource(RResourceType.TOPIC, tp.topic))).build()
+
+ private def createAlterConfigsRequest =
+ new AlterConfigsRequest.Builder(
+ Collections.singletonMap(new RResource(RResourceType.TOPIC, tp.topic),
+ new AlterConfigsRequest.Config(Collections.singleton(
+ new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
+ ))), true).build()
+
+
@Test
def testAuthorizationWithTopicExisting() {
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
@@ -309,17 +331,19 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.CONTROLLED_SHUTDOWN_KEY -> createControlledShutdownRequest,
ApiKeys.CREATE_TOPICS -> createTopicsRequest,
ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
- ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest
+ ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest,
+ ApiKeys.DESCRIBE_CONFIGS -> createDescribeConfigsRequest,
+ ApiKeys.ALTER_CONFIGS -> createAlterConfigsRequest
)
for ((key, request) <- requestKeyToRequest) {
removeAllAcls
- val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
+ val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false)
- val resourceToAcls = RequestKeysToAcls(key)
+ val resourceToAcls = requestKeysToAcls(key)
resourceToAcls.get(topicResource).map { acls =>
- val describeAcls = TopicDescribeAcl(topicResource)
+ val describeAcls = topicDescribeAcl(topicResource)
val isAuthorized = describeAcls == acls
addAndVerifyAcls(describeAcls, topicResource)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true)
@@ -353,12 +377,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
for ((key, request) <- requestKeyToRequest) {
removeAllAcls
- val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
+ val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false, topicExists = false)
- val resourceToAcls = RequestKeysToAcls(key)
+ val resourceToAcls = requestKeysToAcls(key)
resourceToAcls.get(topicResource).map { acls =>
- val describeAcls = TopicDescribeAcl(topicResource)
+ val describeAcls = topicDescribeAcl(topicResource)
val isAuthorized = describeAcls == acls
addAndVerifyAcls(describeAcls, topicResource)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false)
@@ -427,7 +451,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
sendRecords(numRecords, topicPartition)
}
- @Test(expected = classOf[AuthorizationException])
+ @Test(expected = classOf[GroupAuthorizationException])
def testConsumeWithNoAccess(): Unit = {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
@@ -666,8 +690,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val topicPartition = new TopicPartition(newTopic, 0)
val newTopicResource = new Resource(Topic, newTopic)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource)
- addAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
- addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource)
+ addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
+ addAndVerifyAcls(clusterAcl(Resource.ClusterResource), Resource.ClusterResource)
try {
this.consumers.head.assign(List(topicPartition).asJava)
consumeRecords(this.consumers.head)
@@ -933,9 +957,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
isAuthorizedTopicDescribe: Boolean,
topicExists: Boolean = true): AbstractResponse = {
val resp = connectAndSend(request, apiKey)
- val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer], classOf[Short]).invoke(
+ val response = requestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer], classOf[Short]).invoke(
null, resp, request.version: java.lang.Short).asInstanceOf[AbstractResponse]
- val error = RequestKeyToError(apiKey).asInstanceOf[(AbstractResponse) => Errors](response)
+ val error = requestKeyToError(apiKey).asInstanceOf[(AbstractResponse) => Errors](response)
val authorizationErrors = resources.flatMap { resourceType =>
if (resourceType == Topic) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
index f381b15..81f5c27 100644
--- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -22,12 +22,13 @@ import java.util.concurrent.ExecutionException
import org.apache.kafka.common.utils.Utils
import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
+import kafka.log.LogConfig
+import kafka.server.{Defaults, KafkaConfig}
import org.apache.kafka.clients.admin._
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.KafkaFuture
-import org.apache.kafka.common.errors.{SecurityDisabledException, TopicExistsException}
+import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TopicExistsException}
import org.apache.kafka.common.protocol.ApiKeys
import org.junit.{After, Before, Rule, Test}
import org.apache.kafka.common.requests.MetadataResponse
@@ -104,13 +105,12 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
def testListNodes(): Unit = {
client = AdminClient.create(createConfig())
val brokerStrs = brokerList.split(",").toList.sorted
- var nodeStrs : List[String] = null
+ var nodeStrs: List[String] = null
do {
- var nodes = client.describeCluster().nodes().get().asScala
+ val nodes = client.describeCluster().nodes().get().asScala
nodeStrs = nodes.map ( node => s"${node.host}:${node.port}" ).toList.sorted
} while (nodeStrs.size < brokerStrs.size)
assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(","))
- client.close()
}
@Test
@@ -153,7 +153,213 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
}
- client.close()
+ }
+
+ @Test
+ def testDescribeAndAlterConfigs(): Unit = {
+ client = AdminClient.create(createConfig)
+
+ // Create topics
+ val topic1 = "describe-alter-configs-topic-1"
+ val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+ val topicConfig1 = new Properties
+ topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000")
+ topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000")
+ TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, topicConfig1)
+
+ val topic2 = "describe-alter-configs-topic-2"
+ val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+ TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
+
+ // Describe topics and broker
+ val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, servers(1).config.brokerId.toString)
+ val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, servers(2).config.brokerId.toString)
+ val configResources = Seq(topicResource1, topicResource2, brokerResource1, brokerResource2)
+ var describeResult = client.describeConfigs(configResources.asJava)
+ var configs = describeResult.all.get
+
+ assertEquals(4, configs.size)
+
+ val maxMessageBytes1 = configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp)
+ assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes1.name)
+ assertEquals(topicConfig1.get(LogConfig.MaxMessageBytesProp), maxMessageBytes1.value)
+ assertFalse(maxMessageBytes1.isDefault)
+ assertFalse(maxMessageBytes1.isSensitive)
+ assertFalse(maxMessageBytes1.isReadOnly)
+
+ assertEquals(topicConfig1.get(LogConfig.RetentionMsProp),
+ configs.get(topicResource1).get(LogConfig.RetentionMsProp).value)
+
+ val maxMessageBytes2 = configs.get(topicResource2).get(LogConfig.MaxMessageBytesProp)
+ assertEquals(Defaults.MessageMaxBytes.toString, maxMessageBytes2.value)
+ assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes2.name)
+ assertTrue(maxMessageBytes2.isDefault)
+ assertFalse(maxMessageBytes2.isSensitive)
+ assertFalse(maxMessageBytes2.isReadOnly)
+
+ assertEquals(servers(1).config.values.size, configs.get(brokerResource1).entries.size)
+ assertEquals(servers(1).config.brokerId.toString, configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value)
+ val listenerSecurityProtocolMap = configs.get(brokerResource1).get(KafkaConfig.ListenerSecurityProtocolMapProp)
+ assertEquals(servers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp), listenerSecurityProtocolMap.value)
+ assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityProtocolMap.name)
+ assertFalse(listenerSecurityProtocolMap.isDefault)
+ assertFalse(listenerSecurityProtocolMap.isSensitive)
+ assertTrue(listenerSecurityProtocolMap.isReadOnly)
+ val truststorePassword = configs.get(brokerResource1).get(KafkaConfig.SslTruststorePasswordProp)
+ assertEquals(KafkaConfig.SslTruststorePasswordProp, truststorePassword.name)
+ assertNull(truststorePassword.value)
+ assertFalse(truststorePassword.isDefault)
+ assertTrue(truststorePassword.isSensitive)
+ assertTrue(truststorePassword.isReadOnly)
+ val compressionType = configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp)
+ assertEquals(servers(1).config.compressionType.toString, compressionType.value)
+ assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name)
+ assertTrue(compressionType.isDefault)
+ assertFalse(compressionType.isSensitive)
+ assertTrue(compressionType.isReadOnly)
+
+ assertEquals(servers(2).config.values.size, configs.get(brokerResource2).entries.size)
+ assertEquals(servers(2).config.brokerId.toString, configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
+ assertEquals(servers(2).config.logCleanerThreads.toString,
+ configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)
+
+ // Alter topics
+ var topicConfigEntries1 = Seq(
+ new ConfigEntry(LogConfig.FlushMsProp, "1000")
+ ).asJava
+
+ var topicConfigEntries2 = Seq(
+ new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
+ new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
+ ).asJava
+
+ var alterResult = client.alterConfigs(Map(
+ topicResource1 -> new Config(topicConfigEntries1),
+ topicResource2 -> new Config(topicConfigEntries2)
+ ).asJava)
+
+ assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
+ alterResult.all.get
+
+ // Verify that topics were updated correctly
+ describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
+ configs = describeResult.all.get
+
+ assertEquals(2, configs.size)
+
+ assertEquals("1000", configs.get(topicResource1).get(LogConfig.FlushMsProp).value)
+ assertEquals(Defaults.MessageMaxBytes.toString,
+ configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
+ assertEquals((Defaults.LogRetentionHours * 60 * 60 * 1000).toString,
+ configs.get(topicResource1).get(LogConfig.RetentionMsProp).value)
+
+ assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
+ assertEquals("lz4", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
+
+ // Alter topics with validateOnly=true
+ topicConfigEntries1 = Seq(
+ new ConfigEntry(LogConfig.MaxMessageBytesProp, "10")
+ ).asJava
+
+ topicConfigEntries2 = Seq(
+ new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.3")
+ ).asJava
+
+ alterResult = client.alterConfigs(Map(
+ topicResource1 -> new Config(topicConfigEntries1),
+ topicResource2 -> new Config(topicConfigEntries2)
+ ).asJava, new AlterConfigsOptions().validateOnly(true))
+
+ assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
+ alterResult.all.get
+
+ // Verify that topics were not updated due to validateOnly = true
+ describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava)
+ configs = describeResult.all.get
+
+ assertEquals(2, configs.size)
+
+ assertEquals(Defaults.MessageMaxBytes.toString,
+ configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value)
+ assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
+ }
+
+ @Test
+ def testInvalidAlterConfigs(): Unit = {
+ client = AdminClient.create(createConfig)
+
+ // Create topics
+ val topic1 = "invalid-alter-configs-topic-1"
+ val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+ TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, new Properties())
+
+ val topic2 = "invalid-alter-configs-topic-2"
+ val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+ TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties)
+
+ val topicConfigEntries1 = Seq(
+ new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this value is invalid as it's above 1.0
+ new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
+ ).asJava
+
+ var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava
+
+ val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString)
+ val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.CompressionTypeProp, "gzip")).asJava
+
+ // Alter configs: first and third are invalid, second is valid
+ var alterResult = client.alterConfigs(Map(
+ topicResource1 -> new Config(topicConfigEntries1),
+ topicResource2 -> new Config(topicConfigEntries2),
+ brokerResource -> new Config(brokerConfigEntries)
+ ).asJava)
+
+ assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
+ assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+ alterResult.results.get(topicResource2).get
+ assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+
+ // Verify that first and third resources were not updated and second was updated
+ var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
+ var configs = describeResult.all.get
+ assertEquals(3, configs.size)
+
+ assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
+ configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
+ assertEquals(Defaults.CompressionType.toString,
+ configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
+
+ assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
+
+ assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value)
+
+ // Alter configs with validateOnly = true: first and third are invalid, second is valid
+ topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip")).asJava
+
+ alterResult = client.alterConfigs(Map(
+ topicResource1 -> new Config(topicConfigEntries1),
+ topicResource2 -> new Config(topicConfigEntries2),
+ brokerResource -> new Config(brokerConfigEntries)
+ ).asJava, new AlterConfigsOptions().validateOnly(true))
+
+ assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
+ assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+ alterResult.results.get(topicResource2).get
+ assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+
+ // Verify that no resources are updated since validate_only = true
+ describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
+ configs = describeResult.all.get
+ assertEquals(3, configs.size)
+
+ assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
+ configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
+ assertEquals(Defaults.CompressionType.toString,
+ configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
+
+ assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
+
+ assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value)
}
val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
@@ -183,7 +389,11 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value)
config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
- config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true");
+ config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
+ // We set this in order to test that we don't expose sensitive data via describe configs. This will already be
+ // set for subclasses with security enabled and we don't want to overwrite it.
+ if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp))
+ config.setProperty(KafkaConfig.SslTruststorePasswordProp, "some.invalid.pass")
}
cfgs.foreach(_.putAll(serverConfig))
cfgs.map(KafkaConfig.fromProps)
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 6d89d4f..f5b0a06 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -35,17 +35,22 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
private val TopicResources = Set(new Resource(Topic, "test-1"), new Resource(Topic, "test-2"))
private val GroupResources = Set(new Resource(Group, "testGroup-1"), new Resource(Group, "testGroup-2"))
+ private val BrokerResources = Set(new Resource(Broker, "0"), new Resource(Broker, "1"))
private val ResourceToCommand = Map[Set[Resource], Array[String]](
TopicResources -> Array("--topic", "test-1", "--topic", "test-2"),
Set(Resource.ClusterResource) -> Array("--cluster"),
- GroupResources -> Array("--group", "testGroup-1", "--group", "testGroup-2")
+ GroupResources -> Array("--group", "testGroup-1", "--group", "testGroup-2"),
+ BrokerResources -> Array("--broker", "0", "--broker", "1")
)
private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])](
- TopicResources -> (Set(Read, Write, Describe, Delete), Array("--operation", "Read" , "--operation", "Write", "--operation", "Describe", "--operation", "Delete")),
+ TopicResources -> (Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs),
+ Array("--operation", "Read" , "--operation", "Write", "--operation", "Describe", "--operation", "Delete",
+ "--operation", "DescribeConfigs", "--operation", "AlterConfigs")),
Set(Resource.ClusterResource) -> (Set(Create, ClusterAction), Array("--operation", "Create", "--operation", "ClusterAction")),
- GroupResources -> (Set(Read, Describe), Array("--operation", "Read", "--operation", "Describe"))
+ GroupResources -> (Set(Read, Describe), Array("--operation", "Read", "--operation", "Describe")),
+ BrokerResources -> (Set(DescribeConfigs), Array("--operation", "DescribeConfigs"))
)
private val ProducerResourceToAcls = Map[Set[Resource], Set[Acl]](
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index b82ddf9..8e6b11d 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -23,7 +23,7 @@ import kafka.network.SocketServer
import kafka.utils.TestUtils
import org.apache.kafka.common.protocol.types.Struct
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse}
+import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse}
import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
import scala.collection.JavaConverters._
@@ -79,8 +79,8 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
}
}
- protected def error(error: Errors, errorMessage: Option[String] = None): CreateTopicsResponse.Error =
- new CreateTopicsResponse.Error(error, errorMessage.orNull)
+ protected def error(error: Errors, errorMessage: Option[String] = None): ApiError =
+ new ApiError(error, errorMessage.orNull)
protected def toStructWithDuplicateFirstTopic(request: CreateTopicsRequest): Struct = {
val struct = request.toStruct
@@ -101,7 +101,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
}
protected def validateErrorCreateTopicsRequests(request: CreateTopicsRequest,
- expectedResponse: Map[String, CreateTopicsResponse.Error],
+ expectedResponse: Map[String, ApiError],
checkErrorMessage: Boolean = true,
requestStruct: Option[Struct] = None): Unit = {
val response = requestStruct.map(sendCreateTopicRequestStruct(_, request.version)).getOrElse(
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 5cb6f71..7e50049 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -20,6 +20,7 @@ import java.util.{Collections, LinkedHashMap, Properties}
import java.util.concurrent.{Executors, Future, TimeUnit}
import kafka.admin.AdminUtils
+import kafka.log.LogConfig
import kafka.network.RequestChannel.Session
import kafka.security.auth._
import kafka.utils.TestUtils
@@ -30,9 +31,8 @@ import org.apache.kafka.common.network.{Authenticator, ListenerName, TransportLa
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
import org.apache.kafka.common.protocol.types.Struct
import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests
import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
-import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
import org.apache.kafka.common.security.auth.{DefaultPrincipalBuilder, KafkaPrincipal}
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -277,7 +277,17 @@ class RequestQuotaTest extends BaseRequestTest {
new ResourceFilter(AdminResourceType.TOPIC, null),
new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY))))
- case key =>
+ case ApiKeys.DESCRIBE_CONFIGS =>
+ new DescribeConfigsRequest.Builder(Collections.singleton(new RResource(RResourceType.TOPIC, tp.topic)))
+
+ case ApiKeys.ALTER_CONFIGS =>
+ new AlterConfigsRequest.Builder(
+ Collections.singletonMap(new RResource(RResourceType.TOPIC, tp.topic),
+ new AlterConfigsRequest.Config(Collections.singleton(
+ new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
+ ))), true)
+
+ case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}
}
@@ -366,6 +376,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.DESCRIBE_ACLS => new DescribeAclsResponse(response).throttleTimeMs
case ApiKeys.CREATE_ACLS => new CreateAclsResponse(response).throttleTimeMs
case ApiKeys.DELETE_ACLS => new DeleteAclsResponse(response).throttleTimeMs
+ case ApiKeys.DESCRIBE_CONFIGS => new DescribeConfigsResponse(response).throttleTimeMs
+ case ApiKeys.ALTER_CONFIGS => new AlterConfigsResponse(response).throttleTimeMs
case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 96a56b4..47ca8ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
@@ -182,7 +183,7 @@ public class StreamsKafkaClient {
final CreateTopicsResponse createTopicsResponse = (CreateTopicsResponse) clientResponse.responseBody();
for (InternalTopicConfig internalTopicConfig : topicsMap.keySet()) {
- CreateTopicsResponse.Error error = createTopicsResponse.errors().get(internalTopicConfig.name());
+ ApiError error = createTopicsResponse.errors().get(internalTopicConfig.name());
if (!error.is(Errors.NONE) && !error.is(Errors.TOPIC_ALREADY_EXISTS)) {
throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + " due to " + error.messageWithFallback());
}
[2/3] kafka git commit: KAFKA-3267;
Describe and Alter Configs Admin APIs (KIP-133)
Posted by ij...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
new file mode 100644
index 0000000..26034eb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+/**
+ * Encapsulates an error code (via the Errors enum) and an optional message. Generally, the optional message is only
+ * defined if it adds information over the default message associated with the error code.
+ *
+ * This is an internal class (like every class in the requests package).
+ */
+public class ApiError {
+
+ private static final String CODE_KEY_NAME = "error_code";
+ private static final String MESSAGE_KEY_NAME = "error_message";
+
+ private final Errors error;
+ private final String message;
+
+ public static ApiError fromThrowable(Throwable t) {
+ // Avoid populating the error message if it's a generic one
+ Errors error = Errors.forException(t);
+ String message = error.message().equals(t.getMessage()) ? null : t.getMessage();
+ return new ApiError(error, message);
+ }
+
+ public ApiError(Struct struct) {
+ error = Errors.forCode(struct.getShort(CODE_KEY_NAME));
+ // In some cases, the error message field was introduced in newer version
+ if (struct.hasField(MESSAGE_KEY_NAME))
+ message = struct.getString(MESSAGE_KEY_NAME);
+ else
+ message = null;
+ }
+
+ public ApiError(Errors error, String message) {
+ this.error = error;
+ this.message = message;
+ }
+
+ public void write(Struct struct) {
+ struct.set(CODE_KEY_NAME, error.code());
+ // In some cases, the error message field was introduced in a newer protocol API version
+ if (struct.hasField(MESSAGE_KEY_NAME) && message != null && error != Errors.NONE)
+ struct.set(MESSAGE_KEY_NAME, message);
+ }
+
+ public boolean is(Errors error) {
+ return this.error == error;
+ }
+
+ public Errors error() {
+ return error;
+ }
+
+ /**
+ * Return the optional error message or null. Consider using {@link #messageWithFallback()} instead.
+ */
+ public String message() {
+ return message;
+ }
+
+ /**
+ * If `message` is defined, return it. Otherwise fallback to the default error message associated with the error
+ * code.
+ */
+ public String messageWithFallback() {
+ if (message == null)
+ return error.message();
+ return message;
+ }
+
+ public ApiException exception() {
+ return error.exception(message);
+ }
+
+ @Override
+ public String toString() {
+ return "ApiError(error=" + error + ", message=" + message + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index a0626cc..def4c85 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
@@ -42,9 +41,9 @@ public class CreateTopicsRequest extends AbstractRequest {
private static final String REPLICA_ASSIGNMENT_PARTITION_ID_KEY_NAME = "partition_id";
private static final String REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME = "replicas";
- private static final String CONFIG_KEY_KEY_NAME = "config_key";
+ private static final String CONFIG_KEY_KEY_NAME = "config_name";
private static final String CONFIG_VALUE_KEY_NAME = "config_value";
- private static final String CONFIGS_KEY_NAME = "configs";
+ private static final String CONFIGS_KEY_NAME = "config_entries";
public static final class TopicDetails {
public final int numPartitions;
@@ -210,12 +209,9 @@ public class CreateTopicsRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- Map<String, CreateTopicsResponse.Error> topicErrors = new HashMap<>();
+ Map<String, ApiError> topicErrors = new HashMap<>();
for (String topic : topics.keySet()) {
- Errors error = Errors.forException(e);
- // Avoid populating the error message if it's a generic one
- String message = error.message().equals(e.getMessage()) ? null : e.getMessage();
- topicErrors.put(topic, new CreateTopicsResponse.Error(error, message));
+ topicErrors.put(topic, ApiError.fromThrowable(e));
}
short versionId = version();
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index 2c2b2dd..e46e7a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -17,9 +17,7 @@
package org.apache.kafka.common.requests;
-import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
@@ -32,45 +30,6 @@ public class CreateTopicsResponse extends AbstractResponse {
private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors";
private static final String TOPIC_KEY_NAME = "topic";
- private static final String ERROR_CODE_KEY_NAME = "error_code";
- private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
-
- public static class Error {
- private final Errors error;
- private final String message; // introduced in V1
-
- public Error(Errors error, String message) {
- this.error = error;
- this.message = message;
- }
-
- public boolean is(Errors error) {
- return this.error == error;
- }
-
- public Errors error() {
- return error;
- }
-
- public String message() {
- return message;
- }
-
- public String messageWithFallback() {
- if (message == null)
- return error.message();
- return message;
- }
-
- public ApiException exception() {
- return error.exception(message);
- }
-
- @Override
- public String toString() {
- return "Error(error=" + error + ", message=" + message + ")";
- }
- }
/**
* Possible error codes:
@@ -87,29 +46,25 @@ public class CreateTopicsResponse extends AbstractResponse {
* INVALID_REQUEST(42)
*/
- private final Map<String, Error> errors;
+ private final Map<String, ApiError> errors;
private final int throttleTimeMs;
- public CreateTopicsResponse(Map<String, Error> errors) {
+ public CreateTopicsResponse(Map<String, ApiError> errors) {
this(DEFAULT_THROTTLE_TIME, errors);
}
- public CreateTopicsResponse(int throttleTimeMs, Map<String, Error> errors) {
+ public CreateTopicsResponse(int throttleTimeMs, Map<String, ApiError> errors) {
this.throttleTimeMs = throttleTimeMs;
this.errors = errors;
}
public CreateTopicsResponse(Struct struct) {
Object[] topicErrorStructs = struct.getArray(TOPIC_ERRORS_KEY_NAME);
- Map<String, Error> errors = new HashMap<>();
+ Map<String, ApiError> errors = new HashMap<>();
for (Object topicErrorStructObj : topicErrorStructs) {
- Struct topicErrorCodeStruct = (Struct) topicErrorStructObj;
- String topic = topicErrorCodeStruct.getString(TOPIC_KEY_NAME);
- Errors error = Errors.forCode(topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME));
- String errorMessage = null;
- if (topicErrorCodeStruct.hasField(ERROR_MESSAGE_KEY_NAME))
- errorMessage = topicErrorCodeStruct.getString(ERROR_MESSAGE_KEY_NAME);
- errors.put(topic, new Error(error, errorMessage));
+ Struct topicErrorStruct = (Struct) topicErrorStructObj;
+ String topic = topicErrorStruct.getString(TOPIC_KEY_NAME);
+ errors.put(topic, new ApiError(topicErrorStruct));
}
this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
@@ -123,13 +78,10 @@ public class CreateTopicsResponse extends AbstractResponse {
struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
List<Struct> topicErrorsStructs = new ArrayList<>(errors.size());
- for (Map.Entry<String, Error> topicError : errors.entrySet()) {
+ for (Map.Entry<String, ApiError> topicError : errors.entrySet()) {
Struct topicErrorsStruct = struct.instance(TOPIC_ERRORS_KEY_NAME);
topicErrorsStruct.set(TOPIC_KEY_NAME, topicError.getKey());
- Error error = topicError.getValue();
- topicErrorsStruct.set(ERROR_CODE_KEY_NAME, error.error.code());
- if (version >= 1)
- topicErrorsStruct.set(ERROR_MESSAGE_KEY_NAME, error.message());
+ topicError.getValue().write(topicErrorsStruct);
topicErrorsStructs.add(topicErrorsStruct);
}
struct.set(TOPIC_ERRORS_KEY_NAME, topicErrorsStructs.toArray());
@@ -140,7 +92,7 @@ public class CreateTopicsResponse extends AbstractResponse {
return throttleTimeMs;
}
- public Map<String, Error> errors() {
+ public Map<String, ApiError> errors() {
return errors;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
new file mode 100644
index 0000000..64fae0e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DescribeConfigsRequest extends AbstractRequest {
+
+ private static final String RESOURCES_KEY_NAME = "resources";
+ private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
+ private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+ private static final String CONFIG_NAMES_KEY_NAME = "config_names";
+
+ public static class Builder extends AbstractRequest.Builder {
+ private final Map<Resource, Collection<String>> resourceToConfigNames;
+
+ public Builder(Map<Resource, Collection<String>> resourceToConfigNames) {
+ super(ApiKeys.DESCRIBE_CONFIGS);
+ this.resourceToConfigNames = resourceToConfigNames;
+ }
+
+ public Builder(Collection<Resource> resources) {
+ this(toResourceToConfigNames(resources));
+ }
+
+ private static Map<Resource, Collection<String>> toResourceToConfigNames(Collection<Resource> resources) {
+ Map<Resource, Collection<String>> result = new HashMap<>(resources.size());
+ for (Resource resource : resources)
+ result.put(resource, null);
+ return result;
+ }
+
+ @Override
+ public DescribeConfigsRequest build(short version) {
+ return new DescribeConfigsRequest(version, resourceToConfigNames);
+ }
+ }
+
+ private final Map<Resource, Collection<String>> resourceToConfigNames;
+
+ public DescribeConfigsRequest(short version, Map<Resource, Collection<String>> resourceToConfigNames) {
+ super(version);
+ this.resourceToConfigNames = resourceToConfigNames;
+
+ }
+
+ public DescribeConfigsRequest(Struct struct, short version) {
+ super(version);
+ Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
+ resourceToConfigNames = new HashMap<>(resourcesArray.length);
+ for (Object resourceObj : resourcesArray) {
+ Struct resourceStruct = (Struct) resourceObj;
+ ResourceType resourceType = ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+ String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
+
+ Object[] configNamesArray = resourceStruct.getArray(CONFIG_NAMES_KEY_NAME);
+ List<String> configNames = null;
+ if (configNamesArray != null) {
+ configNames = new ArrayList<>(configNamesArray.length);
+ for (Object configNameObj : configNamesArray)
+ configNames.add((String) configNameObj);
+ }
+
+ resourceToConfigNames.put(new Resource(resourceType, resourceName), configNames);
+ }
+ }
+
+ public Collection<Resource> resources() {
+ return resourceToConfigNames.keySet();
+ }
+
+ /**
+ * Return null if all config names should be returned.
+ */
+ public Collection<String> configNames(Resource resource) {
+ return resourceToConfigNames.get(resource);
+ }
+
+ @Override
+ protected Struct toStruct() {
+ Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.requestSchema(version()));
+ List<Struct> resourceStructs = new ArrayList<>(resources().size());
+ for (Map.Entry<Resource, Collection<String>> entry : resourceToConfigNames.entrySet()) {
+ Resource resource = entry.getKey();
+ Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
+ resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
+ resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
+
+ String[] configNames = entry.getValue() == null ? null : entry.getValue().toArray(new String[0]);
+ resourceStruct.set(CONFIG_NAMES_KEY_NAME, configNames);
+
+ resourceStructs.add(resourceStruct);
+ }
+ struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
+ return struct;
+ }
+
+ @Override
+ public DescribeConfigsResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ short version = version();
+ switch (version) {
+ case 0:
+ ApiError error = ApiError.fromThrowable(e);
+ Map<Resource, DescribeConfigsResponse.Config> errors = new HashMap<>(resources().size());
+ DescribeConfigsResponse.Config config = new DescribeConfigsResponse.Config(error,
+ Collections.<DescribeConfigsResponse.ConfigEntry>emptyList());
+ for (Resource resource : resources())
+ errors.put(resource, config);
+ return new DescribeConfigsResponse(throttleTimeMs, errors);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ version, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_CONFIGS.latestVersion()));
+ }
+ }
+
+ public static DescribeConfigsRequest parse(ByteBuffer buffer, short version) {
+ return new DescribeConfigsRequest(ApiKeys.DESCRIBE_CONFIGS.parseRequest(version, buffer), version);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
new file mode 100644
index 0000000..05bf88d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DescribeConfigsResponse extends AbstractResponse {
+
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
+
+ private static final String RESOURCES_KEY_NAME = "resources";
+
+ private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
+ private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+
+ private static final String CONFIG_ENTRIES_KEY_NAME = "config_entries";
+
+ private static final String CONFIG_NAME = "config_name";
+ private static final String CONFIG_VALUE = "config_value";
+ private static final String IS_SENSITIVE = "is_sensitive";
+ private static final String IS_DEFAULT = "is_default";
+ private static final String READ_ONLY = "read_only";
+
+ public static class Config {
+ private final ApiError error;
+ private final Collection<ConfigEntry> entries;
+
+ public Config(ApiError error, Collection<ConfigEntry> entries) {
+ this.error = error;
+ this.entries = entries;
+ }
+
+ public ApiError error() {
+ return error;
+ }
+
+ public Collection<ConfigEntry> entries() {
+ return entries;
+ }
+ }
+
+ public static class ConfigEntry {
+ private final String name;
+ private final String value;
+ private final boolean isSensitive;
+ private final boolean isDefault;
+ private final boolean readOnly;
+
+ public ConfigEntry(String name, String value, boolean isSensitive, boolean isDefault, boolean readOnly) {
+ this.name = name;
+ this.value = value;
+ this.isSensitive = isSensitive;
+ this.isDefault = isDefault;
+ this.readOnly = readOnly;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public String value() {
+ return value;
+ }
+
+ public boolean isSensitive() {
+ return isSensitive;
+ }
+
+ public boolean isDefault() {
+ return isDefault;
+ }
+
+ public boolean isReadOnly() {
+ return readOnly;
+ }
+ }
+
+ private final int throttleTimeMs;
+ private final Map<Resource, Config> configs;
+
+ public DescribeConfigsResponse(int throttleTimeMs, Map<Resource, Config> configs) {
+ this.throttleTimeMs = throttleTimeMs;
+ this.configs = configs;
+ }
+
+ public DescribeConfigsResponse(Struct struct) {
+ throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+ Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
+ configs = new HashMap<>(resourcesArray.length);
+ for (Object resourceObj : resourcesArray) {
+ Struct resourceStruct = (Struct) resourceObj;
+
+ ApiError error = new ApiError(resourceStruct);
+ ResourceType resourceType = ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+ String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
+ Resource resource = new Resource(resourceType, resourceName);
+
+ Object[] configEntriesArray = resourceStruct.getArray(CONFIG_ENTRIES_KEY_NAME);
+ List<ConfigEntry> configEntries = new ArrayList<>(configEntriesArray.length);
+ for (Object configEntriesObj: configEntriesArray) {
+ Struct configEntriesStruct = (Struct) configEntriesObj;
+ String configName = configEntriesStruct.getString(CONFIG_NAME);
+ String configValue = configEntriesStruct.getString(CONFIG_VALUE);
+ boolean isSensitive = configEntriesStruct.getBoolean(IS_SENSITIVE);
+ boolean isDefault = configEntriesStruct.getBoolean(IS_DEFAULT);
+ boolean readOnly = configEntriesStruct.getBoolean(READ_ONLY);
+ configEntries.add(new ConfigEntry(configName, configValue, isSensitive, isDefault, readOnly));
+ }
+ Config config = new Config(error, configEntries);
+ configs.put(resource, config);
+ }
+ }
+
+ public Map<Resource, Config> configs() {
+ return configs;
+ }
+
+ public Config config(Resource resource) {
+ return configs.get(resource);
+ }
+
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.responseSchema(version));
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+ List<Struct> resourceStructs = new ArrayList<>(configs.size());
+ for (Map.Entry<Resource, Config> entry : configs.entrySet()) {
+ Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
+
+ Resource resource = entry.getKey();
+ resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
+ resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
+
+ Config config = entry.getValue();
+ config.error.write(resourceStruct);
+
+ List<Struct> configEntryStructs = new ArrayList<>(config.entries.size());
+ for (ConfigEntry configEntry : config.entries) {
+ Struct configEntriesStruct = resourceStruct.instance(CONFIG_ENTRIES_KEY_NAME);
+ configEntriesStruct.set(CONFIG_NAME, configEntry.name);
+ configEntriesStruct.set(CONFIG_VALUE, configEntry.value);
+ configEntriesStruct.set(IS_SENSITIVE, configEntry.isSensitive);
+ configEntriesStruct.set(IS_DEFAULT, configEntry.isDefault);
+ configEntriesStruct.set(READ_ONLY, configEntry.readOnly);
+ configEntryStructs.add(configEntriesStruct);
+ }
+ resourceStruct.set(CONFIG_ENTRIES_KEY_NAME, configEntryStructs.toArray(new Struct[0]));
+
+ resourceStructs.add(resourceStruct);
+ }
+ struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
+ return struct;
+ }
+
+ public static DescribeConfigsResponse parse(ByteBuffer buffer, short version) {
+ return new DescribeConfigsResponse(ApiKeys.DESCRIBE_CONFIGS.parseResponse(version, buffer));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/Resource.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/Resource.java b/clients/src/main/java/org/apache/kafka/common/requests/Resource.java
new file mode 100644
index 0000000..6a360a5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/Resource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+public final class Resource {
+ private final ResourceType type;
+ private final String name;
+
+ public Resource(ResourceType type, String name) {
+ this.type = type;
+ this.name = name;
+ }
+
+ public ResourceType type() {
+ return type;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ Resource resource = (Resource) o;
+
+ return type == resource.type && name.equals(resource.name);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = type.hashCode();
+ result = 31 * result + name.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "Resource(type=" + type + ", name='" + name + "'}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java b/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java
new file mode 100644
index 0000000..2c11772
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+public enum ResourceType {
+ UNKNOWN((byte) 0), ANY((byte) 1), TOPIC((byte) 2), GROUP((byte) 3), BROKER((byte) 4);
+
+ private static final ResourceType[] VALUES = values();
+
+ private final byte id;
+
+ ResourceType(byte id) {
+ this.id = id;
+ }
+
+ public byte id() {
+ return id;
+ }
+
+ public static ResourceType forId(byte id) {
+ if (id < 0)
+ throw new IllegalArgumentException("id should be positive, id: " + id);
+ if (id >= VALUES.length)
+ return UNKNOWN;
+ return VALUES[id];
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
index 4ff0dc6..e623d73 100644
--- a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
+++ b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
@@ -106,7 +106,7 @@ public interface CreateTopicPolicy extends Configurable, AutoCloseable {
@Override
public String toString() {
- return "RequestMetadata(topic=" + topic +
+ return "CreateTopicPolicy.RequestMetadata(topic=" + topic +
", numPartitions=" + numPartitions +
", replicationFactor=" + replicationFactor +
", replicasAssignments=" + replicasAssignments +
@@ -116,12 +116,12 @@ public interface CreateTopicPolicy extends Configurable, AutoCloseable {
/**
* Validate the request parameters and throw a <code>PolicyViolationException</code> with a suitable error
- * message if the create request parameters for the provided topic do not satisfy this policy.
+ * message if the create topics request parameters for the provided topic do not satisfy this policy.
*
* Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation
* failure only affects the relevant topic, other topics in the request will still be processed.
*
- * @param requestMetadata the create request parameters for the provided topic.
+ * @param requestMetadata the create topics request parameters for the provided topic.
* @throws PolicyViolationException if the request parameters do not satisfy this policy.
*/
void validate(RequestMetadata requestMetadata) throws PolicyViolationException;
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
index 2d7c546..06ace63 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
@@ -45,7 +45,9 @@ public class AclOperationTest {
new AclOperationTestInfo(AclOperation.DELETE, 6, "delete", false),
new AclOperationTestInfo(AclOperation.ALTER, 7, "alter", false),
new AclOperationTestInfo(AclOperation.DESCRIBE, 8, "describe", false),
- new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false)
+ new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false),
+ new AclOperationTestInfo(AclOperation.DESCRIBE_CONFIGS, 10, "describe_configs", false),
+ new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false)
};
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 6d01b0a..e432c0a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -27,9 +27,9 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.CreateAclsResponse;
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
-import org.apache.kafka.common.requests.CreateTopicsResponse.Error;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
@@ -60,8 +60,7 @@ import static org.junit.Assert.fail;
/**
* A unit test for KafkaAdminClient.
*
- * See for an integration test of the KafkaAdminClient.
- * Also see KafkaAdminClientIntegrationTest for a unit test of the admin client.
+ * See KafkaAdminClientIntegrationTest for an integration test of the KafkaAdminClient.
*/
public class KafkaAdminClientTest {
@Rule
@@ -160,8 +159,7 @@ public class KafkaAdminClientTest {
@Test
public void testCloseAdminClient() throws Exception {
- try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
- }
+ new MockKafkaAdminClientContext(newStrMap()).close();
}
private static void assertFutureError(Future<?> future, Class<? extends Throwable> exceptionClass)
@@ -186,12 +184,12 @@ public class KafkaAdminClientTest {
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
ctx.mockClient.setNode(new Node(0, "localhost", 8121));
- ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, Error>() {{
- put("myTopic", new Error(Errors.NONE, ""));
+ ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, ApiError>() {{
+ put("myTopic", new ApiError(Errors.NONE, ""));
}}));
KafkaFuture<Void> future = ctx.client.
createTopics(Collections.singleton(new NewTopic("myTopic", new HashMap<Integer, List<Integer>>() {{
- put(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2}));
+ put(0, Arrays.asList(new Integer[]{0, 1, 2}));
}})), new CreateTopicsOptions().timeoutMs(1000)).all();
assertFutureError(future, TimeoutException.class);
}
@@ -203,12 +201,12 @@ public class KafkaAdminClientTest {
ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet());
ctx.mockClient.setNode(ctx.nodes.get(0));
- ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, Error>() {{
- put("myTopic", new Error(Errors.NONE, ""));
+ ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, ApiError>() {{
+ put("myTopic", new ApiError(Errors.NONE, ""));
}}));
KafkaFuture<Void> future = ctx.client.
createTopics(Collections.singleton(new NewTopic("myTopic", new HashMap<Integer, List<Integer>>() {{
- put(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2}));
+ put(0, Arrays.asList(new Integer[]{0, 1, 2}));
}})), new CreateTopicsOptions().timeoutMs(10000)).all();
future.get();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java
index 8f6f670..af72de2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java
@@ -40,7 +40,8 @@ public class ResourceTypeTest {
new AclResourceTypeTestInfo(ResourceType.ANY, 1, "any", false),
new AclResourceTypeTestInfo(ResourceType.TOPIC, 2, "topic", false),
new AclResourceTypeTestInfo(ResourceType.GROUP, 3, "group", false),
- new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false)
+ new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false),
+ new AclResourceTypeTestInfo(ResourceType.BROKER, 5, "broker", false)
};
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index ede55a5..9142c90 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -60,6 +60,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -230,6 +231,13 @@ public class RequestResponseTest {
checkRequest(createDeleteAclsRequest());
checkErrorResponse(createDeleteAclsRequest(), new SecurityDisabledException("Security is not enabled."));
checkResponse(createDeleteAclsResponse(), ApiKeys.DELETE_ACLS.latestVersion());
+ checkRequest(createAlterConfigsRequest());
+ checkErrorResponse(createAlterConfigsRequest(), new UnknownServerException());
+ checkResponse(createAlterConfigsResponse(), 0);
+ checkRequest(createDescribeConfigsRequest());
+ checkRequest(createDescribeConfigsRequestWithConfigEntries());
+ checkErrorResponse(createDescribeConfigsRequest(), new UnknownServerException());
+ checkResponse(createDescribeConfigsResponse(), 0);
}
@Test
@@ -887,9 +895,9 @@ public class RequestResponseTest {
}
private CreateTopicsResponse createCreateTopicResponse() {
- Map<String, CreateTopicsResponse.Error> errors = new HashMap<>();
- errors.put("t1", new CreateTopicsResponse.Error(Errors.INVALID_TOPIC_EXCEPTION, null));
- errors.put("t2", new CreateTopicsResponse.Error(Errors.LEADER_NOT_AVAILABLE, "Leader with id 5 is not available."));
+ Map<String, ApiError> errors = new HashMap<>();
+ errors.put("t1", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, null));
+ errors.put("t2", new ApiError(Errors.LEADER_NOT_AVAILABLE, "Leader with id 5 is not available."));
return new CreateTopicsResponse(errors);
}
@@ -1085,4 +1093,50 @@ public class RequestResponseTest {
closed = true;
}
}
+
+ private DescribeConfigsRequest createDescribeConfigsRequest() {
+ return new DescribeConfigsRequest.Builder(asList(
+ new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"),
+ new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"))).build((short) 0);
+ }
+
+ private DescribeConfigsRequest createDescribeConfigsRequestWithConfigEntries() {
+ Map<org.apache.kafka.common.requests.Resource, Collection<String>> resources = new HashMap<>();
+ resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), asList("foo", "bar"));
+ resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), null);
+ resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic a"), Collections.<String>emptyList());
+ return new DescribeConfigsRequest.Builder(resources).build((short) 0);
+ }
+
+ private DescribeConfigsResponse createDescribeConfigsResponse() {
+ Map<org.apache.kafka.common.requests.Resource, DescribeConfigsResponse.Config> configs = new HashMap<>();
+ List<DescribeConfigsResponse.ConfigEntry> configEntries = asList(
+ new DescribeConfigsResponse.ConfigEntry("config_name", "config_value", false, true, false),
+ new DescribeConfigsResponse.ConfigEntry("another_name", "another value", true, false, true)
+ );
+ configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new DescribeConfigsResponse.Config(
+ new ApiError(Errors.NONE, null), configEntries));
+ configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), new DescribeConfigsResponse.Config(
+ new ApiError(Errors.NONE, null), Collections.<DescribeConfigsResponse.ConfigEntry>emptyList()));
+ return new DescribeConfigsResponse(200, configs);
+ }
+
+ private AlterConfigsRequest createAlterConfigsRequest() {
+ Map<org.apache.kafka.common.requests.Resource, AlterConfigsRequest.Config> configs = new HashMap<>();
+ List<AlterConfigsRequest.ConfigEntry> configEntries = asList(
+ new AlterConfigsRequest.ConfigEntry("config_name", "config_value"),
+ new AlterConfigsRequest.ConfigEntry("another_name", "another value")
+ );
+ configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new AlterConfigsRequest.Config(configEntries));
+ configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"),
+ new AlterConfigsRequest.Config(Collections.<AlterConfigsRequest.ConfigEntry>emptyList()));
+ return new AlterConfigsRequest((short) 0, configs, false);
+ }
+
+ private AlterConfigsResponse createAlterConfigsResponse() {
+ Map<org.apache.kafka.common.requests.Resource, ApiError> errors = new HashMap<>();
+ errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new ApiError(Errors.NONE, null));
+ errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), new ApiError(Errors.INVALID_REQUEST, "This request is invalid"));
+ return new AlterConfigsResponse(20, errors);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 0bedee3..925c407 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -31,9 +31,10 @@ object AclCommand {
val Newline = scala.util.Properties.lineSeparator
val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
- Topic -> Set(Read, Write, Describe, All, Delete),
+ Broker -> Set(DescribeConfigs),
+ Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All),
Group -> Set(Read, Describe, All),
- Cluster -> Set(Create, ClusterAction, All)
+ Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, All)
)
def main(args: Array[String]) {
@@ -237,6 +238,9 @@ object AclCommand {
if (opts.options.has(opts.groupOpt))
opts.options.valuesOf(opts.groupOpt).asScala.foreach(group => resources += new Resource(Group, group.trim))
+ if (opts.options.has(opts.brokerOpt))
+ opts.options.valuesOf(opts.brokerOpt).asScala.foreach(broker => resources += new Resource(Broker, broker.toString))
+
if (resources.isEmpty && dieIfNoResourceFound)
CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group>")
@@ -285,6 +289,12 @@ object AclCommand {
.describedAs("group")
.ofType(classOf[String])
+ val brokerOpt = parser.accepts("broker", "broker to which the ACLs should be added or removed. " +
+ "A value of * indicates the ACLs should apply to all brokers.")
+ .withRequiredArg
+ .describedAs("broker")
+ .ofType(classOf[Int])
+
val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.")
val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
val listOpt = parser.accepts("list", "List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 49d249b..bd8771b 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -46,6 +46,26 @@ trait AdminUtilities {
def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties)
def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties)
def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties)
+
+ def changeConfigs(zkUtils: ZkUtils, entityType: String, entityName: String, configs: Properties): Unit = {
+
+ def parseBroker(broker: String): Int = {
+ try broker.toInt
+ catch {
+ case _: NumberFormatException =>
+ throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
+ }
+ }
+
+ entityType match {
+ case ConfigType.Topic => changeTopicConfig(zkUtils, entityName, configs)
+ case ConfigType.Client => changeClientIdConfig(zkUtils, entityName, configs)
+ case ConfigType.User => changeUserOrUserClientIdConfig(zkUtils, entityName, configs)
+ case ConfigType.Broker => changeBrokerConfig(zkUtils, Seq(parseBroker(entityName)), configs)
+ case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.Broker}")
+ }
+ }
+
def fetchEntityConfig(zkUtils: ZkUtils,entityType: String, entityName: String): Properties
}
@@ -527,6 +547,14 @@ object AdminUtils extends Logging with AdminUtilities {
changeEntityConfig(zkUtils, ConfigType.User, sanitizedEntityName, configs)
}
+ def validateTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties): Unit = {
+ Topic.validate(topic)
+ if (!topicExists(zkUtils, topic))
+ throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
+ // remove the topic overrides
+ LogConfig.validate(configs)
+ }
+
/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
*
@@ -537,10 +565,7 @@ object AdminUtils extends Logging with AdminUtilities {
*
*/
def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
- if (!topicExists(zkUtils, topic))
- throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
- // remove the topic overrides
- LogConfig.validate(configs)
+ validateTopicConfig(zkUtils, topic, configs)
changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 3985490..f74d31d 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -98,13 +98,8 @@ object ConfigCommand extends Config {
configs.putAll(configsToBeAdded)
configsToBeDeleted.foreach(configs.remove(_))
- entityType match {
- case ConfigType.Topic => utils.changeTopicConfig(zkUtils, entityName, configs)
- case ConfigType.Client => utils.changeClientIdConfig(zkUtils, entityName, configs)
- case ConfigType.User => utils.changeUserOrUserClientIdConfig(zkUtils, entityName, configs)
- case ConfigType.Broker => utils.changeBrokerConfig(zkUtils, Seq(parseBroker(entityName)), configs)
- case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.Broker}")
- }
+ utils.changeConfigs(zkUtils, entityType, entityName, configs)
+
println(s"Completed Updating config for entity: $entity.")
}
@@ -129,14 +124,6 @@ object ConfigCommand extends Config {
}
}
- private def parseBroker(broker: String): Int = {
- try broker.toInt
- catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
- }
- }
-
private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
val configEntity = parseEntity(opts)
val describeAllUsers = configEntity.root.entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 30bc26b..6a329d8 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -321,7 +321,7 @@ object LogConfig {
val names = configNames
for(name <- props.asScala.keys)
if (!names.contains(name))
- throw new InvalidConfigurationException(s"Unknown Log configuration $name.")
+ throw new InvalidConfigurationException(s"Unknown topic config name: $name")
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 0b094df..d8cdf90 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -44,7 +44,7 @@ import scala.collection._
*/
@threadsafe
class LogManager(val logDirs: Array[File],
- val topicConfigs: Map[String, LogConfig],
+ val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation
val defaultConfig: LogConfig,
val cleanerConfig: CleanerConfig,
ioThreads: Int,
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/security/auth/Operation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala
index 7d292d2..f65d9f0 100644
--- a/core/src/main/scala/kafka/security/auth/Operation.scala
+++ b/core/src/main/scala/kafka/security/auth/Operation.scala
@@ -57,16 +57,25 @@ case object ClusterAction extends Operation {
val name = "ClusterAction"
val toJava = AclOperation.CLUSTER_ACTION
}
+case object DescribeConfigs extends Operation {
+ val name = "DescribeConfigs"
+ val toJava = AclOperation.DESCRIBE_CONFIGS
+}
+case object AlterConfigs extends Operation {
+ val name = "AlterConfigs"
+ val toJava = AclOperation.ALTER_CONFIGS
+}
case object All extends Operation {
val name = "All"
val toJava = AclOperation.ALL
}
object Operation {
- def fromString(operation: String): Operation = {
- val op = values.find(op => op.name.equalsIgnoreCase(operation))
- op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(",")))
- }
+
+ def fromString(operation: String): Operation = {
+ val op = values.find(op => op.name.equalsIgnoreCase(operation))
+ op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(",")))
+ }
def fromJava(operation: AclOperation): Try[Operation] = {
try {
@@ -76,5 +85,6 @@ object Operation {
}
}
- def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, All)
+ def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs,
+ DescribeConfigs, All)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index e58d8ec..ea7ce3c 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -31,6 +31,11 @@ case object Cluster extends ResourceType {
val error = Errors.CLUSTER_AUTHORIZATION_FAILED
}
+case object Broker extends ResourceType {
+ val name = "Broker"
+ val error = Errors.BROKER_AUTHORIZATION_FAILED
+}
+
case object Topic extends ResourceType {
val name = "Topic"
val error = Errors.TOPIC_AUTHORIZATION_FAILED
@@ -58,5 +63,5 @@ object ResourceType {
rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(",")))
}
- def values: Seq[ResourceType] = List(Cluster, Topic, Group, ProducerTransactionalId, ProducerIdResource)
+ def values: Seq[ResourceType] = List(Cluster, Topic, Group, ProducerTransactionalId, ProducerIdResource, Broker)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index eaacd6a..19fbdc4 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -252,7 +252,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
/**
* Safely updates the resources ACLs by ensuring reads and writes respect the expected zookeeper version.
- * Continues to retry until it succesfully updates zookeeper.
+ * Continues to retry until it successfully updates zookeeper.
*
* Returns a boolean indicating if the content of the ACLs was actually changed.
*
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 2f60cbd..c147593 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -16,18 +16,20 @@
*/
package kafka.server
-import java.util.Properties
+import java.util.{Collections, Properties}
import kafka.admin.AdminUtils
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException}
import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, PolicyViolationException}
+import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.CreateTopicsRequest._
-import org.apache.kafka.common.requests.CreateTopicsResponse
+import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse, Resource, ResourceType}
import org.apache.kafka.server.policy.CreateTopicPolicy
import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
@@ -63,7 +65,7 @@ class AdminManager(val config: KafkaConfig,
def createTopics(timeout: Int,
validateOnly: Boolean,
createInfo: Map[String, TopicDetails],
- responseCallback: Map[String, CreateTopicsResponse.Error] => Unit) {
+ responseCallback: Map[String, ApiError] => Unit) {
// 1. map over topics creating assignment and calling zookeeper
val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }
@@ -114,15 +116,15 @@ class AdminManager(val config: KafkaConfig,
else
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false)
}
- CreateTopicMetadata(topic, assignments, new CreateTopicsResponse.Error(Errors.NONE, null))
+ CreateTopicMetadata(topic, assignments, new ApiError(Errors.NONE, null))
} catch {
// Log client errors at a lower level than unexpected exceptions
case e@ (_: PolicyViolationException | _: ApiException) =>
info(s"Error processing create topic request for topic $topic with arguments $arguments", e)
- CreateTopicMetadata(topic, Map(), new CreateTopicsResponse.Error(Errors.forException(e), e.getMessage))
+ CreateTopicMetadata(topic, Map(), ApiError.fromThrowable(e))
case e: Throwable =>
error(s"Error processing create topic request for topic $topic with arguments $arguments", e)
- CreateTopicMetadata(topic, Map(), new CreateTopicsResponse.Error(Errors.forException(e), e.getMessage))
+ CreateTopicMetadata(topic, Map(), ApiError.fromThrowable(e))
}
}
@@ -131,7 +133,7 @@ class AdminManager(val config: KafkaConfig,
val results = metadata.map { createTopicMetadata =>
// ignore topics that already have errors
if (createTopicMetadata.error.is(Errors.NONE) && !validateOnly) {
- (createTopicMetadata.topic, new CreateTopicsResponse.Error(Errors.REQUEST_TIMED_OUT, null))
+ (createTopicMetadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null))
} else {
(createTopicMetadata.topic, createTopicMetadata.error)
}
@@ -189,6 +191,99 @@ class AdminManager(val config: KafkaConfig,
}
}
+ def describeConfigs(resourceToConfigNames: Map[Resource, Option[Set[String]]]): Map[Resource, DescribeConfigsResponse.Config] = {
+ resourceToConfigNames.map { case (resource, configNames) =>
+
+ def createResponseConfig(config: AbstractConfig, isReadOnly: Boolean, isDefault: String => Boolean): DescribeConfigsResponse.Config = {
+ val filteredConfigPairs = config.values.asScala.filter { case (configName, _) =>
+ /* Always returns true if configNames is None */
+ configNames.map(_.contains(configName)).getOrElse(true)
+ }.toIndexedSeq
+
+ val configEntries = filteredConfigPairs.map { case (name, value) =>
+ val configEntryType = config.typeOf(name)
+ val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
+ val valueAsString =
+ if (isSensitive) null
+ else ConfigDef.convertToString(value, configEntryType)
+ new DescribeConfigsResponse.ConfigEntry(name, valueAsString, isSensitive, isDefault(name), isReadOnly)
+ }
+
+ new DescribeConfigsResponse.Config(new ApiError(Errors.NONE, null), configEntries.asJava)
+ }
+
+ try {
+ val resourceConfig = resource.`type` match {
+
+ case ResourceType.TOPIC =>
+ val topic = resource.name
+ Topic.validate(topic)
+ // Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
+ val topicProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+ val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
+ createResponseConfig(logConfig, isReadOnly = false, name => !topicProps.containsKey(name))
+
+ case ResourceType.BROKER =>
+ val brokerId = try resource.name.toInt catch {
+ case _: NumberFormatException =>
+ throw new InvalidRequestException(s"Broker id must be an integer, but it is: ${resource.name}")
+ }
+ if (brokerId == config.brokerId)
+ createResponseConfig(config, isReadOnly = true, name => !config.originals.containsKey(name))
+ else
+ throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId}, but received $brokerId")
+
+ case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
+ }
+ resource -> resourceConfig
+ } catch {
+ case e: Throwable =>
+ // Log client errors at a lower level than unexpected exceptions
+ val message = s"Error processing describe configs request for resource $resource"
+ if (e.isInstanceOf[ApiException])
+ info(message, e)
+ else
+ error(message, e)
+ resource -> new DescribeConfigsResponse.Config(ApiError.fromThrowable(e), Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
+ }
+ }.toMap
+ }
+
+ def alterConfigs(configs: Map[Resource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[Resource, ApiError] = {
+ configs.map { case (resource, config) =>
+ try {
+ resource.`type` match {
+ case ResourceType.TOPIC =>
+ val topic = resource.name
+ val properties = new Properties
+ config.entries.asScala.foreach { configEntry =>
+ properties.setProperty(configEntry.name(), configEntry.value())
+ }
+ if (validateOnly)
+ AdminUtils.validateTopicConfig(zkUtils, topic, properties)
+ else
+ AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+ resource -> new ApiError(Errors.NONE, null)
+ case resourceType =>
+ throw new InvalidRequestException(s"AlterConfigs is only supported for topics, but resource type is $resourceType")
+ }
+ } catch {
+ case e: ConfigException =>
+ val message = s"Invalid config value for resource $resource: ${e.getMessage}"
+ info(message)
+ resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
+ case e: Throwable =>
+ // Log client errors at a lower level than unexpected exceptions
+ val message = s"Error processing alter configs request for resource $resource"
+ if (e.isInstanceOf[ApiException])
+ info(message, e)
+ else
+ error(message, e)
+ resource -> ApiError.fromThrowable(e)
+ }
+ }.toMap
+ }
+
def shutdown() {
topicPurgatory.shutdown()
CoreUtils.swallow(createTopicPolicy.foreach(_.close()))
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
index 32f844c..abf6bc0 100644
--- a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
+++ b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
@@ -19,7 +19,7 @@ package kafka.server
import kafka.api.LeaderAndIsr
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.CreateTopicsResponse
+import org.apache.kafka.common.requests.{ApiError, CreateTopicsResponse}
import scala.collection._
@@ -29,7 +29,7 @@ import scala.collection._
* TODO: local state doesn't count, need to know state of all relevant brokers
*
*/
-case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, Seq[Int]], error: CreateTopicsResponse.Error)
+case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, Seq[Int]], error: ApiError)
/**
* A delayed create topics operation that can be created by the admin manager and watched
@@ -38,7 +38,7 @@ case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, Seq[I
class DelayedCreateTopics(delayMs: Long,
createMetadata: Seq[CreateTopicMetadata],
adminManager: AdminManager,
- responseCallback: Map[String, CreateTopicsResponse.Error] => Unit)
+ responseCallback: Map[String, ApiError] => Unit)
extends DelayedOperation(delayMs) {
/**
@@ -70,7 +70,7 @@ class DelayedCreateTopics(delayMs: Long,
val results = createMetadata.map { metadata =>
// ignore topics that already have errors
if (metadata.error.is(Errors.NONE) && missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet) > 0)
- (metadata.topic, new CreateTopicsResponse.Error(Errors.REQUEST_TIMED_OUT, null))
+ (metadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null))
else
(metadata.topic, metadata.error)
}.toMap
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bf7d4c1..02a1103 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -43,10 +43,10 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
-import org.apache.kafka.common.record._
+import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
-import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{Node, TopicPartition}
@@ -126,6 +126,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
+ case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
+ case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
}
} catch {
case e: FatalExitError => throw e
@@ -1266,7 +1268,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleCreateTopicsRequest(request: RequestChannel.Request) {
val createTopicsRequest = request.body[CreateTopicsRequest]
- def sendResponseCallback(results: Map[String, CreateTopicsResponse.Error]): Unit = {
+ def sendResponseCallback(results: Map[String, ApiError]): Unit = {
def createResponse(throttleTimeMs: Int): AbstractResponse = {
val responseBody = new CreateTopicsResponse(throttleTimeMs, results.asJava)
trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
@@ -1277,12 +1279,12 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!controller.isActive) {
val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
- (topic, new CreateTopicsResponse.Error(Errors.NOT_CONTROLLER, null))
+ (topic, new ApiError(Errors.NOT_CONTROLLER, null))
}
sendResponseCallback(results)
} else if (!authorize(request.session, Create, Resource.ClusterResource)) {
val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
- (topic, new CreateTopicsResponse.Error(Errors.CLUSTER_AUTHORIZATION_FAILED, null))
+ (topic, new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null))
}
sendResponseCallback(results)
} else {
@@ -1291,7 +1293,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
// Special handling to add duplicate topics to the response
- def sendResponseWithDuplicatesCallback(results: Map[String, CreateTopicsResponse.Error]): Unit = {
+ def sendResponseWithDuplicatesCallback(results: Map[String, ApiError]): Unit = {
val duplicatedTopicsResults =
if (duplicateTopics.nonEmpty) {
@@ -1300,7 +1302,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// We can send the error message in the response for version 1, so we don't have to log it any more
if (request.header.apiVersion == 0)
warn(errorMessage)
- duplicateTopics.keySet.map((_, new CreateTopicsResponse.Error(Errors.INVALID_REQUEST, errorMessage))).toMap
+ duplicateTopics.keySet.map((_, new ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap
} else Map.empty
val completeResults = results ++ duplicatedTopicsResults
@@ -1894,11 +1896,11 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (mayThrottle) {
- val clientId : String =
- if (request.requestObj.isInstanceOf[ControlledShutdownRequest])
- request.requestObj.asInstanceOf[ControlledShutdownRequest].clientId.getOrElse("")
- else
+ val clientId: String = request.requestObj match {
+ case r: ControlledShutdownRequest => r.clientId.getOrElse("")
+ case _ =>
throw new IllegalStateException("Old style requests should only be used for ControlledShutdownRequest")
+ }
sendResponseMaybeThrottle(request, clientId, sendResponseCallback)
} else
sendResponseExemptThrottle(request, () => sendResponseCallback(0))
@@ -1920,6 +1922,64 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
+ def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
+ val alterConfigsRequest = request.body[AlterConfigsRequest]
+ val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.partition { case (resource, _) =>
+ resource.`type` match {
+ case RResourceType.BROKER =>
+ authorize(request.session, AlterConfigs, new Resource(Broker, resource.name)) ||
+ authorize(request.session, AlterConfigs, Resource.ClusterResource)
+ case RResourceType.TOPIC =>
+ authorize(request.session, AlterConfigs, new Resource(Topic, resource.name)) ||
+ authorize(request.session, AlterConfigs, Resource.ClusterResource)
+ case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
+ }
+ }
+ val authorizedResult = adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly)
+ val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+ resource -> configsAuthorizationApiError(request.session, resource)
+ }
+ sendResponseMaybeThrottle(request, new AlterConfigsResponse(_, (authorizedResult ++ unauthorizedResult).asJava))
+ }
+
+ private def configsAuthorizationApiError(session: RequestChannel.Session, resource: RResource): ApiError = {
+ val error = resource.`type` match {
+ case RResourceType.BROKER => Errors.BROKER_AUTHORIZATION_FAILED
+ case RResourceType.TOPIC =>
+ // Don't leak topic name unless the user has describe topic permission
+ if (authorize(session, Describe, new Resource(Topic, resource.name)))
+ Errors.TOPIC_AUTHORIZATION_FAILED
+ else
+ Errors.UNKNOWN_TOPIC_OR_PARTITION
+ case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
+ }
+ new ApiError(error, null)
+ }
+
+ def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = {
+ val describeConfigsRequest = request.body[DescribeConfigsRequest]
+ val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.partition { resource =>
+ resource.`type` match {
+ case RResourceType.BROKER =>
+ authorize(request.session, DescribeConfigs, new Resource(Broker, resource.name)) ||
+ authorize(request.session, DescribeConfigs, Resource.ClusterResource)
+ case RResourceType.TOPIC =>
+ authorize(request.session, DescribeConfigs, new Resource(Topic, resource.name)) ||
+ authorize(request.session, DescribeConfigs, Resource.ClusterResource)
+ case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
+ }
+ }
+ val authorizedConfigs = adminManager.describeConfigs(authorizedResources.map { resource =>
+ resource -> Option(describeConfigsRequest.configNames(resource)).map(_.asScala.toSet)
+ }.toMap)
+ val unauthorizedConfigs = unauthorizedResources.map { resource =>
+ val error = configsAuthorizationApiError(request.session, resource)
+ resource -> new DescribeConfigsResponse.Config(error, Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
+ }
+
+ sendResponseMaybeThrottle(request, new DescribeConfigsResponse(_, (authorizedConfigs ++ unauthorizedConfigs).asJava))
+ }
+
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 788f718..94dfa43 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -24,20 +24,18 @@ import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import com.yammer.metrics.core.Gauge
-import kafka.admin.AdminUtils
import kafka.api.KAFKA_0_9_0
import kafka.cluster.Broker
import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
-import kafka.controller.{ControllerStats, KafkaController}
+import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
-import kafka.log.{CleanerConfig, LogConfig, LogManager}
+import kafka.log.{LogConfig, LogManager}
import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter}
import kafka.network.{BlockingChannel, SocketServer}
import kafka.security.CredentialProvider
import kafka.security.auth.Authorizer
import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
[3/3] kafka git commit: KAFKA-3267;
Describe and Alter Configs Admin APIs (KIP-133)
Posted by ij...@apache.org.
KAFKA-3267; Describe and Alter Configs Admin APIs (KIP-133)
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #3076 from ijuma/kafka-3267-describe-alter-configs-protocol
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/972b7545
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/972b7545
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/972b7545
Branch: refs/heads/trunk
Commit: 972b7545363ae85a55f94cf7ea83614be8840b75
Parents: e1abf17
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu May 18 06:51:02 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu May 18 06:51:02 2017 +0100
----------------------------------------------------------------------
checkstyle/suppressions.xml | 4 +-
.../kafka/clients/admin/AclOperation.java | 12 +-
.../apache/kafka/clients/admin/AdminClient.java | 60 +++++
.../clients/admin/AlterConfigsOptions.java | 45 ++++
.../clients/admin/AlterConfigsResults.java | 42 ++++
.../org/apache/kafka/clients/admin/Config.java | 63 ++++++
.../apache/kafka/clients/admin/ConfigEntry.java | 59 +++++
.../kafka/clients/admin/ConfigResource.java | 65 ++++++
.../clients/admin/DescribeConfigsOptions.java | 37 +++
.../clients/admin/DescribeConfigsResults.java | 59 +++++
.../kafka/clients/admin/KafkaAdminClient.java | 184 ++++++++++++++-
.../kafka/clients/admin/ResourceType.java | 7 +-
.../kafka/common/config/AbstractConfig.java | 7 +
.../errors/BrokerAuthorizationException.java | 23 ++
.../apache/kafka/common/protocol/ApiKeys.java | 4 +-
.../apache/kafka/common/protocol/Errors.java | 20 +-
.../apache/kafka/common/protocol/Protocol.java | 182 ++++++++++-----
.../kafka/common/protocol/types/Schema.java | 16 +-
.../kafka/common/requests/AbstractRequest.java | 6 +
.../kafka/common/requests/AbstractResponse.java | 4 +
.../common/requests/AlterConfigsRequest.java | 179 +++++++++++++++
.../common/requests/AlterConfigsResponse.java | 88 ++++++++
.../apache/kafka/common/requests/ApiError.java | 99 ++++++++
.../common/requests/CreateTopicsRequest.java | 12 +-
.../common/requests/CreateTopicsResponse.java | 68 +-----
.../common/requests/DescribeConfigsRequest.java | 142 ++++++++++++
.../requests/DescribeConfigsResponse.java | 186 +++++++++++++++
.../apache/kafka/common/requests/Resource.java | 60 +++++
.../kafka/common/requests/ResourceType.java | 42 ++++
.../kafka/server/policy/CreateTopicPolicy.java | 6 +-
.../kafka/clients/admin/AclOperationTest.java | 4 +-
.../clients/admin/KafkaAdminClientTest.java | 20 +-
.../kafka/clients/admin/ResourceTypeTest.java | 3 +-
.../common/requests/RequestResponseTest.java | 60 ++++-
.../src/main/scala/kafka/admin/AclCommand.scala | 14 +-
.../src/main/scala/kafka/admin/AdminUtils.scala | 33 ++-
.../main/scala/kafka/admin/ConfigCommand.scala | 17 +-
core/src/main/scala/kafka/log/LogConfig.scala | 2 +-
core/src/main/scala/kafka/log/LogManager.scala | 2 +-
.../scala/kafka/security/auth/Operation.scala | 20 +-
.../kafka/security/auth/ResourceType.scala | 7 +-
.../security/auth/SimpleAclAuthorizer.scala | 2 +-
.../main/scala/kafka/server/AdminManager.scala | 109 ++++++++-
.../kafka/server/DelayedCreateTopics.scala | 8 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 82 ++++++-
.../main/scala/kafka/server/KafkaServer.scala | 6 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 132 ++++++-----
.../api/KafkaAdminClientIntegrationTest.scala | 224 ++++++++++++++++++-
.../scala/unit/kafka/admin/AclCommandTest.scala | 11 +-
.../AbstractCreateTopicsRequestTest.scala | 8 +-
.../unit/kafka/server/RequestQuotaTest.scala | 18 +-
.../processor/internals/StreamsKafkaClient.java | 3 +-
52 files changed, 2271 insertions(+), 295 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 66548d9..dc00bee 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -19,7 +19,7 @@
files=".*/requests/AbstractResponse.java"/>
<suppress checks="MethodLength"
- files="KerberosLogin.java"/>
+ files="KerberosLogin.java|RequestResponseTest.java"/>
<suppress checks="ParameterNumber"
files="NetworkClient.java"/>
@@ -46,7 +46,7 @@
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|SsLTransportLayer|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer).java"/>
<suppress checks="JavaNCSS"
- files="KerberosLogin.java"/>
+ files="AbstractRequest.java|KerberosLogin.java"/>
<suppress checks="JavaNCSS"
files="AbstractRequest.java"/>
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
index 14fb61b..062e5e3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
@@ -73,7 +73,17 @@ public enum AclOperation {
/**
* CLUSTER_ACTION operation.
*/
- CLUSTER_ACTION((byte) 9);
+ CLUSTER_ACTION((byte) 9),
+
+ /**
+ * DESCRIBE_CONFIGS operation.
+ */
+ DESCRIBE_CONFIGS((byte) 10),
+
+ /**
+ * ALTER_CONFIGS operation.
+ */
+ ALTER_CONFIGS((byte) 11);
private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 8bb495c..4cfc174 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -32,6 +32,7 @@ import java.util.Properties;
*/
@InterfaceStability.Unstable
public abstract class AdminClient implements AutoCloseable {
+
/**
* Create a new AdminClient with the given configuration.
*
@@ -196,6 +197,7 @@ public abstract class AdminClient implements AutoCloseable {
public abstract ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options);
/**
+<<<<<<< HEAD
* Similar to #{@link AdminClient#describeAcls(AclBindingFilter, DescribeAclsOptions),
* but uses the default options.
*
@@ -260,4 +262,62 @@ public abstract class AdminClient implements AutoCloseable {
* @return The DeleteAclsResult.
*/
public abstract DeleteAclsResults deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options);
+
+
+ /**
+ * Get the configuration for the specified resources with the default options.
+ *
+ * See {@link #describeConfigs(Collection, DescribeConfigsOptions)} for more details.
+ *
+ * @param resources The resources (topic and broker resource types are currently supported)
+ * @return The DescribeConfigsResults
+ */
+ public DescribeConfigsResults describeConfigs(Collection<ConfigResource> resources) {
+ return describeConfigs(resources, new DescribeConfigsOptions());
+ }
+
+ /**
+ * Get the configuration for the specified resources.
+ *
+ * The returned configuration includes default values and the isDefault() method can be used to distinguish them
+ * from user supplied values.
+ *
+ * The value of config entries where isSensitive() is true is always {@code null} so that sensitive information
+ * is not disclosed.
+ *
+ * Config entries where isReadOnly() is true cannot be updated.
+ *
+ * @param resources The resources (topic and broker resource types are currently supported)
+ * @param options The options to use when describing configs
+ * @return The DescribeConfigsResults
+ */
+ public abstract DescribeConfigsResults describeConfigs(Collection<ConfigResource> resources,
+ DescribeConfigsOptions options);
+
+ /**
+ * Update the configuration for the specified resources with the default options.
+ *
+ * See {@link #alterConfigs(Map, AlterConfigsOptions)} for more details.
+ *
+ * @param configs The resources with their configs (topic is the only resource type with configs that can
+ * be updated currently)
+ * @return The AlterConfigsResults
+ */
+ public AlterConfigsResults alterConfigs(Map<ConfigResource, Config> configs) {
+ return alterConfigs(configs, new AlterConfigsOptions());
+ }
+
+ /**
+ * Update the configuration for the specified resources with the default options.
+ *
+ * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
+ * a particular resource are updated atomically.
+ *
+ * @param configs The resources with their configs (topic is the only resource type with configs that can
+ * be updated currently)
+ * @param options The options to use when describing configs
+ * @return The AlterConfigsResults
+ */
+ public abstract AlterConfigsResults alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
new file mode 100644
index 0000000..5698fed
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+@InterfaceStability.Unstable
+public class AlterConfigsOptions {
+
+ private Integer timeoutMs = null;
+ private boolean validateOnly = false;
+
+ public Integer timeoutMs() {
+ return timeoutMs;
+ }
+
+ public boolean isValidateOnly() {
+ return validateOnly;
+ }
+
+ public AlterConfigsOptions timeoutMs(Integer timeout) {
+ this.timeoutMs = timeout;
+ return this;
+ }
+
+ public AlterConfigsOptions validateOnly(boolean validateOnly) {
+ this.validateOnly = validateOnly;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java
new file mode 100644
index 0000000..3f44cfd
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+@InterfaceStability.Unstable
+public class AlterConfigsResults {
+
+ private final Map<ConfigResource, KafkaFuture<Void>> futures;
+
+ AlterConfigsResults(Map<ConfigResource, KafkaFuture<Void>> futures) {
+ this.futures = futures;
+ }
+
+ public Map<ConfigResource, KafkaFuture<Void>> results() {
+ return futures;
+ }
+
+ public KafkaFuture<Void> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/Config.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Config.java b/clients/src/main/java/org/apache/kafka/clients/admin/Config.java
new file mode 100644
index 0000000..189a0b3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Config.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class Config {
+
+ private final Collection<ConfigEntry> entries;
+
+ public Config(Collection<ConfigEntry> entries) {
+ this.entries = entries;
+ }
+
+ public Collection<ConfigEntry> entries() {
+ return Collections.unmodifiableCollection(entries);
+ }
+
+ public ConfigEntry get(String name) {
+ for (ConfigEntry entry : entries)
+ if (entry.name().equals(name))
+ return entry;
+ return null;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ Config config = (Config) o;
+
+ return entries.equals(config.entries);
+ }
+
+ @Override
+ public int hashCode() {
+ return entries.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "Config(entries=" + entries + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
new file mode 100644
index 0000000..cafc8fb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+public class ConfigEntry {
+
+ private final String name;
+ private final String value;
+ private final boolean isDefault;
+ private final boolean isSensitive;
+ private final boolean isReadOnly;
+
+ public ConfigEntry(String name, String value) {
+ this(name, value, false, false, false);
+ }
+
+ public ConfigEntry(String name, String value, boolean isDefault, boolean isSensitive, boolean isReadOnly) {
+ this.name = name;
+ this.value = value;
+ this.isDefault = isDefault;
+ this.isSensitive = isSensitive;
+ this.isReadOnly = isReadOnly;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public String value() {
+ return value;
+ }
+
+ public boolean isDefault() {
+ return isDefault;
+ }
+
+ public boolean isSensitive() {
+ return isSensitive;
+ }
+
+ public boolean isReadOnly() {
+ return isReadOnly;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java
new file mode 100644
index 0000000..61af4a8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+public final class ConfigResource {
+
+ public enum Type {
+ BROKER, TOPIC, UNKNOWN;
+ }
+
+ private final Type type;
+ private final String name;
+
+ public ConfigResource(Type type, String name) {
+ this.type = type;
+ this.name = name;
+ }
+
+ public Type type() {
+ return type;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ConfigResource that = (ConfigResource) o;
+
+ return type == that.type && name.equals(that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = type.hashCode();
+ result = 31 * result + name.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "ConfigResource{type=" + type + ", name='" + name + "'}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
new file mode 100644
index 0000000..f167bab
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for describeConfigs.
+ */
+@InterfaceStability.Unstable
+public class DescribeConfigsOptions {
+ private Integer timeoutMs = null;
+
+ public DescribeConfigsOptions timeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ return this;
+ }
+
+ public Integer timeoutMs() {
+ return timeoutMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java
new file mode 100644
index 0000000..c29872a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Unstable
+public class DescribeConfigsResults {
+
+ private final Map<ConfigResource, KafkaFuture<Config>> futures;
+
+ DescribeConfigsResults(Map<ConfigResource, KafkaFuture<Config>> futures) {
+ this.futures = futures;
+ }
+
+ public Map<ConfigResource, KafkaFuture<Config>> results() {
+ return futures;
+ }
+
+ public KafkaFuture<Map<ConfigResource, Config>> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+ thenApply(new KafkaFuture.Function<Void, Map<ConfigResource, Config>>() {
+ @Override
+ public Map<ConfigResource, Config> apply(Void v) {
+ Map<ConfigResource, Config> configs = new HashMap<>(futures.size());
+ for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : futures.entrySet()) {
+ try {
+ configs.put(entry.getKey(), entry.getValue().get());
+ } catch (InterruptedException | ExecutionException e) {
+ // This should be unreachable, because allOf ensured that all the futures
+ // completed successfully.
+ throw new RuntimeException(e);
+ }
+ }
+ return configs;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 9f1b1b2..76919ee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -53,6 +53,8 @@ import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AlterConfigsRequest;
+import org.apache.kafka.common.requests.AlterConfigsResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CreateAclsRequest;
@@ -69,8 +71,13 @@ import org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.DescribeAclsRequest;
import org.apache.kafka.common.requests.DescribeAclsResponse;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.Resource;
+import org.apache.kafka.common.requests.ResourceType;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
@@ -355,13 +362,26 @@ public class KafkaAdminClient extends AdminClient {
Node provide();
}
+ private class ConstantNodeIdProvider implements NodeProvider {
+ private final int nodeId;
+
+ ConstantNodeIdProvider(int nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public Node provide() {
+ return metadata.fetch().nodeById(nodeId);
+ }
+ }
+
/**
* Provides a constant node which is known at construction time.
*/
- private static class ConstantAdminNodeProvider implements NodeProvider {
+ private static class ConstantNodeProvider implements NodeProvider {
private final Node node;
- ConstantAdminNodeProvider(Node node) {
+ ConstantNodeProvider(Node node) {
this.node = node;
}
@@ -853,7 +873,7 @@ public class KafkaAdminClient extends AdminClient {
public void handleResponse(AbstractResponse abstractResponse) {
CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse;
// Handle server responses for particular topics.
- for (Map.Entry<String, CreateTopicsResponse.Error> entry : response.errors().entrySet()) {
+ for (Map.Entry<String, ApiError> entry : response.errors().entrySet()) {
KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
if (future == null) {
log.warn("Server response mentioned unknown topic {}", entry.getKey());
@@ -1071,7 +1091,7 @@ public class KafkaAdminClient extends AdminClient {
continue;
final KafkaFutureImpl<NodeApiVersions> nodeFuture = new KafkaFutureImpl<>();
nodeFutures.put(node, nodeFuture);
- runnable.call(new Call("apiVersions", deadlineMs, new ConstantAdminNodeProvider(node)) {
+ runnable.call(new Call("apiVersions", deadlineMs, new ConstantNodeProvider(node)) {
@Override
public AbstractRequest.Builder createRequest(int timeoutMs) {
return new ApiVersionsRequest.Builder();
@@ -1229,4 +1249,160 @@ public class KafkaAdminClient extends AdminClient {
}, now);
return new DeleteAclsResults(new HashMap<AclBindingFilter, KafkaFuture<FilterResults>>(futures));
}
+
+ @Override
+ public DescribeConfigsResults describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
+ final Map<ConfigResource, KafkaFutureImpl<Config>> singleRequestFutures = new HashMap<>();
+ final Collection<Resource> singleRequestResources = new ArrayList<>(configResources.size());
+
+ final Map<ConfigResource, KafkaFutureImpl<Config>> brokerFutures = new HashMap<>(configResources.size());
+ final Collection<Resource> brokerResources = new ArrayList<>();
+
+ for (ConfigResource resource : configResources) {
+ if (resource.type() != ConfigResource.Type.BROKER) {
+ singleRequestFutures.put(resource, new KafkaFutureImpl<Config>());
+ singleRequestResources.add(configResourceToResource(resource));
+ } else {
+ brokerFutures.put(resource, new KafkaFutureImpl<Config>());
+ brokerResources.add(configResourceToResource(resource));
+ }
+ }
+
+ final long now = time.milliseconds();
+ runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()),
+ new LeastLoadedNodeProvider()) {
+
+ @Override
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new DescribeConfigsRequest.Builder(singleRequestResources);
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
+ for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : singleRequestFutures.entrySet()) {
+ ConfigResource configResource = entry.getKey();
+ KafkaFutureImpl<Config> future = entry.getValue();
+ DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource));
+ if (!config.error().is(Errors.NONE)) {
+ future.completeExceptionally(config.error().exception());
+ continue;
+ }
+ List<ConfigEntry> configEntries = new ArrayList<>();
+ for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
+ configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
+ configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
+ }
+ future.complete(new Config(configEntries));
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ completeAllExceptionally(singleRequestFutures.values(), throwable);
+ }
+ }, now);
+
+ for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : brokerFutures.entrySet()) {
+ final KafkaFutureImpl<Config> brokerFuture = entry.getValue();
+ final Resource resource = configResourceToResource(entry.getKey());
+ int nodeId = Integer.parseInt(resource.name());
+ runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()),
+ new ConstantNodeIdProvider(nodeId)) {
+
+ @Override
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new DescribeConfigsRequest.Builder(Collections.singleton(resource));
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
+ DescribeConfigsResponse.Config config = response.configs().get(resource);
+
+ if (!config.error().is(Errors.NONE))
+ brokerFuture.completeExceptionally(config.error().exception());
+ else {
+ List<ConfigEntry> configEntries = new ArrayList<>();
+ for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
+ configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
+ configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
+ }
+ brokerFuture.complete(new Config(configEntries));
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ completeAllExceptionally(singleRequestFutures.values(), throwable);
+ }
+ }, now);
+ }
+
+ Map<ConfigResource, KafkaFutureImpl<Config>> allFutures = new HashMap<>(configResources.size());
+ allFutures.putAll(singleRequestFutures);
+ allFutures.putAll(brokerFutures);
+ return new DescribeConfigsResults(new HashMap<ConfigResource, KafkaFuture<Config>>(allFutures));
+ }
+
+ private Resource configResourceToResource(ConfigResource configResource) {
+ ResourceType resourceType;
+ switch (configResource.type()) {
+ case TOPIC:
+ resourceType = ResourceType.TOPIC;
+ break;
+ case BROKER:
+ resourceType = ResourceType.BROKER;
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected resource type " + configResource.type());
+ }
+ return new Resource(resourceType, configResource.name());
+ }
+
+ @Override
+ public AlterConfigsResults alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) {
+ final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>(configs.size());
+ for (ConfigResource configResource : configs.keySet()) {
+ futures.put(configResource, new KafkaFutureImpl<Void>());
+ }
+ final Map<Resource, AlterConfigsRequest.Config> requestMap = new HashMap<>(configs.size());
+ for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
+ List<AlterConfigsRequest.ConfigEntry> configEntries = new ArrayList<>();
+ for (ConfigEntry configEntry: entry.getValue().entries())
+ configEntries.add(new AlterConfigsRequest.ConfigEntry(configEntry.name(), configEntry.value()));
+ ConfigResource resource = entry.getKey();
+ requestMap.put(configResourceToResource(resource), new AlterConfigsRequest.Config(configEntries));
+ }
+
+ final long now = time.milliseconds();
+ runnable.call(new Call("alterConfigs", calcDeadlineMs(now, options.timeoutMs()),
+ new LeastLoadedNodeProvider()) {
+
+ @Override
+ public AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new AlterConfigsRequest.Builder(requestMap, options.isValidateOnly());
+ }
+
+ @Override
+ public void handleResponse(AbstractResponse abstractResponse) {
+ AlterConfigsResponse response = (AlterConfigsResponse) abstractResponse;
+ for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry : futures.entrySet()) {
+ KafkaFutureImpl<Void> future = entry.getValue();
+ ApiException exception = response.errors().get(configResourceToResource(entry.getKey())).exception();
+ if (exception != null) {
+ future.completeExceptionally(exception);
+ } else {
+ future.complete(null);
+ }
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ completeAllExceptionally(futures.values(), throwable);
+ }
+ }, now);
+ return new AlterConfigsResults(new HashMap<ConfigResource, KafkaFuture<Void>>(futures));
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
index 66a91e3..ca4fa0a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
@@ -48,7 +48,12 @@ public enum ResourceType {
/**
* The cluster as a whole.
*/
- CLUSTER((byte) 4);
+ CLUSTER((byte) 4),
+
+ /**
+ * A broker.
+ */
+ BROKER((byte) 5);
private final static HashMap<Byte, ResourceType> CODE_TO_VALUE = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index dc7fd7c..d2b6d34 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -126,6 +126,13 @@ public class AbstractConfig {
return (String) get(key);
}
+ public ConfigDef.Type typeOf(String key) {
+ ConfigDef.ConfigKey configKey = definition.configKeys().get(key);
+ if (configKey == null)
+ return null;
+ return configKey.type;
+ }
+
public Password getPassword(String key) {
return (Password) get(key);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationException.java
new file mode 100644
index 0000000..9f7211e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class BrokerAuthorizationException extends ApiException {
+ public BrokerAuthorizationException(final String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 709d927..36f6403 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -65,7 +65,9 @@ public enum ApiKeys {
TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false),
DESCRIBE_ACLS(29, "DescribeAcls", false),
CREATE_ACLS(30, "CreateAcls", false),
- DELETE_ACLS(31, "DeleteAcls", false);
+ DELETE_ACLS(31, "DeleteAcls", false),
+ DESCRIBE_CONFIGS(32, "DescribeConfigs", false),
+ ALTER_CONFIGS(33, "AlterConfigs", false);
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index c15edc1..db94b2c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.BrokerAuthorizationException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.ConcurrentTransactionsException;
@@ -489,13 +490,18 @@ public enum Errors {
return new ProducerIdAuthorizationException(message);
}
}),
- SECURITY_DISABLED(55, "Security features are disabled.",
- new ApiExceptionBuilder() {
- @Override
- public ApiException build(String message) {
- return new SecurityDisabledException(message);
- }
- });
+ SECURITY_DISABLED(55, "Security features are disabled.", new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new SecurityDisabledException(message);
+ }
+ }),
+ BROKER_AUTHORIZATION_FAILED(56, "Broker authorization failed", new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new BrokerAuthorizationException(message);
+ }
+ });
private interface ApiExceptionBuilder {
ApiException build(String message);
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index e970eb1..d5ce469 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -142,9 +142,8 @@ public class Protocol {
"The broker id of the controller broker."),
new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
-
- public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3};
- public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3};
+ public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3};
+ public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3};
/* Produce api */
@@ -227,8 +226,8 @@ public class Protocol {
newThrottleTimeField());
public static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2;
- public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3};
- public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3};
+ public static final Schema[] PRODUCE_REQUEST = {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3};
+ public static final Schema[] PRODUCE_RESPONSE = {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3};
/* Offset commit api */
public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -337,7 +336,7 @@ public class Protocol {
public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
- public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2, OFFSET_COMMIT_REQUEST_V3};
+ public static final Schema[] OFFSET_COMMIT_REQUEST = {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2, OFFSET_COMMIT_REQUEST_V3};
/* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
@@ -348,7 +347,7 @@ public class Protocol {
new Field("responses",
new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
- public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2, OFFSET_COMMIT_RESPONSE_V3};
+ public static final Schema[] OFFSET_COMMIT_RESPONSE = {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2, OFFSET_COMMIT_RESPONSE_V3};
/* Offset fetch api */
@@ -423,8 +422,8 @@ public class Protocol {
new Field("error_code",
INT16));
- public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2, OFFSET_FETCH_REQUEST_V3};
- public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2, OFFSET_FETCH_RESPONSE_V3};
+ public static final Schema[] OFFSET_FETCH_REQUEST = {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2, OFFSET_FETCH_REQUEST_V3};
+ public static final Schema[] OFFSET_FETCH_RESPONSE = {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2, OFFSET_FETCH_RESPONSE_V3};
/* List offset api */
public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -520,8 +519,8 @@ public class Protocol {
new Field("responses",
new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1)));
- public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2};
- public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2};
+ public static final Schema[] LIST_OFFSET_REQUEST = {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2};
+ public static final Schema[] LIST_OFFSET_RESPONSE = {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2};
/* Fetch api */
public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -748,8 +747,8 @@ public class Protocol {
newThrottleTimeField(),
new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
- public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5};
- public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5};
+ public static final Schema[] FETCH_REQUEST = {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5};
+ public static final Schema[] FETCH_RESPONSE = {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5};
/* List groups api */
public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
@@ -766,8 +765,8 @@ public class Protocol {
new Field("error_code", INT16),
new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
- public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1};
- public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1};
+ public static final Schema[] LIST_GROUPS_REQUEST = {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1};
+ public static final Schema[] LIST_GROUPS_RESPONSE = {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1};
/* Describe group api */
public static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(new Field("group_ids",
@@ -814,8 +813,8 @@ public class Protocol {
newThrottleTimeField(),
new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
- public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1};
- public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1};
+ public static final Schema[] DESCRIBE_GROUPS_REQUEST = {DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1};
+ public static final Schema[] DESCRIBE_GROUPS_RESPONSE = {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1};
/* Find coordinator api */
public static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(
@@ -853,8 +852,8 @@ public class Protocol {
"Host and port information for the coordinator for a consumer group."));
- public static final Schema[] FIND_COORDINATOR_REQUEST = new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1};
- public static final Schema[] FIND_COORDINATOR_RESPONSE = new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1};
+ public static final Schema[] FIND_COORDINATOR_REQUEST = {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1};
+ public static final Schema[] FIND_COORDINATOR_RESPONSE = {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1};
/* Controlled shutdown api */
public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id",
@@ -872,8 +871,8 @@ public class Protocol {
"The partitions that the broker still leads."));
/* V0 is not supported as it would require changes to the request header not to include `clientId` */
- public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = new Schema[] {null, CONTROLLED_SHUTDOWN_REQUEST_V1};
- public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = new Schema[] {null, CONTROLLED_SHUTDOWN_RESPONSE_V1};
+ public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = {null, CONTROLLED_SHUTDOWN_REQUEST_V1};
+ public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = {null, CONTROLLED_SHUTDOWN_RESPONSE_V1};
/* Join group api */
public static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema(new Field("protocol_name", STRING),
@@ -937,6 +936,7 @@ public class Protocol {
new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
public static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0;
+
public static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema(
newThrottleTimeField(),
new Field("error_code", INT16),
@@ -956,8 +956,8 @@ public class Protocol {
new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
- public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2};
- public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2};
+ public static final Schema[] JOIN_GROUP_REQUEST = {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2};
+ public static final Schema[] JOIN_GROUP_RESPONSE = {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2};
/* SyncGroup api */
public static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(new Field("member_id", STRING),
@@ -976,8 +976,8 @@ public class Protocol {
newThrottleTimeField(),
new Field("error_code", INT16),
new Field("member_assignment", BYTES));
- public static final Schema[] SYNC_GROUP_REQUEST = new Schema[] {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1};
- public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1};
+ public static final Schema[] SYNC_GROUP_REQUEST = {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1};
+ public static final Schema[] SYNC_GROUP_RESPONSE = {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1};
/* Heartbeat api */
public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
@@ -996,8 +996,8 @@ public class Protocol {
newThrottleTimeField(),
new Field("error_code", INT16));
- public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1};
- public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1};
+ public static final Schema[] HEARTBEAT_REQUEST = {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1};
+ public static final Schema[] HEARTBEAT_RESPONSE = {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1};
/* Leave group api */
public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
@@ -1013,8 +1013,8 @@ public class Protocol {
newThrottleTimeField(),
new Field("error_code", INT16));
- public static final Schema[] LEAVE_GROUP_REQUEST = new Schema[] {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1};
- public static final Schema[] LEAVE_GROUP_RESPONSE = new Schema[] {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1};
+ public static final Schema[] LEAVE_GROUP_REQUEST = {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1};
+ public static final Schema[] LEAVE_GROUP_RESPONSE = {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1};
/* Leader and ISR api */
public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 =
@@ -1046,8 +1046,8 @@ public class Protocol {
new Field("partitions",
new ArrayOf(LEADER_AND_ISR_RESPONSE_PARTITION_V0)));
- public static final Schema[] LEADER_AND_ISR_REQUEST = new Schema[] {LEADER_AND_ISR_REQUEST_V0};
- public static final Schema[] LEADER_AND_ISR_RESPONSE = new Schema[] {LEADER_AND_ISR_RESPONSE_V0};
+ public static final Schema[] LEADER_AND_ISR_REQUEST = {LEADER_AND_ISR_REQUEST_V0};
+ public static final Schema[] LEADER_AND_ISR_RESPONSE = {LEADER_AND_ISR_RESPONSE_V0};
/* Replica api */
public static final Schema STOP_REPLICA_REQUEST_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."),
@@ -1068,8 +1068,8 @@ public class Protocol {
new Field("partitions",
new ArrayOf(STOP_REPLICA_RESPONSE_PARTITION_V0)));
- public static final Schema[] STOP_REPLICA_REQUEST = new Schema[] {STOP_REPLICA_REQUEST_V0};
- public static final Schema[] STOP_REPLICA_RESPONSE = new Schema[] {STOP_REPLICA_RESPONSE_V0};
+ public static final Schema[] STOP_REPLICA_REQUEST = {STOP_REPLICA_REQUEST_V0};
+ public static final Schema[] STOP_REPLICA_RESPONSE = {STOP_REPLICA_RESPONSE_V0};
/* Update metadata api */
@@ -1148,9 +1148,9 @@ public class Protocol {
public static final Schema UPDATE_METADATA_RESPONSE_V3 = UPDATE_METADATA_RESPONSE_V2;
- public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1,
+ public static final Schema[] UPDATE_METADATA_REQUEST = {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1,
UPDATE_METADATA_REQUEST_V2, UPDATE_METADATA_REQUEST_V3};
- public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1,
+ public static final Schema[] UPDATE_METADATA_RESPONSE = {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1,
UPDATE_METADATA_RESPONSE_V2, UPDATE_METADATA_RESPONSE_V3};
/* SASL handshake api */
@@ -1161,8 +1161,8 @@ public class Protocol {
new Field("error_code", INT16),
new Field("enabled_mechanisms", new ArrayOf(Type.STRING), "Array of mechanisms enabled in the server."));
- public static final Schema[] SASL_HANDSHAKE_REQUEST = new Schema[] {SASL_HANDSHAKE_REQUEST_V0};
- public static final Schema[] SASL_HANDSHAKE_RESPONSE = new Schema[] {SASL_HANDSHAKE_RESPONSE_V0};
+ public static final Schema[] SASL_HANDSHAKE_REQUEST = {SASL_HANDSHAKE_REQUEST_V0};
+ public static final Schema[] SASL_HANDSHAKE_RESPONSE = {SASL_HANDSHAKE_RESPONSE_V0};
/* ApiVersion api */
public static final Schema API_VERSIONS_REQUEST_V0 = new Schema();
@@ -1185,8 +1185,8 @@ public class Protocol {
public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1};
/* Admin requests common */
- public static final Schema CONFIG_ENTRY = new Schema(new Field("config_key", STRING, "Configuration key name"),
- new Field("config_value", STRING, "Configuration value"));
+ public static final Schema CONFIG_ENTRY = new Schema(new Field("config_name", STRING, "Configuration name"),
+ new Field("config_value", NULLABLE_STRING, "Configuration value"));
public static final Schema PARTITION_REPLICA_ASSIGNMENT_ENTRY = new Schema(
new Field("partition_id", INT32),
@@ -1212,7 +1212,7 @@ public class Protocol {
new Field("replica_assignment",
new ArrayOf(PARTITION_REPLICA_ASSIGNMENT_ENTRY),
"Replica assignment among kafka brokers for this topic partitions. If this is set num_partitions and replication_factor must be unset."),
- new Field("configs",
+ new Field("config_entries",
new ArrayOf(CONFIG_ENTRY),
"Topic level configuration for topic to be set."));
@@ -1254,8 +1254,8 @@ public class Protocol {
new ArrayOf(TOPIC_ERROR),
"An array of per topic errors."));
- public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2};
- public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2};
+ public static final Schema[] CREATE_TOPICS_REQUEST = {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2};
+ public static final Schema[] CREATE_TOPICS_RESPONSE = {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2};
/* DeleteTopic api */
public static final Schema DELETE_TOPICS_REQUEST_V0 = new Schema(
@@ -1278,8 +1278,8 @@ public class Protocol {
new ArrayOf(TOPIC_ERROR_CODE),
"An array of per topic error codes."));
- public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1};
- public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1};
+ public static final Schema[] DELETE_TOPICS_REQUEST = {DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1};
+ public static final Schema[] DELETE_TOPICS_RESPONSE = {DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1};
public static final Schema DELETE_RECORDS_REQUEST_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."),
new Field("offset", INT64, "The offset before which the messages will be deleted."));
@@ -1301,8 +1301,8 @@ public class Protocol {
newThrottleTimeField(),
new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
- public static final Schema[] DELETE_RECORDS_REQUEST = new Schema[] {DELETE_RECORDS_REQUEST_V0};
- public static final Schema[] DELETE_RECORDS_RESPONSE = new Schema[] {DELETE_RECORDS_RESPONSE_V0};
+ public static final Schema[] DELETE_RECORDS_REQUEST = {DELETE_RECORDS_REQUEST_V0};
+ public static final Schema[] DELETE_RECORDS_RESPONSE = {DELETE_RECORDS_RESPONSE_V0};
/* Transactions API */
public static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
@@ -1327,9 +1327,9 @@ public class Protocol {
"The epoch for the producer id. Will always be 0 if no transactional id was specified in the request.")
);
- public static final Schema[] INIT_PRODUCER_ID_REQUEST = new Schema[] {INIT_PRODUCER_ID_REQUEST_V0};
+ public static final Schema[] INIT_PRODUCER_ID_REQUEST = {INIT_PRODUCER_ID_REQUEST_V0};
- public static final Schema[] INIT_PRODUCER_ID_RESPONSE = new Schema[] {INIT_PRODUCER_ID_RESPONSE_V0};
+ public static final Schema[] INIT_PRODUCER_ID_RESPONSE = {INIT_PRODUCER_ID_RESPONSE_V0};
/* Offsets for Leader Epoch api */
public static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0 = new Schema(
@@ -1378,8 +1378,8 @@ public class Protocol {
new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0),
"An array of topics for which we have leader offsets for some requested Partition Leader Epoch"));
- public static final Schema[] OFFSET_FOR_LEADER_EPOCH_REQUEST = new Schema[] {OFFSET_FOR_LEADER_EPOCH_REQUEST_V0};
- public static final Schema[] OFFSET_FOR_LEADER_EPOCH_RESPONSE = new Schema[] {OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0};
+ public static final Schema[] OFFSET_FOR_LEADER_EPOCH_REQUEST = {OFFSET_FOR_LEADER_EPOCH_REQUEST_V0};
+ public static final Schema[] OFFSET_FOR_LEADER_EPOCH_RESPONSE = {OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0};
public static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
new Field("transactional_id",
@@ -1408,8 +1408,8 @@ public class Protocol {
INT16)))))))
);
- public static final Schema[] ADD_PARTITIONS_TO_TXN_REQUEST = new Schema[] {ADD_PARTITIONS_TO_TXN_REQUEST_V0};
- public static final Schema[] ADD_PARTITIONS_TO_TXN_RESPONSE = new Schema[] {ADD_PARTITIONS_TO_TXN_RESPONSE_V0};
+ public static final Schema[] ADD_PARTITIONS_TO_TXN_REQUEST = {ADD_PARTITIONS_TO_TXN_REQUEST_V0};
+ public static final Schema[] ADD_PARTITIONS_TO_TXN_RESPONSE = {ADD_PARTITIONS_TO_TXN_RESPONSE_V0};
public static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V0 = new Schema(
new Field("transactional_id",
@@ -1432,8 +1432,8 @@ public class Protocol {
"An integer error code.")
);
- public static final Schema[] ADD_OFFSETS_TO_TXN_REQUEST = new Schema[] {ADD_OFFSETS_TO_TXN_REQUEST_V0};
- public static final Schema[] ADD_OFFSETS_TO_TXN_RESPONSE = new Schema[] {ADD_OFFSETS_TO_TXN_RESPONSE_V0};
+ public static final Schema[] ADD_OFFSETS_TO_TXN_REQUEST = {ADD_OFFSETS_TO_TXN_REQUEST_V0};
+ public static final Schema[] ADD_OFFSETS_TO_TXN_RESPONSE = {ADD_OFFSETS_TO_TXN_RESPONSE_V0};
public static final Schema END_TXN_REQUEST_V0 = new Schema(
new Field("transactional_id",
@@ -1457,8 +1457,8 @@ public class Protocol {
"An integer error code.")
);
- public static final Schema[] END_TXN_REQUEST = new Schema[] {END_TXN_REQUEST_V0};
- public static final Schema[] END_TXN_RESPONSE = new Schema[] {END_TXN_RESPONSE_V0};
+ public static final Schema[] END_TXN_REQUEST = {END_TXN_REQUEST_V0};
+ public static final Schema[] END_TXN_RESPONSE = {END_TXN_RESPONSE_V0};
public static final Schema WRITE_TXN_MARKERS_ENTRY_V0 = new Schema(
new Field("producer_id",
@@ -1506,8 +1506,8 @@ public class Protocol {
new Field("transaction_markers", new ArrayOf(WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0), "Errors per partition from writing markers.")
);
- public static final Schema[] WRITE_TXN_REQUEST = new Schema[] {WRITE_TXN_MARKERS_REQUEST_V0};
- public static final Schema[] WRITE_TXN_RESPONSE = new Schema[] {WRITE_TXN_MARKERS_RESPONSE_V0};
+ public static final Schema[] WRITE_TXN_REQUEST = {WRITE_TXN_MARKERS_REQUEST_V0};
+ public static final Schema[] WRITE_TXN_RESPONSE = {WRITE_TXN_MARKERS_RESPONSE_V0};
public static final Schema TXN_OFFSET_COMMIT_PARTITION_OFFSET_METADATA_REQUEST_V0 = new Schema(
new Field("partition", INT32),
@@ -1546,8 +1546,66 @@ public class Protocol {
"Errors per partition from writing markers.")
);
- public static final Schema[] TXN_OFFSET_COMMIT_REQUEST = new Schema[] {TXN_OFFSET_COMMIT_REQUEST_V0};
- public static final Schema[] TXN_OFFSET_COMMIT_RESPONSE = new Schema[] {TXN_OFFSET_COMMIT_RESPONSE_V0};
+ public static final Schema[] TXN_OFFSET_COMMIT_REQUEST = {TXN_OFFSET_COMMIT_REQUEST_V0};
+ public static final Schema[] TXN_OFFSET_COMMIT_RESPONSE = {TXN_OFFSET_COMMIT_RESPONSE_V0};
+
+ /* DescribeConfigs API */
+
+ public static final Schema DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0 = new Schema(
+ new Field("resource_type", INT8),
+ new Field("resource_name", STRING),
+ new Field("config_names", ArrayOf.nullable(STRING))
+ );
+
+ public static final Schema DESCRIBE_CONFIGS_REQUEST_V0 = new Schema(
+ new Field("resources", new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0),
+ "An array of config resources to be returned."));
+
+ public static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0 = new Schema(
+ new Field("error_code", INT16),
+ new Field("error_message", NULLABLE_STRING),
+ new Field("resource_type", INT8),
+ new Field("resource_name", STRING),
+ new Field("config_entries", new ArrayOf(new Schema(
+ new Field("config_name", STRING),
+ new Field("config_value", NULLABLE_STRING),
+ new Field("read_only", BOOLEAN),
+ new Field("is_default", BOOLEAN),
+ new Field("is_sensitive", BOOLEAN)
+ ))
+ ));
+
+ public static final Schema DESCRIBE_CONFIGS_RESPONSE_V0 = new Schema(
+ newThrottleTimeField(),
+ new Field("resources", new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0)));
+
+ public static final Schema[] DESCRIBE_CONFIGS_REQUEST = {DESCRIBE_CONFIGS_REQUEST_V0};
+ public static final Schema[] DESCRIBE_CONFIGS_RESPONSE = {DESCRIBE_CONFIGS_RESPONSE_V0};
+
+ /* AlterConfigs API */
+
+ public static final Schema ALTER_CONFIGS_REQUEST_RESOURCE_V0 = new Schema(
+ new Field("resource_type", INT8),
+ new Field("resource_name", STRING),
+ new Field("config_entries", new ArrayOf(CONFIG_ENTRY)));
+
+ public static final Schema ALTER_CONFIGS_REQUEST_V0 = new Schema(
+ new Field("resources", new ArrayOf(ALTER_CONFIGS_REQUEST_RESOURCE_V0),
+ "An array of resources to update with the provided configs."),
+ new Field("validate_only", BOOLEAN));
+
+ public static final Schema ALTER_CONFIGS_RESPONSE_ENTITY_V0 = new Schema(
+ new Field("error_code", INT16),
+ new Field("error_message", NULLABLE_STRING),
+ new Field("resource_type", INT8),
+ new Field("resource_name", STRING));
+
+ public static final Schema ALTER_CONFIGS_RESPONSE_V0 = new Schema(
+ newThrottleTimeField(),
+ new Field("resources", new ArrayOf(ALTER_CONFIGS_RESPONSE_ENTITY_V0)));
+
+ public static final Schema[] ALTER_CONFIGS_REQUEST = {ALTER_CONFIGS_REQUEST_V0};
+ public static final Schema[] ALTER_CONFIGS_RESPONSE = {ALTER_CONFIGS_RESPONSE_V0};
public static final Schema DESCRIBE_ACLS_REQUEST_V0 = new Schema(
new Field("resource_type", INT8, "The filter resource type."),
@@ -1675,6 +1733,8 @@ public class Protocol {
REQUESTS[ApiKeys.DESCRIBE_ACLS.id] = DESCRIBE_ACLS_REQUEST;
REQUESTS[ApiKeys.CREATE_ACLS.id] = CREATE_ACLS_REQUEST;
REQUESTS[ApiKeys.DELETE_ACLS.id] = DELETE_ACLS_REQUEST;
+ REQUESTS[ApiKeys.DESCRIBE_CONFIGS.id] = DESCRIBE_CONFIGS_REQUEST;
+ REQUESTS[ApiKeys.ALTER_CONFIGS.id] = ALTER_CONFIGS_REQUEST;
RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1708,6 +1768,8 @@ public class Protocol {
RESPONSES[ApiKeys.DESCRIBE_ACLS.id] = DESCRIBE_ACLS_RESPONSE;
RESPONSES[ApiKeys.CREATE_ACLS.id] = CREATE_ACLS_RESPONSE;
RESPONSES[ApiKeys.DELETE_ACLS.id] = DELETE_ACLS_RESPONSE;
+ RESPONSES[ApiKeys.DESCRIBE_CONFIGS.id] = DESCRIBE_CONFIGS_RESPONSE;
+ RESPONSES[ApiKeys.ALTER_CONFIGS.id] = ALTER_CONFIGS_RESPONSE;
/* set the minimum and maximum version of each api */
for (ApiKeys api : ApiKeys.values()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index f275ada..fbb520c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -56,8 +56,7 @@ public class Schema extends Type {
Object value = field.type().validate(r.get(field));
field.type.write(buffer, value);
} catch (Exception e) {
- throw new SchemaException("Error writing field '" + field.name +
- "': " +
+ throw new SchemaException("Error writing field '" + field.name + "': " +
(e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
}
}
@@ -73,8 +72,7 @@ public class Schema extends Type {
try {
objects[i] = fields[i].type.read(buffer);
} catch (Exception e) {
- throw new SchemaException("Error reading field '" + fields[i].name +
- "': " +
+ throw new SchemaException("Error reading field '" + fields[i].name + "': " +
(e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
}
}
@@ -88,8 +86,14 @@ public class Schema extends Type {
public int sizeOf(Object o) {
int size = 0;
Struct r = (Struct) o;
- for (Field field : fields)
- size += field.type.sizeOf(r.get(field));
+ for (Field field : fields) {
+ try {
+ size += field.type.sizeOf(r.get(field));
+ } catch (Exception e) {
+ throw new SchemaException("Error computing size for field '" + field.name + "': " +
+ (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
+ }
+ }
return size;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 16c0c21..2cd88e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -208,6 +208,12 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
case DELETE_ACLS:
request = new DeleteAclsRequest(struct, version);
break;
+ case DESCRIBE_CONFIGS:
+ request = new DescribeConfigsRequest(struct, version);
+ break;
+ case ALTER_CONFIGS:
+ request = new AlterConfigsRequest(struct, version);
+ break;
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
"code should be updated to do so.", apiKey));
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index aee4f5e..1000ef5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -114,6 +114,10 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return new CreateAclsResponse(struct);
case DELETE_ACLS:
return new DeleteAclsResponse(struct);
+ case DESCRIBE_CONFIGS:
+ return new DescribeConfigsResponse(struct);
+ case ALTER_CONFIGS:
+ return new AlterConfigsResponse(struct);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
"code should be updated to do so.", apiKey));
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
new file mode 100644
index 0000000..a964f85
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AlterConfigsRequest extends AbstractRequest {
+
+ private static final String RESOURCES_KEY_NAME = "resources";
+ private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
+ private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+ private static final String VALIDATE_ONLY_KEY_NAME = "validate_only";
+
+ private static final String CONFIG_ENTRIES_KEY_NAME = "config_entries";
+ private static final String CONFIG_NAME = "config_name";
+ private static final String CONFIG_VALUE = "config_value";
+
+ public static class Config {
+ private final Collection<ConfigEntry> entries;
+
+ public Config(Collection<ConfigEntry> entries) {
+ this.entries = entries;
+ }
+
+ public Collection<ConfigEntry> entries() {
+ return entries;
+ }
+ }
+
+ public static class ConfigEntry {
+ private final String name;
+ private final String value;
+
+ public ConfigEntry(String name, String value) {
+ this.name = name;
+ this.value = value;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public String value() {
+ return value;
+ }
+
+ }
+
+ public static class Builder extends AbstractRequest.Builder {
+
+ private final Map<Resource, Config> configs;
+ private final boolean validateOnly;
+
+ public Builder(Map<Resource, Config> configs, boolean validateOnly) {
+ super(ApiKeys.ALTER_CONFIGS);
+ this.configs = configs;
+ this.validateOnly = validateOnly;
+ }
+
+ @Override
+ public AlterConfigsRequest build(short version) {
+ return new AlterConfigsRequest(version, configs, validateOnly);
+ }
+ }
+
+ private final Map<Resource, Config> configs;
+ private final boolean validateOnly;
+
+ public AlterConfigsRequest(short version, Map<Resource, Config> configs, boolean validateOnly) {
+ super(version);
+ this.configs = configs;
+ this.validateOnly = validateOnly;
+ }
+
+ public AlterConfigsRequest(Struct struct, short version) {
+ super(version);
+ validateOnly = struct.getBoolean(VALIDATE_ONLY_KEY_NAME);
+ Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
+ configs = new HashMap<>(resourcesArray.length);
+ for (Object resourcesObj : resourcesArray) {
+ Struct resourcesStruct = (Struct) resourcesObj;
+
+ ResourceType resourceType = ResourceType.forId(resourcesStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+ String resourceName = resourcesStruct.getString(RESOURCE_NAME_KEY_NAME);
+ Resource resource = new Resource(resourceType, resourceName);
+
+ Object[] configEntriesArray = resourcesStruct.getArray(CONFIG_ENTRIES_KEY_NAME);
+ List<ConfigEntry> configEntries = new ArrayList<>(configEntriesArray.length);
+ for (Object configEntriesObj: configEntriesArray) {
+ Struct configEntriesStruct = (Struct) configEntriesObj;
+ String configName = configEntriesStruct.getString(CONFIG_NAME);
+ String configValue = configEntriesStruct.getString(CONFIG_VALUE);
+ configEntries.add(new ConfigEntry(configName, configValue));
+ }
+ Config config = new Config(configEntries);
+ configs.put(resource, config);
+ }
+ }
+
+ public Map<Resource, Config> configs() {
+ return configs;
+ }
+
+ public boolean validateOnly() {
+ return validateOnly;
+ }
+
+ @Override
+ protected Struct toStruct() {
+ Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.requestSchema(version()));
+ struct.set(VALIDATE_ONLY_KEY_NAME, validateOnly);
+ List<Struct> resourceStructs = new ArrayList<>(configs.size());
+ for (Map.Entry<Resource, Config> entry : configs.entrySet()) {
+ Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
+
+ Resource resource = entry.getKey();
+ resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
+ resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
+
+ Config config = entry.getValue();
+ List<Struct> configEntryStructs = new ArrayList<>(config.entries.size());
+ for (ConfigEntry configEntry : config.entries) {
+ Struct configEntriesStruct = resourceStruct.instance(CONFIG_ENTRIES_KEY_NAME);
+ configEntriesStruct.set(CONFIG_NAME, configEntry.name);
+ configEntriesStruct.set(CONFIG_VALUE, configEntry.value);
+ configEntryStructs.add(configEntriesStruct);
+ }
+ resourceStruct.set(CONFIG_ENTRIES_KEY_NAME, configEntryStructs.toArray(new Struct[0]));
+
+ resourceStructs.add(resourceStruct);
+ }
+ struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
+ return struct;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ short version = version();
+ switch (version) {
+ case 0:
+ ApiError error = ApiError.fromThrowable(e);
+ Map<Resource, ApiError> errors = new HashMap<>(configs.size());
+ for (Resource resource : configs.keySet())
+ errors.put(resource, error);
+ return new AlterConfigsResponse(throttleTimeMs, errors);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ version, this.getClass().getSimpleName(), ApiKeys.ALTER_CONFIGS.latestVersion()));
+ }
+ }
+
+ public static AlterConfigsRequest parse(ByteBuffer buffer, short version) {
+ return new AlterConfigsRequest(ApiKeys.ALTER_CONFIGS.parseRequest(version, buffer), version);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
new file mode 100644
index 0000000..8f904d8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AlterConfigsResponse extends AbstractResponse {
+
+ private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
+
+ private static final String RESOURCES_KEY_NAME = "resources";
+ private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
+ private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+
+ private final int throttleTimeMs;
+ private final Map<Resource, ApiError> errors;
+
+ public AlterConfigsResponse(int throttleTimeMs, Map<Resource, ApiError> errors) {
+ this.throttleTimeMs = throttleTimeMs;
+ this.errors = errors;
+
+ }
+
+ public AlterConfigsResponse(Struct struct) {
+ throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+ Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
+ errors = new HashMap<>(resourcesArray.length);
+ for (Object resourceObj : resourcesArray) {
+ Struct resourceStruct = (Struct) resourceObj;
+ ApiError error = new ApiError(resourceStruct);
+ ResourceType resourceType = ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+ String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
+ errors.put(new Resource(resourceType, resourceName), error);
+ }
+ }
+
+ public Map<Resource, ApiError> errors() {
+ return errors;
+ }
+
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.responseSchema(version));
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+ List<Struct> resourceStructs = new ArrayList<>(errors.size());
+ for (Map.Entry<Resource, ApiError> entry : errors.entrySet()) {
+ Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
+ Resource resource = entry.getKey();
+ entry.getValue().write(resourceStruct);
+ resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
+ resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
+ resourceStructs.add(resourceStruct);
+ }
+ struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
+ return struct;
+ }
+
+ public static AlterConfigsResponse parse(ByteBuffer buffer, short version) {
+ return new AlterConfigsResponse(ApiKeys.ALTER_CONFIGS.parseResponse(version, buffer));
+ }
+
+}