You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/15 01:05:54 UTC
kafka git commit: KAFKA-5275; AdminClient API consistency
Repository: kafka
Updated Branches:
refs/heads/trunk bcf5da0ba -> 0f60617fa
KAFKA-5275; AdminClient API consistency
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Colin P. McCabe <cm...@confluent.io>, Jason Gustafson <ja...@confluent.io>
Closes #3339 from ijuma/kafka-5275-admin-client-api-consistency
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0f60617f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0f60617f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0f60617f
Branch: refs/heads/trunk
Commit: 0f60617fab5fc6805f522d0b9a213a7f600fab12
Parents: bcf5da0
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu Jun 15 02:05:41 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Jun 15 02:05:41 2017 +0100
----------------------------------------------------------------------
.../kafka/clients/admin/AdminClientConfig.java | 9 +++
.../clients/admin/AlterConfigsOptions.java | 2 +-
.../kafka/clients/admin/AlterConfigsResult.java | 2 +-
.../kafka/clients/admin/CreateAclsResult.java | 2 +-
.../clients/admin/CreateTopicsOptions.java | 2 +-
.../kafka/clients/admin/CreateTopicsResult.java | 2 +-
.../kafka/clients/admin/DeleteAclsResult.java | 32 +++++------
.../kafka/clients/admin/DeleteTopicsResult.java | 2 +-
.../kafka/clients/admin/DescribeAclsResult.java | 2 +-
.../clients/admin/DescribeConfigsResult.java | 2 +-
.../clients/admin/DescribeTopicsResult.java | 2 +-
.../kafka/clients/admin/KafkaAdminClient.java | 60 ++++++++++----------
.../kafka/clients/admin/ListTopicsOptions.java | 2 +-
.../kafka/clients/admin/ListTopicsResult.java | 13 +++--
.../apache/kafka/clients/admin/NewTopic.java | 2 +-
.../kafka/clients/admin/TopicDescription.java | 19 ++++---
.../kafka/clients/admin/TopicListing.java | 2 +-
.../apache/kafka/common/TopicPartitionInfo.java | 7 ++-
.../kafka/common/acl/AccessControlEntry.java | 10 ++--
.../common/acl/AccessControlEntryData.java | 4 +-
.../common/acl/AccessControlEntryFilter.java | 4 +-
.../org/apache/kafka/common/acl/AclBinding.java | 2 +-
.../kafka/common/acl/AclBindingFilter.java | 2 +-
.../apache/kafka/common/acl/AclOperation.java | 2 +-
.../kafka/common/acl/AclPermissionType.java | 2 +-
.../apache/kafka/common/resource/Resource.java | 4 +-
.../kafka/common/resource/ResourceFilter.java | 2 +-
.../kafka/common/resource/ResourceType.java | 2 +-
.../clients/admin/KafkaAdminClientTest.java | 28 ++++-----
.../kafka/common/acl/AclOperationTest.java | 2 +-
.../kafka/common/acl/AclPermissionTypeTest.java | 2 +-
.../kafka/common/resource/ResourceTypeTest.java | 2 +-
.../apache/kafka/connect/util/TopicAdmin.java | 2 +-
.../storage/KafkaConfigBackingStoreTest.java | 2 +-
.../storage/KafkaOffsetBackingStoreTest.java | 2 +-
.../kafka/api/AdminClientIntegrationTest.scala | 30 +++++-----
...AdminClientWithPoliciesIntegrationTest.scala | 20 +++----
.../api/SaslSslAdminClientIntegrationTest.scala | 38 ++++++-------
.../kafka/tools/ClientCompatibilityTest.java | 6 +-
39 files changed, 173 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index c106823..ed51e67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -98,6 +98,9 @@ public class AdminClientConfig extends AbstractConfig {
private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
+ public static final String RETRIES_CONFIG = "retries";
+ private static final String RETRIES_DOC = "The maximum number of times to retry a call before failing it.";
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
@@ -136,6 +139,12 @@ public class AdminClientConfig extends AbstractConfig {
5 * 60 * 1000,
Importance.MEDIUM,
CONNECTIONS_MAX_IDLE_MS_DOC)
+ .define(RETRIES_CONFIG,
+ Type.INT,
+ 5,
+ atLeast(0),
+ Importance.LOW,
+ RETRIES_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
Type.LONG,
30000,
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
index 31f130f..c5665c0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
@@ -52,7 +52,7 @@ public class AlterConfigsOptions {
/**
* Return true if the request should be validated without altering the configs.
*/
- public boolean isValidateOnly() {
+ public boolean shouldValidateOnly() {
return validateOnly;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
index 5baacf7..df6c1c2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
@@ -40,7 +40,7 @@ public class AlterConfigsResult {
/**
* Return a map from resources to futures which can be used to check the status of the operation on each resource.
*/
- public Map<ConfigResource, KafkaFuture<Void>> results() {
+ public Map<ConfigResource, KafkaFuture<Void>> values() {
return futures;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java
index e575184..2917f17 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java
@@ -41,7 +41,7 @@ public class CreateAclsResult {
* Return a map from ACL bindings to futures which can be used to check the status of the creation of each ACL
* binding.
*/
- public Map<AclBinding, KafkaFuture<Void>> results() {
+ public Map<AclBinding, KafkaFuture<Void>> values() {
return futures;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
index bc24014..cb23a8d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
@@ -59,7 +59,7 @@ public class CreateTopicsOptions {
/**
* Return true if the request should be validated without creating the topic.
*/
- public boolean validateOnly() {
+ public boolean shouldValidateOnly() {
return validateOnly;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
index 3731fad..404cb918 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
@@ -39,7 +39,7 @@ public class CreateTopicsResult {
* Return a map from topic names to futures, which can be used to check the status of individual
* topic creations.
*/
- public Map<String, KafkaFuture<Void>> results() {
+ public Map<String, KafkaFuture<Void>> values() {
return futures;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
index 16a505d..90bc297 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
@@ -38,22 +38,22 @@ import java.util.Map;
public class DeleteAclsResult {
/**
- * A class containing either the deleted ACL or an exception if the delete failed.
+ * A class containing either the deleted ACL binding or an exception if the delete failed.
*/
public static class FilterResult {
- private final AclBinding acl;
+ private final AclBinding binding;
private final ApiException exception;
- FilterResult(AclBinding acl, ApiException exception) {
- this.acl = acl;
+ FilterResult(AclBinding binding, ApiException exception) {
+ this.binding = binding;
this.exception = exception;
}
/**
- * Return the deleted ACL or null if there was an error.
+ * Return the deleted ACL binding or null if there was an error.
*/
- public AclBinding acl() {
- return acl;
+ public AclBinding binding() {
+ return binding;
}
/**
@@ -68,17 +68,17 @@ public class DeleteAclsResult {
* A class containing the results of the delete ACLs operation.
*/
public static class FilterResults {
- private final List<FilterResult> acls;
+ private final List<FilterResult> values;
- FilterResults(List<FilterResult> acls) {
- this.acls = acls;
+ FilterResults(List<FilterResult> values) {
+ this.values = values;
}
/**
- * Return a list of delete ACLs results.
+ * Return a list of delete ACLs results for a given filter.
*/
- public List<FilterResult> acls() {
- return acls;
+ public List<FilterResult> values() {
+ return values;
}
}
@@ -92,7 +92,7 @@ public class DeleteAclsResult {
* Return a map from acl filters to futures which can be used to check the status of the deletions by each
* filter.
*/
- public Map<AclBindingFilter, KafkaFuture<FilterResults>> results() {
+ public Map<AclBindingFilter, KafkaFuture<FilterResults>> values() {
return futures;
}
@@ -115,11 +115,11 @@ public class DeleteAclsResult {
// have failed if any Future failed.
throw new KafkaException("DeleteAclsResult#all: internal error", e);
}
- for (FilterResult result : results.acls()) {
+ for (FilterResult result : results.values()) {
if (result.exception() != null) {
throw result.exception();
}
- acls.add(result.acl());
+ acls.add(result.binding());
}
}
return acls;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
index 2fd6648..9148a76 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
@@ -40,7 +40,7 @@ public class DeleteTopicsResult {
* Return a map from topic names to futures which can be used to check the status of
* individual deletions.
*/
- public Map<String, KafkaFuture<Void>> results() {
+ public Map<String, KafkaFuture<Void>> values() {
return futures;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java
index 7c49b28..e09bf43 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java
@@ -40,7 +40,7 @@ public class DescribeAclsResult {
/**
* Return a future containing the ACLs requested.
*/
- public KafkaFuture<Collection<AclBinding>> all() {
+ public KafkaFuture<Collection<AclBinding>> values() {
return future;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
index c5d4c26..478bf05 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
@@ -44,7 +44,7 @@ public class DescribeConfigsResult {
* Return a map from resources to futures which can be used to check the status of the configuration for each
* resource.
*/
- public Map<ConfigResource, KafkaFuture<Config>> results() {
+ public Map<ConfigResource, KafkaFuture<Config>> values() {
return futures;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
index 4e8d433..18f5f9d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
@@ -42,7 +42,7 @@ public class DescribeTopicsResult {
* Return a map from topic names to futures which can be used to check the status of
* individual topics.
*/
- public Map<String, KafkaFuture<TopicDescription>> results() {
+ public Map<String, KafkaFuture<TopicDescription>> values() {
return futures;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index d2927ea..e92b1d3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -89,13 +89,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -113,11 +113,6 @@ public class KafkaAdminClient extends AdminClient {
private static final Logger log = LoggerFactory.getLogger(KafkaAdminClient.class);
/**
- * The maximum number of times to retry a call before failing it.
- */
- private static final int MAX_CALL_RETRIES = 5;
-
- /**
* The next integer to use to name a KafkaAdminClient which the user hasn't specified an explicit name for.
*/
private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
@@ -183,6 +178,8 @@ public class KafkaAdminClient extends AdminClient {
*/
private final TimeoutProcessorFactory timeoutProcessorFactory;
+ private final int maxRetries;
+
/**
* Get or create a list value from a map.
*
@@ -357,6 +354,7 @@ public class KafkaAdminClient extends AdminClient {
this.thread = new KafkaThread(threadName, runnable, false);
this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ?
new TimeoutProcessorFactory() : timeoutProcessorFactory;
+ this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
config.logUnused();
log.debug("Created Kafka admin client {}", this.clientId);
thread.start();
@@ -421,22 +419,6 @@ public class KafkaAdminClient extends AdminClient {
}
/**
- * Provides a constant node which is known at construction time.
- */
- private static class ConstantNodeProvider implements NodeProvider {
- private final Node node;
-
- ConstantNodeProvider(Node node) {
- this.node = node;
- }
-
- @Override
- public Node provide() {
- return node;
- }
- }
-
- /**
* Provides the controller node.
*/
private class ControllerNodeProvider implements NodeProvider {
@@ -506,7 +488,7 @@ public class KafkaAdminClient extends AdminClient {
return;
}
// If we are out of retries, fail.
- if (tries > MAX_CALL_RETRIES) {
+ if (tries > maxRetries) {
if (log.isDebugEnabled()) {
log.debug("{} failed after {} attempt(s)", this, tries,
new Exception(prettyPrintException(throwable)));
@@ -1031,7 +1013,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public AbstractRequest.Builder createRequest(int timeoutMs) {
- return new CreateTopicsRequest.Builder(topicsMap, timeoutMs, options.validateOnly());
+ return new CreateTopicsRequest.Builder(topicsMap, timeoutMs, options.shouldValidateOnly());
}
@Override
@@ -1141,7 +1123,7 @@ public class KafkaAdminClient extends AdminClient {
Map<String, TopicListing> topicListing = new HashMap<>();
for (String topicName : cluster.topics()) {
boolean internal = cluster.internalTopics().contains(topicName);
- if (!internal || options.listInternal())
+ if (!internal || options.shouldListInternal())
topicListing.put(topicName, new TopicListing(topicName, internal));
}
topicListingFuture.complete(topicListing);
@@ -1197,19 +1179,31 @@ public class KafkaAdminClient extends AdminClient {
continue;
}
boolean isInternal = cluster.internalTopics().contains(topicName);
- TreeMap<Integer, TopicPartitionInfo> partitions = new TreeMap<>();
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName);
+ List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
for (PartitionInfo partitionInfo : partitionInfos) {
TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
- partitionInfo.partition(), partitionInfo.leader(), Arrays.asList(partitionInfo.replicas()),
+ partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()),
Arrays.asList(partitionInfo.inSyncReplicas()));
- partitions.put(partitionInfo.partition(), topicPartitionInfo);
+ partitions.add(topicPartitionInfo);
}
+ Collections.sort(partitions, new Comparator<TopicPartitionInfo>() {
+ @Override
+ public int compare(TopicPartitionInfo tp1, TopicPartitionInfo tp2) {
+ return Integer.compare(tp1.partition(), tp2.partition());
+ }
+ });
TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions);
future.complete(topicDescription);
}
}
+ private Node leader(PartitionInfo partitionInfo) {
+ if (partitionInfo.leader() == null || partitionInfo.leader().id() == Node.noNode().id())
+ return null;
+ return partitionInfo.leader();
+ }
+
@Override
boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
if (supportsDisablingTopicCreation) {
@@ -1247,10 +1241,16 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse response = (MetadataResponse) abstractResponse;
describeClusterFuture.complete(response.brokers());
- controllerFuture.complete(response.controller());
+ controllerFuture.complete(controller(response));
clusterIdFuture.complete(response.clusterId());
}
+ private Node controller(MetadataResponse response) {
+ if (response.controller() == null || response.controller().id() == MetadataResponse.NO_CONTROLLER_ID)
+ return null;
+ return response.controller();
+ }
+
@Override
void handleFailure(Throwable throwable) {
describeClusterFuture.completeExceptionally(throwable);
@@ -1530,7 +1530,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public AbstractRequest.Builder createRequest(int timeoutMs) {
- return new AlterConfigsRequest.Builder(requestMap, options.isValidateOnly());
+ return new AlterConfigsRequest.Builder(requestMap, options.shouldValidateOnly());
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
index 4068e88..81d834f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
@@ -61,7 +61,7 @@ public class ListTopicsOptions {
/**
* Return true if we should list internal topics.
*/
- public boolean listInternal() {
+ public boolean shouldListInternal() {
return listInternal;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
index 97987ae..e54b3de 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.Map;
+import java.util.Set;
/**
* The result of the {@link AdminClient#listTopics()} call.
@@ -39,14 +40,14 @@ public class ListTopicsResult {
/**
* Return a future which yields a map of topic names to TopicListing objects.
*/
- public KafkaFuture<Map<String, TopicListing>> namesToDescriptions() {
+ public KafkaFuture<Map<String, TopicListing>> namesToListings() {
return future;
}
/**
* Return a future which yields a collection of TopicListing objects.
*/
- public KafkaFuture<Collection<TopicListing>> descriptions() {
+ public KafkaFuture<Collection<TopicListing>> listings() {
return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() {
@Override
public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) {
@@ -58,11 +59,11 @@ public class ListTopicsResult {
/**
* Return a future which yields a collection of topic names.
*/
- public KafkaFuture<Collection<String>> names() {
- return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<String>>() {
+ public KafkaFuture<Set<String>> names() {
+ return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Set<String>>() {
@Override
- public Collection<String> apply(Map<String, TopicListing> namesToDescriptions) {
- return namesToDescriptions.keySet();
+ public Set<String> apply(Map<String, TopicListing> namesToListings) {
+ return namesToListings.keySet();
}
});
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
index 314337c..ff09579 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
@@ -68,7 +68,7 @@ public class NewTopic {
/**
* The number of partitions for the new topic or -1 if a replica assignment has been specified.
*/
- public int partitions() {
+ public int numPartitions() {
return numPartitions;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
index 88ec1a0..c220892 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
@@ -20,7 +20,7 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.utils.Utils;
-import java.util.NavigableMap;
+import java.util.List;
/**
* A detailed description of a single topic in the cluster.
@@ -28,17 +28,17 @@ import java.util.NavigableMap;
public class TopicDescription {
private final String name;
private final boolean internal;
- private final NavigableMap<Integer, TopicPartitionInfo> partitions;
+ private final List<TopicPartitionInfo> partitions;
/**
* Create an instance with the specified parameters.
*
* @param name The topic name
* @param internal Whether the topic is internal to Kafka
- * @param partitions A map from partition id to its leadership and replica information
+ * @param partitions A list of partitions where the index represents the partition id and the element contains
+ * leadership and replica information for that partition.
*/
- public TopicDescription(String name, boolean internal,
- NavigableMap<Integer, TopicPartitionInfo> partitions) {
+ public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions) {
this.name = name;
this.internal = internal;
this.partitions = partitions;
@@ -55,20 +55,21 @@ public class TopicDescription {
* Whether the topic is internal to Kafka. An example of an internal topic is the offsets and group management topic:
* __consumer_offsets.
*/
- public boolean internal() {
+ public boolean isInternal() {
return internal;
}
/**
- * A map from partition id to the leadership and replica information for that partition.
+ * A list of partitions where the index represents the partition id and the element contains leadership and replica
+ * information for that partition.
*/
- public NavigableMap<Integer, TopicPartitionInfo> partitions() {
+ public List<TopicPartitionInfo> partitions() {
return partitions;
}
@Override
public String toString() {
return "(name=" + name + ", internal=" + internal + ", partitions=" +
- Utils.mkString(partitions, "[", "]", "=", ",") + ")";
+ Utils.join(partitions, ",") + ")";
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
index 738c2ef..e5124be 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
@@ -46,7 +46,7 @@ public class TopicListing {
* Whether the topic is internal to Kafka. An example of an internal topic is the offsets and group management topic:
* __consumer_offsets.
*/
- public boolean internal() {
+ public boolean isInternal() {
return internal;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
index 7656cd2..be69318 100644
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common;
import org.apache.kafka.common.utils.Utils;
+import java.util.Collections;
import java.util.List;
/**
@@ -42,8 +43,8 @@ public class TopicPartitionInfo {
public TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) {
this.partition = partition;
this.leader = leader;
- this.replicas = replicas;
- this.isr = isr;
+ this.replicas = Collections.unmodifiableList(replicas);
+ this.isr = Collections.unmodifiableList(isr);
}
/**
@@ -54,7 +55,7 @@ public class TopicPartitionInfo {
}
/**
- * Return the leader of the partition or {@link Node#noNode()} if there is none.
+ * Return the leader of the partition or null if there is none.
*/
public Node leader() {
return leader;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java
index 1796762..d5e05df 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java
@@ -42,9 +42,11 @@ public class AccessControlEntry {
Objects.requireNonNull(principal);
Objects.requireNonNull(host);
Objects.requireNonNull(operation);
- assert operation != AclOperation.ANY;
+ if (operation == AclOperation.ANY)
+ throw new IllegalArgumentException("operation must not be ANY");
Objects.requireNonNull(permissionType);
- assert permissionType != AclPermissionType.ANY;
+ if (permissionType == AclPermissionType.ANY)
+ throw new IllegalArgumentException("permissionType must not be ANY");
this.data = new AccessControlEntryData(principal, host, operation, permissionType);
}
@@ -91,8 +93,8 @@ public class AccessControlEntry {
/**
* Return true if this AclResource has any UNKNOWN components.
*/
- public boolean unknown() {
- return data.unknown();
+ public boolean isUnknown() {
+ return data.isUnknown();
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java
index cf69263..ad7660d 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java
@@ -83,8 +83,8 @@ class AccessControlEntryData {
/**
* Return true if there are any UNKNOWN components.
*/
- boolean unknown() {
- return operation.unknown() || permissionType.unknown();
+ boolean isUnknown() {
+ return operation.isUnknown() || permissionType.isUnknown();
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
index 2d8e9fb..a95303e 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
@@ -95,8 +95,8 @@ public class AccessControlEntryFilter {
/**
* Return true if there are any UNKNOWN components.
*/
- public boolean unknown() {
- return data.unknown();
+ public boolean isUnknown() {
+ return data.isUnknown();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
index fd2a756..ea58434 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
@@ -49,7 +49,7 @@ public class AclBinding {
* Return true if this binding has any UNKNOWN components.
*/
public boolean unknown() {
- return resource.unknown() || entry.unknown();
+ return resource.isUnknown() || entry.isUnknown();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
index ad4a811..807b730 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
@@ -57,7 +57,7 @@ public class AclBindingFilter {
* Return true if this filter has any UNKNOWN components.
*/
public boolean unknown() {
- return resourceFilter.unknown() || entryFilter.unknown();
+ return resourceFilter.unknown() || entryFilter.isUnknown();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
index c5d5b1a..3da18cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
@@ -158,7 +158,7 @@ public enum AclOperation {
/**
* Return true if this operation is UNKNOWN.
*/
- public boolean unknown() {
+ public boolean isUnknown() {
return this == UNKNOWN;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java b/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
index d963135..c5b077c 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
@@ -100,7 +100,7 @@ public enum AclPermissionType {
/**
* Return true if this permission type is UNKNOWN.
*/
- public boolean unknown() {
+ public boolean isUnknown() {
return this == UNKNOWN;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
index 484c207..f41f41a 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
@@ -83,8 +83,8 @@ public class Resource {
/**
* Return true if this Resource has any UNKNOWN components.
*/
- public boolean unknown() {
- return resourceType.unknown();
+ public boolean isUnknown() {
+ return resourceType.isUnknown();
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
index 18042fa..5032660 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
@@ -71,7 +71,7 @@ public class ResourceFilter {
* Return true if this ResourceFilter has any UNKNOWN components.
*/
public boolean unknown() {
- return resourceType.unknown();
+ return resourceType.isUnknown();
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
index 0874f91..d83382d 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
@@ -110,7 +110,7 @@ public enum ResourceType {
/**
* Return whether this resource type is UNKNOWN.
*/
- public boolean unknown() {
+ public boolean isUnknown() {
return this == UNKNOWN;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index dfab018..cd6ed6b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -220,17 +220,17 @@ public class KafkaAdminClientTest {
add(ACL1);
add(ACL2);
}}));
- assertCollectionIs(env.adminClient().describeAcls(FILTER1).all().get(), ACL1, ACL2);
+ assertCollectionIs(env.adminClient().describeAcls(FILTER1).values().get(), ACL1, ACL2);
// Test a call where we get back no results.
env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, null,
Collections.<AclBinding>emptySet()));
- assertTrue(env.adminClient().describeAcls(FILTER2).all().get().isEmpty());
+ assertTrue(env.adminClient().describeAcls(FILTER2).values().get().isEmpty());
// Test a call where we get back an error.
env.kafkaClient().prepareResponse(new DescribeAclsResponse(0,
new SecurityDisabledException("Security is disabled"), Collections.<AclBinding>emptySet()));
- assertFutureError(env.adminClient().describeAcls(FILTER2).all(), SecurityDisabledException.class);
+ assertFutureError(env.adminClient().describeAcls(FILTER2).values(), SecurityDisabledException.class);
}
}
@@ -251,8 +251,8 @@ public class KafkaAdminClientTest {
add(ACL1);
add(ACL2);
}});
- assertCollectionIs(results.results().keySet(), ACL1, ACL2);
- for (KafkaFuture<Void> future : results.results().values()) {
+ assertCollectionIs(results.values().keySet(), ACL1, ACL2);
+ for (KafkaFuture<Void> future : results.values().values()) {
future.get();
}
results.all().get();
@@ -267,9 +267,9 @@ public class KafkaAdminClientTest {
add(ACL1);
add(ACL2);
}});
- assertCollectionIs(results.results().keySet(), ACL1, ACL2);
- assertFutureError(results.results().get(ACL1), SecurityDisabledException.class);
- results.results().get(ACL2).get();
+ assertCollectionIs(results.values().keySet(), ACL1, ACL2);
+ assertFutureError(results.values().get(ACL1), SecurityDisabledException.class);
+ results.values().get(ACL2).get();
assertFutureError(results.all(), SecurityDisabledException.class);
}
}
@@ -295,12 +295,12 @@ public class KafkaAdminClientTest {
add(FILTER1);
add(FILTER2);
}});
- Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults = results.results();
+ Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults = results.values();
FilterResults filter1Results = filterResults.get(FILTER1).get();
- assertEquals(null, filter1Results.acls().get(0).exception());
- assertEquals(ACL1, filter1Results.acls().get(0).acl());
- assertEquals(null, filter1Results.acls().get(1).exception());
- assertEquals(ACL2, filter1Results.acls().get(1).acl());
+ assertEquals(null, filter1Results.values().get(0).exception());
+ assertEquals(ACL1, filter1Results.values().get(0).binding());
+ assertEquals(null, filter1Results.values().get(1).exception());
+ assertEquals(ACL2, filter1Results.values().get(1).binding());
assertFutureError(filterResults.get(FILTER2), SecurityDisabledException.class);
assertFutureError(results.all(), SecurityDisabledException.class);
@@ -318,7 +318,7 @@ public class KafkaAdminClientTest {
add(FILTER1);
add(FILTER2);
}});
- assertTrue(results.results().get(FILTER2).get().acls().isEmpty());
+ assertTrue(results.values().get(FILTER2).get().values().isEmpty());
assertFutureError(results.all(), SecurityDisabledException.class);
// Test a call where there are no errors.
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
index ba09499..a9b27d8 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
@@ -55,7 +55,7 @@ public class AclOperationTest {
public void testIsUnknown() throws Exception {
for (AclOperationTestInfo info : INFOS) {
assertEquals(info.operation + " was supposed to have unknown == " + info.unknown,
- info.unknown, info.operation.unknown());
+ info.unknown, info.operation.isUnknown());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
index 15b9068..3f018d7 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
@@ -47,7 +47,7 @@ public class AclPermissionTypeTest {
public void testIsUnknown() throws Exception {
for (AclPermissionTypeTestInfo info : INFOS) {
assertEquals(info.ty + " was supposed to have unknown == " + info.unknown,
- info.unknown, info.ty.unknown());
+ info.unknown, info.ty.isUnknown());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
index 9adade1..d5f13bc 100644
--- a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
@@ -48,7 +48,7 @@ public class ResourceTypeTest {
public void testIsUnknown() throws Exception {
for (AclResourceTypeTestInfo info : INFOS) {
assertEquals(info.resourceType + " was supposed to have unknown == " + info.unknown,
- info.unknown, info.resourceType.unknown());
+ info.unknown, info.resourceType.isUnknown());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index a3b4218..adc3378 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -216,7 +216,7 @@ public class TopicAdmin implements AutoCloseable {
// Attempt to create any missing topics
CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false);
- Map<String, KafkaFuture<Void>> newResults = admin.createTopics(topicsByName.values(), args).results();
+ Map<String, KafkaFuture<Void>> newResults = admin.createTopics(topicsByName.values(), args).values();
// Iterate over each future so that we can handle individual failures like when some topics already exist
Set<String> newlyCreatedTopicNames = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 9da574d..07d192b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -169,7 +169,7 @@ public class KafkaConfigBackingStoreTest {
assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
assertEquals(TOPIC, capturedNewTopic.getValue().name());
- assertEquals(1, capturedNewTopic.getValue().partitions());
+ assertEquals(1, capturedNewTopic.getValue().numPartitions());
assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());
configStorage.start();
configStorage.stop();
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index 70d7f40..8cd2f0b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -130,7 +130,7 @@ public class KafkaOffsetBackingStoreTest {
assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
assertEquals(TOPIC, capturedNewTopic.getValue().name());
- assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().partitions());
+ assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions());
assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());
store.start();
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 5e94ffe..4c74bca 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -133,7 +133,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
client.createTopics(newTopics.asJava).all.get()
waitForTopics(client, topics, List())
- val results = client.createTopics(newTopics.asJava).results()
+ val results = client.createTopics(newTopics.asJava).values()
assertTrue(results.containsKey("mytopic"))
assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException])
assertTrue(results.containsKey("mytopic2"))
@@ -143,7 +143,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
assertEquals(topics.toSet, topicToDescription.keySet.asScala)
val topic0 = topicToDescription.get("mytopic")
- assertEquals(false, topic0.internal)
+ assertEquals(false, topic0.isInternal)
assertEquals("mytopic", topic0.name)
assertEquals(2, topic0.partitions.size)
val topic0Partition0 = topic0.partitions.get(0)
@@ -158,7 +158,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
assertEquals(Seq(2, 0), topic0Partition1.replicas.asScala.map(_.id))
val topic1 = topicToDescription.get("mytopic2")
- assertEquals(false, topic1.internal)
+ assertEquals(false, topic1.isInternal)
assertEquals("mytopic2", topic1.name)
assertEquals(3, topic1.partitions.size)
for (partitionId <- 0 until 3) {
@@ -192,7 +192,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
waitForTopics(client, Seq(existingTopic), List())
val nonExistingTopic = "non-existing"
- val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).results
+ val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).values
assertEquals(existingTopic, results.get(existingTopic).get.name)
intercept[ExecutionException](results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException]
assertEquals(None, zkUtils.getTopicPartitionCount(nonExistingTopic))
@@ -303,7 +303,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
@Test
def testAclOperations(): Unit = {
client = AdminClient.create(createConfig())
- assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).all(), classOf[SecurityDisabledException])
+ assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).values(), classOf[SecurityDisabledException])
assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(ACL1)).all(),
classOf[SecurityDisabledException])
assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(ACL1.toFilter())).all(),
@@ -422,7 +422,7 @@ object AdminClientIntegrationTest {
topicResource2 -> new Config(topicConfigEntries2)
).asJava)
- assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
+ assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.values.keySet)
alterResult.all.get
// Verify that topics were updated correctly
@@ -454,7 +454,7 @@ object AdminClientIntegrationTest {
topicResource2 -> new Config(topicConfigEntries2)
).asJava, new AlterConfigsOptions().validateOnly(true))
- assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
+ assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.values.keySet)
alterResult.all.get
// Verify that topics were not updated due to validateOnly = true
@@ -495,10 +495,10 @@ object AdminClientIntegrationTest {
brokerResource -> new Config(brokerConfigEntries)
).asJava)
- assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
- assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
- alterResult.results.get(topicResource2).get
- assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+ assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet)
+ assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+ alterResult.values.get(topicResource2).get
+ assertTrue(intercept[ExecutionException](alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
// Verify that first and third resources were not updated and second was updated
var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
@@ -523,10 +523,10 @@ object AdminClientIntegrationTest {
brokerResource -> new Config(brokerConfigEntries)
).asJava, new AlterConfigsOptions().validateOnly(true))
- assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
- assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
- alterResult.results.get(topicResource2).get
- assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+ assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet)
+ assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+ alterResult.values.get(topicResource2).get
+ assertTrue(intercept[ExecutionException](alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
// Verify that no resources are updated since validate_only = true
describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 7d3c54c..8e8e825 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -128,11 +128,11 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
brokerResource -> new Config(brokerConfigEntries)
).asJava)
- assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.results.keySet)
- assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
- alterResult.results.get(topicResource2).get
- assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
- assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+ assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet)
+ assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
+ alterResult.values.get(topicResource2).get
+ assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
+ assertTrue(intercept[ExecutionException](alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
// Verify that the second resource was updated and the others were not
var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava)
@@ -158,11 +158,11 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
topicResource3 -> new Config(topicConfigEntries3)
).asJava, new AlterConfigsOptions().validateOnly(true))
- assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.results.keySet)
- assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
- alterResult.results.get(topicResource2).get
- assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
- assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+ assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet)
+ assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
+ alterResult.values.get(topicResource2).get
+ assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
+ assertTrue(intercept[ExecutionException](alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
// Verify that no resources are updated since validate_only = true
describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index d0e0806..b4e09b3 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -96,25 +96,25 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
@Test
override def testAclOperations(): Unit = {
client = AdminClient.create(createConfig())
- assertEquals(6, client.describeAcls(AclBindingFilter.ANY).all.get().size)
+ assertEquals(6, client.describeAcls(AclBindingFilter.ANY).values.get().size)
val results = client.createAcls(List(acl2, acl3).asJava)
- assertEquals(Set(acl2, acl3), results.results.keySet().asScala)
- results.results.values().asScala.foreach(value => value.get)
+ assertEquals(Set(acl2, acl3), results.values.keySet().asScala)
+ results.values.values().asScala.foreach(value => value.get)
val aclUnknown = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW))
val results2 = client.createAcls(List(aclUnknown).asJava)
- assertEquals(Set(aclUnknown), results2.results.keySet().asScala)
+ assertEquals(Set(aclUnknown), results2.values.keySet().asScala)
assertFutureExceptionTypeEquals(results2.all, classOf[InvalidRequestException])
- val results3 = client.deleteAcls(List(ACL1.toFilter, acl2.toFilter, acl3.toFilter).asJava).results
+ val results3 = client.deleteAcls(List(ACL1.toFilter, acl2.toFilter, acl3.toFilter).asJava).values
assertEquals(Set(ACL1.toFilter, acl2.toFilter, acl3.toFilter), results3.keySet.asScala)
- assertEquals(0, results3.get(ACL1.toFilter).get.acls.size())
- assertEquals(Set(acl2), results3.get(acl2.toFilter).get.acls.asScala.map(_.acl).toSet)
- assertEquals(Set(acl3), results3.get(acl3.toFilter).get.acls.asScala.map(_.acl).toSet)
+ assertEquals(0, results3.get(ACL1.toFilter).get.values.size())
+ assertEquals(Set(acl2), results3.get(acl2.toFilter).get.values.asScala.map(_.binding).toSet)
+ assertEquals(Set(acl3), results3.get(acl3.toFilter).get.values.asScala.map(_.binding).toSet)
}
def waitForDescribeAcls(client: AdminClient, filter: AclBindingFilter, acls: Set[AclBinding]): Unit = {
TestUtils.waitUntilTrue(() => {
- val results = client.describeAcls(filter).all.get()
+ val results = client.describeAcls(filter).values.get()
acls == results.asScala.toSet
}, "timed out waiting for ACLs")
}
@@ -123,7 +123,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
def testAclOperations2(): Unit = {
client = AdminClient.create(createConfig())
val results = client.createAcls(List(acl2, acl2).asJava)
- assertEquals(Set(acl2, acl2), results.results.keySet().asScala)
+ assertEquals(Set(acl2, acl2), results.values.keySet().asScala)
results.all.get()
waitForDescribeAcls(client, acl2.toFilter, Set(acl2))
@@ -133,9 +133,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
waitForDescribeAcls(client, filterA, Set())
val results2 = client.deleteAcls(List(filterA, filterB).asJava, new DeleteAclsOptions())
- assertEquals(Set(filterA, filterB), results2.results.keySet().asScala)
- assertEquals(Set(), results2.results.get(filterA).get.acls.asScala.map(_.acl).toSet)
- assertEquals(Set(acl2), results2.results.get(filterB).get.acls.asScala.map(_.acl).toSet)
+ assertEquals(Set(filterA, filterB), results2.values.keySet().asScala)
+ assertEquals(Set(), results2.values.get(filterA).get.values.asScala.map(_.binding).toSet)
+ assertEquals(Set(acl2), results2.values.get(filterB).get.values.asScala.map(_.binding).toSet)
waitForDescribeAcls(client, filterB, Set())
}
@@ -148,9 +148,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
val emptyResourceNameAcl = new AclBinding(new Resource(ResourceType.TOPIC, ""),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
val results = client.createAcls(List(clusterAcl, emptyResourceNameAcl).asJava, new CreateAclsOptions())
- assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.results.keySet().asScala)
- assertFutureExceptionTypeEquals(results.results.get(clusterAcl), classOf[InvalidRequestException])
- assertFutureExceptionTypeEquals(results.results.get(emptyResourceNameAcl), classOf[InvalidRequestException])
+ assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.values.keySet().asScala)
+ assertFutureExceptionTypeEquals(results.values.get(clusterAcl), classOf[InvalidRequestException])
+ assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException])
}
private def verifyCauseIsClusterAuth(e: Throwable): Unit = {
@@ -196,7 +196,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
verifyCauseIsClusterAuth(e)
true
case Success(_) =>
- assertEquals(Set(fooAcl), result.results.get(fooAcl.toFilter).get.acls.asScala.map(_.acl).toSet)
+ assertEquals(Set(fooAcl), result.values.get(fooAcl.toFilter).get.values.asScala.map(_.binding).toSet)
true
}
}
@@ -212,14 +212,14 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW))
val results = client.describeAcls(userAcl.toFilter)
if (expectAuth) {
- Try(results.all.get) match {
+ Try(results.values.get) match {
case Failure(e) =>
verifyCauseIsClusterAuth(e)
false
case Success(acls) => Set(userAcl).equals(acls.asScala.toSet)
}
} else {
- Try(results.all.get) match {
+ Try(results.values.get) match {
case Failure(e) =>
verifyCauseIsClusterAuth(e)
true
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index 431b53b..b9288d7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -287,13 +287,13 @@ public class ClientCompatibilityTest {
}
});
while (true) {
- Collection<TopicListing> listings = client.listTopics().descriptions().get();
+ Collection<TopicListing> listings = client.listTopics().listings().get();
if (!testConfig.createTopicsSupported)
break;
boolean foundNewTopic = false;
for (TopicListing listing : listings) {
if (listing.name().equals("newtopic")) {
- if (listing.internal())
+ if (listing.isInternal())
throw new KafkaException("Did not expect newtopic to be an internal topic.");
foundNewTopic = true;
}
@@ -308,7 +308,7 @@ public class ClientCompatibilityTest {
@Override
public void invoke() throws Throwable {
try {
- client.describeAcls(AclBindingFilter.ANY).all().get();
+ client.describeAcls(AclBindingFilter.ANY).values().get();
} catch (ExecutionException e) {
if (e.getCause() instanceof SecurityDisabledException)
return;