You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/31 16:37:43 UTC
[2/3] kafka git commit: KAFKA-5265; Move ACLs, Config, Topic classes into org.apache.kafka.common
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
deleted file mode 100644
index a51c1c8..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Collection;
-
-/**
- * The results of the describeCluster call.
- */
-@InterfaceStability.Unstable
-public class DescribeClusterResults {
- private final KafkaFuture<Collection<Node>> nodes;
- private final KafkaFuture<Node> controller;
- private final KafkaFuture<String> clusterId;
-
- DescribeClusterResults(KafkaFuture<Collection<Node>> nodes,
- KafkaFuture<Node> controller,
- KafkaFuture<String> clusterId) {
- this.nodes = nodes;
- this.controller = controller;
- this.clusterId = clusterId;
- }
-
- /**
- * Returns a future which yields a collection of nodes.
- */
- public KafkaFuture<Collection<Node>> nodes() {
- return nodes;
- }
-
- /**
- * Returns a future which yields the current controller id.
- * Note that this may yield null, if the controller ID is not yet known.
- */
- public KafkaFuture<Node> controller() {
- return controller;
- }
-
- /**
- * Returns a future which yields the current cluster Id.
- * Note that this may yield null, if the cluster version is too old.
- */
- public KafkaFuture<String> clusterId() {
- return clusterId;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
new file mode 100644
index 0000000..2379724
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.ConfigResource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Unstable
+public class DescribeConfigsResult {
+
+ private final Map<ConfigResource, KafkaFuture<Config>> futures;
+
+ DescribeConfigsResult(Map<ConfigResource, KafkaFuture<Config>> futures) {
+ this.futures = futures;
+ }
+
+ public Map<ConfigResource, KafkaFuture<Config>> results() {
+ return futures;
+ }
+
+ public KafkaFuture<Map<ConfigResource, Config>> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+ thenApply(new KafkaFuture.Function<Void, Map<ConfigResource, Config>>() {
+ @Override
+ public Map<ConfigResource, Config> apply(Void v) {
+ Map<ConfigResource, Config> configs = new HashMap<>(futures.size());
+ for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : futures.entrySet()) {
+ try {
+ configs.put(entry.getKey(), entry.getValue().get());
+ } catch (InterruptedException | ExecutionException e) {
+ // This should be unreachable, because allOf ensured that all the futures
+ // completed successfully.
+ throw new RuntimeException(e);
+ }
+ }
+ return configs;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java
deleted file mode 100644
index c29872a..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-@InterfaceStability.Unstable
-public class DescribeConfigsResults {
-
- private final Map<ConfigResource, KafkaFuture<Config>> futures;
-
- DescribeConfigsResults(Map<ConfigResource, KafkaFuture<Config>> futures) {
- this.futures = futures;
- }
-
- public Map<ConfigResource, KafkaFuture<Config>> results() {
- return futures;
- }
-
- public KafkaFuture<Map<ConfigResource, Config>> all() {
- return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
- thenApply(new KafkaFuture.Function<Void, Map<ConfigResource, Config>>() {
- @Override
- public Map<ConfigResource, Config> apply(Void v) {
- Map<ConfigResource, Config> configs = new HashMap<>(futures.size());
- for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : futures.entrySet()) {
- try {
- configs.put(entry.getKey(), entry.getValue().get());
- } catch (InterruptedException | ExecutionException e) {
- // This should be unreachable, because allOf ensured that all the futures
- // completed successfully.
- throw new RuntimeException(e);
- }
- }
- return configs;
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
new file mode 100644
index 0000000..e7cd6b3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The results of the describeTopic call.
+ */
+@InterfaceStability.Unstable
+public class DescribeTopicsResult {
+ private final Map<String, KafkaFuture<TopicDescription>> futures;
+
+ DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> futures) {
+ this.futures = futures;
+ }
+
+ /**
+ * Return a map from topic names to futures which can be used to check the status of
+ * individual topics.
+ */
+ public Map<String, KafkaFuture<TopicDescription>> results() {
+ return futures;
+ }
+
+ /**
+ * Return a future which succeeds only if all the topic descriptions succeed.
+ */
+ public KafkaFuture<Map<String, TopicDescription>> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+ thenApply(new KafkaFuture.Function<Void, Map<String, TopicDescription>>() {
+ @Override
+ public Map<String, TopicDescription> apply(Void v) {
+ Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
+ for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
+ try {
+ descriptions.put(entry.getKey(), entry.getValue().get());
+ } catch (InterruptedException | ExecutionException e) {
+ // This should be unreachable, because allOf ensured that all the futures
+ // completed successfully.
+ throw new RuntimeException(e);
+ }
+ }
+ return descriptions;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
deleted file mode 100644
index 5c309bb..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-/**
- * The results of the describeTopic call.
- */
-@InterfaceStability.Unstable
-public class DescribeTopicsResults {
- private final Map<String, KafkaFuture<TopicDescription>> futures;
-
- DescribeTopicsResults(Map<String, KafkaFuture<TopicDescription>> futures) {
- this.futures = futures;
- }
-
- /**
- * Return a map from topic names to futures which can be used to check the status of
- * individual topics.
- */
- public Map<String, KafkaFuture<TopicDescription>> results() {
- return futures;
- }
-
- /**
- * Return a future which succeeds only if all the topic descriptions succeed.
- */
- public KafkaFuture<Map<String, TopicDescription>> all() {
- return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
- thenApply(new KafkaFuture.Function<Void, Map<String, TopicDescription>>() {
- @Override
- public Map<String, TopicDescription> apply(Void v) {
- Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
- for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
- try {
- descriptions.put(entry.getKey(), entry.getValue().get());
- } catch (InterruptedException | ExecutionException e) {
- // This should be unreachable, because allOf ensured that all the futures
- // completed successfully.
- throw new RuntimeException(e);
- }
- }
- return descriptions;
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 98fc3f3..9fa0cad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -25,14 +25,18 @@ import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResult;
-import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResults;
+import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
+import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.DisconnectException;
@@ -995,7 +999,7 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
- public CreateTopicResults createTopics(final Collection<NewTopic> newTopics,
+ public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
final CreateTopicsOptions options) {
final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size());
final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size());
@@ -1046,11 +1050,11 @@ public class KafkaAdminClient extends AdminClient {
completeAllExceptionally(topicFutures.values(), throwable);
}
}, now);
- return new CreateTopicResults(new HashMap<String, KafkaFuture<Void>>(topicFutures));
+ return new CreateTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
}
@Override
- public DeleteTopicResults deleteTopics(final Collection<String> topicNames,
+ public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
DeleteTopicsOptions options) {
final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
for (String topicName : topicNames) {
@@ -1099,11 +1103,11 @@ public class KafkaAdminClient extends AdminClient {
completeAllExceptionally(topicFutures.values(), throwable);
}
}, now);
- return new DeleteTopicResults(new HashMap<String, KafkaFuture<Void>>(topicFutures));
+ return new DeleteTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
}
@Override
- public ListTopicsResults listTopics(final ListTopicsOptions options) {
+ public ListTopicsResult listTopics(final ListTopicsOptions options) {
final KafkaFutureImpl<Map<String, TopicListing>> topicListingFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()),
@@ -1132,11 +1136,11 @@ public class KafkaAdminClient extends AdminClient {
topicListingFuture.completeExceptionally(throwable);
}
}, now);
- return new ListTopicsResults(topicListingFuture);
+ return new ListTopicsResult(topicListingFuture);
}
@Override
- public DescribeTopicsResults describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options) {
+ public DescribeTopicsResult describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options) {
final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
final ArrayList<String> topicNamesList = new ArrayList<>();
for (String topicName : topicNames) {
@@ -1190,11 +1194,11 @@ public class KafkaAdminClient extends AdminClient {
completeAllExceptionally(topicFutures.values(), throwable);
}
}, now);
- return new DescribeTopicsResults(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures));
+ return new DescribeTopicsResult(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures));
}
@Override
- public DescribeClusterResults describeCluster(DescribeClusterOptions options) {
+ public DescribeClusterResult describeCluster(DescribeClusterOptions options) {
final KafkaFutureImpl<Collection<Node>> describeClusterFuture = new KafkaFutureImpl<>();
final KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<>();
final KafkaFutureImpl<String> clusterIdFuture = new KafkaFutureImpl<>();
@@ -1223,11 +1227,11 @@ public class KafkaAdminClient extends AdminClient {
}
}, now);
- return new DescribeClusterResults(describeClusterFuture, controllerFuture, clusterIdFuture);
+ return new DescribeClusterResult(describeClusterFuture, controllerFuture, clusterIdFuture);
}
@Override
- public ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options) {
+ public ApiVersionsResult apiVersions(Collection<Node> nodes, ApiVersionsOptions options) {
final long now = time.milliseconds();
final long deadlineMs = calcDeadlineMs(now, options.timeoutMs());
Map<Node, KafkaFuture<NodeApiVersions>> nodeFutures = new HashMap<>();
@@ -1254,12 +1258,12 @@ public class KafkaAdminClient extends AdminClient {
}
}, now);
}
- return new ApiVersionsResults(nodeFutures);
+ return new ApiVersionsResult(nodeFutures);
}
@Override
- public DescribeAclsResults describeAcls(final AclBindingFilter filter, DescribeAclsOptions options) {
+ public DescribeAclsResult describeAcls(final AclBindingFilter filter, DescribeAclsOptions options) {
final long now = time.milliseconds();
final KafkaFutureImpl<Collection<AclBinding>> future = new KafkaFutureImpl<>();
runnable.call(new Call("describeAcls", calcDeadlineMs(now, options.timeoutMs()),
@@ -1285,11 +1289,11 @@ public class KafkaAdminClient extends AdminClient {
future.completeExceptionally(throwable);
}
}, now);
- return new DescribeAclsResults(future);
+ return new DescribeAclsResult(future);
}
@Override
- public CreateAclsResults createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
+ public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
final long now = time.milliseconds();
final Map<AclBinding, KafkaFutureImpl<Void>> futures = new HashMap<>();
final List<AclCreation> aclCreations = new ArrayList<>();
@@ -1340,11 +1344,11 @@ public class KafkaAdminClient extends AdminClient {
completeAllExceptionally(futures.values(), throwable);
}
}, now);
- return new CreateAclsResults(new HashMap<AclBinding, KafkaFuture<Void>>(futures));
+ return new CreateAclsResult(new HashMap<AclBinding, KafkaFuture<Void>>(futures));
}
@Override
- public DeleteAclsResults deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options) {
+ public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options) {
final long now = time.milliseconds();
final Map<AclBindingFilter, KafkaFutureImpl<FilterResults>> futures = new HashMap<>();
final List<AclBindingFilter> filterList = new ArrayList<>();
@@ -1392,11 +1396,11 @@ public class KafkaAdminClient extends AdminClient {
completeAllExceptionally(futures.values(), throwable);
}
}, now);
- return new DeleteAclsResults(new HashMap<AclBindingFilter, KafkaFuture<FilterResults>>(futures));
+ return new DeleteAclsResult(new HashMap<AclBindingFilter, KafkaFuture<FilterResults>>(futures));
}
@Override
- public DescribeConfigsResults describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
+ public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
final Map<ConfigResource, KafkaFutureImpl<Config>> singleRequestFutures = new HashMap<>();
final Collection<Resource> singleRequestResources = new ArrayList<>(configResources.size());
@@ -1487,7 +1491,7 @@ public class KafkaAdminClient extends AdminClient {
Map<ConfigResource, KafkaFutureImpl<Config>> allFutures = new HashMap<>(configResources.size());
allFutures.putAll(singleRequestFutures);
allFutures.putAll(brokerFutures);
- return new DescribeConfigsResults(new HashMap<ConfigResource, KafkaFuture<Config>>(allFutures));
+ return new DescribeConfigsResult(new HashMap<ConfigResource, KafkaFuture<Config>>(allFutures));
}
private Resource configResourceToResource(ConfigResource configResource) {
@@ -1506,7 +1510,7 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
- public AlterConfigsResults alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) {
+ public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) {
final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>(configs.size());
for (ConfigResource configResource : configs.keySet()) {
futures.put(configResource, new KafkaFutureImpl<Void>());
@@ -1548,6 +1552,6 @@ public class KafkaAdminClient extends AdminClient {
completeAllExceptionally(futures.values(), throwable);
}
}, now);
- return new AlterConfigsResults(new HashMap<ConfigResource, KafkaFuture<Void>>(futures));
+ return new AlterConfigsResult(new HashMap<ConfigResource, KafkaFuture<Void>>(futures));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
new file mode 100644
index 0000000..7b2fae8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * The result of the listTopics call.
+ */
+@InterfaceStability.Unstable
+public class ListTopicsResult {
+ final KafkaFuture<Map<String, TopicListing>> future;
+
+ ListTopicsResult(KafkaFuture<Map<String, TopicListing>> future) {
+ this.future = future;
+ }
+
+ /**
+ * Return a future which yields a map of topic names to TopicListing objects.
+ */
+ public KafkaFuture<Map<String, TopicListing>> namesToDescriptions() {
+ return future;
+ }
+
+ /**
+ * Return a future which yields a collection of TopicListing objects.
+ */
+ public KafkaFuture<Collection<TopicListing>> descriptions() {
+ return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() {
+ @Override
+ public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) {
+ return namesToDescriptions.values();
+ }
+ });
+ }
+
+ /**
+ * Return a future which yields a collection of topic names.
+ */
+ public KafkaFuture<Collection<String>> names() {
+ return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<String>>() {
+ @Override
+ public Collection<String> apply(Map<String, TopicListing> namesToDescriptions) {
+ return namesToDescriptions.keySet();
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
deleted file mode 100644
index 7e9448d..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * The result of the listTopics call.
- */
-@InterfaceStability.Unstable
-public class ListTopicsResults {
- final KafkaFuture<Map<String, TopicListing>> future;
-
- ListTopicsResults(KafkaFuture<Map<String, TopicListing>> future) {
- this.future = future;
- }
-
- /**
- * Return a future which yields a map of topic names to TopicListing objects.
- */
- public KafkaFuture<Map<String, TopicListing>> namesToDescriptions() {
- return future;
- }
-
- /**
- * Return a future which yields a collection of TopicListing objects.
- */
- public KafkaFuture<Collection<TopicListing>> descriptions() {
- return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() {
- @Override
- public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) {
- return namesToDescriptions.values();
- }
- });
- }
-
- /**
- * Return a future which yields a collection of topic names.
- */
- public KafkaFuture<Collection<String>> names() {
- return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<String>>() {
- @Override
- public Collection<String> apply(Map<String, TopicListing> namesToDescriptions) {
- return namesToDescriptions.keySet();
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java b/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java
deleted file mode 100644
index 9148aac..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import java.util.Objects;
-
-/**
- * Represents a cluster resource with a tuple of (type, name).
- */
-public class Resource {
- private final ResourceType resourceType;
- private final String name;
-
- public Resource(ResourceType resourceType, String name) {
- Objects.requireNonNull(resourceType);
- this.resourceType = resourceType;
- Objects.requireNonNull(name);
- this.name = name;
- }
-
- public ResourceType resourceType() {
- return resourceType;
- }
-
- public String name() {
- return name;
- }
-
- /**
- * Create a filter which matches only this Resource.
- */
- public ResourceFilter toFilter() {
- return new ResourceFilter(resourceType, name);
- }
-
- @Override
- public String toString() {
- return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")";
- }
-
- /**
- * Return true if this Resource has any UNKNOWN components.
- */
- public boolean unknown() {
- return resourceType.unknown();
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof Resource))
- return false;
- Resource other = (Resource) o;
- return resourceType.equals(other.resourceType) && Objects.equals(name, other.name);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(resourceType, name);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java
deleted file mode 100644
index 6f453b6..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import java.util.Objects;
-
-/**
- * A filter which matches Resource objects.
- */
-public class ResourceFilter {
- private final ResourceType resourceType;
- private final String name;
-
- public static final ResourceFilter ANY = new ResourceFilter(ResourceType.ANY, null);
-
- public ResourceFilter(ResourceType resourceType, String name) {
- Objects.requireNonNull(resourceType);
- this.resourceType = resourceType;
- this.name = name;
- }
-
- public ResourceType resourceType() {
- return resourceType;
- }
-
- public String name() {
- return name;
- }
-
- @Override
- public String toString() {
- return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")";
- }
-
- /**
- * Return true if this ResourceFilter has any UNKNOWN components.
- */
- public boolean unknown() {
- return resourceType.unknown();
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof ResourceFilter))
- return false;
- ResourceFilter other = (ResourceFilter) o;
- return resourceType.equals(other.resourceType) && Objects.equals(name, other.name);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(resourceType, name);
- }
-
- public boolean matches(Resource other) {
- if ((name != null) && (!name.equals(other.name())))
- return false;
- if ((resourceType != ResourceType.ANY) && (!resourceType.equals(other.resourceType())))
- return false;
- return true;
- }
-
- public boolean matchesAtMostOne() {
- return findIndefiniteField() == null;
- }
-
- public String findIndefiniteField() {
- if (resourceType == ResourceType.ANY)
- return "Resource type is ANY.";
- if (resourceType == ResourceType.UNKNOWN)
- return "Resource type is UNKNOWN.";
- if (name == null)
- return "Resource name is NULL.";
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
deleted file mode 100644
index ca4fa0a..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import java.util.HashMap;
-import java.util.Locale;
-
-/**
- * Represents a type of resource which an ACL can be applied to.
- */
-public enum ResourceType {
- /**
- * Represents any ResourceType which this client cannot understand,
- * perhaps because this client is too old.
- */
- UNKNOWN((byte) 0),
-
- /**
- * In a filter, matches any ResourceType.
- */
- ANY((byte) 1),
-
- /**
- * A Kafka topic.
- */
- TOPIC((byte) 2),
-
- /**
- * A consumer group.
- */
- GROUP((byte) 3),
-
- /**
- * The cluster as a whole.
- */
- CLUSTER((byte) 4),
-
- /**
- * A broker.
- */
- BROKER((byte) 5);
-
- private final static HashMap<Byte, ResourceType> CODE_TO_VALUE = new HashMap<>();
-
- static {
- for (ResourceType resourceType : ResourceType.values()) {
- CODE_TO_VALUE.put(resourceType.code, resourceType);
- }
- }
-
- /**
- * Parse the given string as an ACL resource type.
- *
- * @param str The string to parse.
- *
- * @return The ResourceType, or UNKNOWN if the string could not be matched.
- */
- public static ResourceType fromString(String str) throws IllegalArgumentException {
- try {
- return ResourceType.valueOf(str.toUpperCase(Locale.ROOT));
- } catch (IllegalArgumentException e) {
- return UNKNOWN;
- }
- }
-
- public static ResourceType fromCode(byte code) {
- ResourceType resourceType = CODE_TO_VALUE.get(code);
- if (resourceType == null) {
- return UNKNOWN;
- }
- return resourceType;
- }
-
- private final byte code;
-
- ResourceType(byte code) {
- this.code = code;
- }
-
- public byte code() {
- return code;
- }
-
- public boolean unknown() {
- return this == UNKNOWN;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
index f13dfff..bf1431e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.admin;
+import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.NavigableMap;
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
deleted file mode 100644
index 5241602..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.utils.Utils;
-
-import java.util.List;
-
-public class TopicPartitionInfo {
- private final int partition;
- private final Node leader;
- private final List<Node> replicas;
- private final List<Node> isr;
-
- public TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) {
- this.partition = partition;
- this.leader = leader;
- this.replicas = replicas;
- this.isr = isr;
- }
-
- public int partition() {
- return partition;
- }
-
- public Node leader() {
- return leader;
- }
-
- public List<Node> replicas() {
- return replicas;
- }
-
- public List<Node> isr() {
- return isr;
- }
-
- public String toString() {
- return "(partition=" + partition + ", leader=" + leader + ", replicas=" +
- Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
new file mode 100644
index 0000000..70352bd
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.List;
+
+public class TopicPartitionInfo {
+ private final int partition;
+ private final Node leader;
+ private final List<Node> replicas;
+ private final List<Node> isr;
+
+ public TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) {
+ this.partition = partition;
+ this.leader = leader;
+ this.replicas = replicas;
+ this.isr = isr;
+ }
+
+ public int partition() {
+ return partition;
+ }
+
+ public Node leader() {
+ return leader;
+ }
+
+ public List<Node> replicas() {
+ return replicas;
+ }
+
+ public List<Node> isr() {
+ return isr;
+ }
+
+ public String toString() {
+ return "(partition=" + partition + ", leader=" + leader + ", replicas=" +
+ Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java
new file mode 100644
index 0000000..68464b3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import java.util.Objects;
+
+/**
+ * Represents an access control entry. ACEs are a tuple of principal, host,
+ * operation, and permissionType.
+ */
+public class AccessControlEntry {
+ final AccessControlEntryData data;
+
+ public AccessControlEntry(String principal, String host, AclOperation operation, AclPermissionType permissionType) {
+ Objects.requireNonNull(principal);
+ Objects.requireNonNull(host);
+ Objects.requireNonNull(operation);
+ assert operation != AclOperation.ANY;
+ Objects.requireNonNull(permissionType);
+ assert permissionType != AclPermissionType.ANY;
+ this.data = new AccessControlEntryData(principal, host, operation, permissionType);
+ }
+
+ public String principal() {
+ return data.principal();
+ }
+
+ public String host() {
+ return data.host();
+ }
+
+ public AclOperation operation() {
+ return data.operation();
+ }
+
+ public AclPermissionType permissionType() {
+ return data.permissionType();
+ }
+
+ /**
+ * Create a filter which matches only this AccessControlEntry.
+ */
+ public AccessControlEntryFilter toFilter() {
+ return new AccessControlEntryFilter(data);
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+
+ /**
+ * Return true if this AccessControlEntry has any UNKNOWN components.
+ */
+ public boolean unknown() {
+ return data.unknown();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof AccessControlEntry))
+ return false;
+ AccessControlEntry other = (AccessControlEntry) o;
+ return data.equals(other.data);
+ }
+
+ @Override
+ public int hashCode() {
+ return data.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java
new file mode 100644
index 0000000..cf69263
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import java.util.Objects;
+
+/**
+ * An internal, package-private class which contains the data stored in AccessControlEntry and
+ * AccessControlEntryFilter objects.
+ */
+class AccessControlEntryData {
+ private final String principal;
+ private final String host;
+ private final AclOperation operation;
+ private final AclPermissionType permissionType;
+
+ AccessControlEntryData(String principal, String host, AclOperation operation, AclPermissionType permissionType) {
+ this.principal = principal;
+ this.host = host;
+ this.operation = operation;
+ this.permissionType = permissionType;
+ }
+
+ String principal() {
+ return principal;
+ }
+
+ String host() {
+ return host;
+ }
+
+ AclOperation operation() {
+ return operation;
+ }
+
+ AclPermissionType permissionType() {
+ return permissionType;
+ }
+
+ /**
+ * Returns a string describing a null, ANY, or UNKNOWN field, or null if there is
+ * no such field.
+ */
+ public String findIndefiniteField() {
+ if (principal() == null)
+ return "Principal is NULL";
+ if (host() == null)
+ return "Host is NULL";
+ if (operation() == AclOperation.ANY)
+ return "Operation is ANY";
+ if (operation() == AclOperation.UNKNOWN)
+ return "Operation is UNKNOWN";
+ if (permissionType() == AclPermissionType.ANY)
+ return "Permission type is ANY";
+ if (permissionType() == AclPermissionType.UNKNOWN)
+ return "Permission type is UNKNOWN";
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "(principal=" + (principal == null ? "<any>" : principal) +
+ ", host=" + (host == null ? "<any>" : host) +
+ ", operation=" + operation +
+ ", permissionType=" + permissionType + ")";
+ }
+
+ /**
+ * Return true if there are any UNKNOWN components.
+ */
+ boolean unknown() {
+ return operation.unknown() || permissionType.unknown();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof AccessControlEntryData))
+ return false;
+ AccessControlEntryData other = (AccessControlEntryData) o;
+ return Objects.equals(principal, other.principal) &&
+ Objects.equals(host, other.host) &&
+ Objects.equals(operation, other.operation) &&
+ Objects.equals(permissionType, other.permissionType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(principal, host, operation, permissionType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
new file mode 100644
index 0000000..7817865
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import java.util.Objects;
+
+/**
+ * Represents a filter which matches access control entries.
+ */
+public class AccessControlEntryFilter {
+ private final AccessControlEntryData data;
+
+ public static final AccessControlEntryFilter ANY =
+ new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY);
+
+ public AccessControlEntryFilter(String principal, String host, AclOperation operation, AclPermissionType permissionType) {
+ Objects.requireNonNull(operation);
+ Objects.requireNonNull(permissionType);
+ this.data = new AccessControlEntryData(principal, host, operation, permissionType);
+ }
+
+ /**
+ * This is a non-public constructor used in AccessControlEntry#toFilter
+ *
+ * @param data The access control data.
+ */
+ AccessControlEntryFilter(AccessControlEntryData data) {
+ this.data = data;
+ }
+
+ public String principal() {
+ return data.principal();
+ }
+
+ public String host() {
+ return data.host();
+ }
+
+ public AclOperation operation() {
+ return data.operation();
+ }
+
+ public AclPermissionType permissionType() {
+ return data.permissionType();
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+
+ /**
+ * Return true if there are any UNKNOWN components.
+ */
+ public boolean unknown() {
+ return data.unknown();
+ }
+
+ /**
+ * Returns true if this filter matches the given AccessControlEntry.
+ */
+ public boolean matches(AccessControlEntry other) {
+ if ((principal() != null) && (!data.principal().equals(other.principal())))
+ return false;
+ if ((host() != null) && (!host().equals(other.host())))
+ return false;
+ if ((operation() != AclOperation.ANY) && (!operation().equals(other.operation())))
+ return false;
+ if ((permissionType() != AclPermissionType.ANY) && (!permissionType().equals(other.permissionType())))
+ return false;
+ return true;
+ }
+
+ /**
+ * Returns true if this filter could only match one ACE-- in other words, if
+ * there are no null, ANY, or UNKNOWN fields.
+ */
+ public boolean matchesAtMostOne() {
+ return findIndefiniteField() == null;
+ }
+
+ /**
+ * Returns a string describing a null, ANY, or UNKNOWN field, or null if there is
+ * no such field.
+ */
+ public String findIndefiniteField() {
+ return data.findIndefiniteField();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof AccessControlEntryFilter))
+ return false;
+ AccessControlEntryFilter other = (AccessControlEntryFilter) o;
+ return data.equals(other.data);
+ }
+
+ @Override
+ public int hashCode() {
+ return data.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
new file mode 100644
index 0000000..91c1c79
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import org.apache.kafka.common.resource.Resource;
+
+import java.util.Objects;
+
+/**
+ * Represents a binding between a resource and an access control entry.
+ */
+public class AclBinding {
+ private final Resource resource;
+ private final AccessControlEntry entry;
+
+ public AclBinding(Resource resource, AccessControlEntry entry) {
+ Objects.requireNonNull(resource);
+ this.resource = resource;
+ Objects.requireNonNull(entry);
+ this.entry = entry;
+ }
+
+ /**
+ * Return true if this binding has any UNKNOWN components.
+ */
+ public boolean unknown() {
+ return resource.unknown() || entry.unknown();
+ }
+
+ public Resource resource() {
+ return resource;
+ }
+
+ public final AccessControlEntry entry() {
+ return entry;
+ }
+
+ /**
+ * Create a filter which matches only this AclBinding.
+ */
+ public AclBindingFilter toFilter() {
+ return new AclBindingFilter(resource.toFilter(), entry.toFilter());
+ }
+
+ @Override
+ public String toString() {
+ return "(resource=" + resource + ", entry=" + entry + ")";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof AclBinding))
+ return false;
+ AclBinding other = (AclBinding) o;
+ return resource.equals(other.resource) && entry.equals(other.entry);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(resource, entry);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
new file mode 100644
index 0000000..765fac2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import org.apache.kafka.common.resource.ResourceFilter;
+import org.apache.kafka.common.resource.ResourceType;
+
+import java.util.Objects;
+
+/**
+ * A filter which can match AclBinding objects.
+ */
+public class AclBindingFilter {
+ private final ResourceFilter resourceFilter;
+ private final AccessControlEntryFilter entryFilter;
+
+ /**
+ * A filter which matches any ACL binding.
+ */
+ public static final AclBindingFilter ANY = new AclBindingFilter(
+ new ResourceFilter(ResourceType.ANY, null),
+ new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
+
+ public AclBindingFilter(ResourceFilter resourceFilter, AccessControlEntryFilter entryFilter) {
+ Objects.requireNonNull(resourceFilter);
+ this.resourceFilter = resourceFilter;
+ Objects.requireNonNull(entryFilter);
+ this.entryFilter = entryFilter;
+ }
+
+ /**
+ * Return true if this filter has any UNKNOWN components.
+ */
+ public boolean unknown() {
+ return resourceFilter.unknown() || entryFilter.unknown();
+ }
+
+ public ResourceFilter resourceFilter() {
+ return resourceFilter;
+ }
+
+ public final AccessControlEntryFilter entryFilter() {
+ return entryFilter;
+ }
+
+ @Override
+ public String toString() {
+ return "(resourceFilter=" + resourceFilter + ", entryFilter=" + entryFilter + ")";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof AclBindingFilter))
+ return false;
+ AclBindingFilter other = (AclBindingFilter) o;
+ return resourceFilter.equals(other.resourceFilter) && entryFilter.equals(other.entryFilter);
+ }
+
+ public boolean matchesAtMostOne() {
+ return resourceFilter.matchesAtMostOne() && entryFilter.matchesAtMostOne();
+ }
+
+ public String findIndefiniteField() {
+ String indefinite = resourceFilter.findIndefiniteField();
+ if (indefinite != null)
+ return indefinite;
+ return entryFilter.findIndefiniteField();
+ }
+
+ public boolean matches(AclBinding binding) {
+ return resourceFilter.matches(binding.resource()) && entryFilter.matches(binding.entry());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(resourceFilter, entryFilter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
new file mode 100644
index 0000000..c63320d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import java.util.HashMap;
+import java.util.Locale;
+
+/**
+ * Represents an operation which an ACL grants or denies permission to perform.
+ */
+public enum AclOperation {
+ /**
+ * Represents any AclOperation which this client cannot understand, perhaps because this
+ * client is too old.
+ */
+ UNKNOWN((byte) 0),
+
+ /**
+ * In a filter, matches any AclOperation.
+ */
+ ANY((byte) 1),
+
+ /**
+ * ALL operation.
+ */
+ ALL((byte) 2),
+
+ /**
+ * READ operation.
+ */
+ READ((byte) 3),
+
+ /**
+ * WRITE operation.
+ */
+ WRITE((byte) 4),
+
+ /**
+ * CREATE operation.
+ */
+ CREATE((byte) 5),
+
+ /**
+ * DELETE operation.
+ */
+ DELETE((byte) 6),
+
+ /**
+ * ALTER operation.
+ */
+ ALTER((byte) 7),
+
+ /**
+ * DESCRIBE operation.
+ */
+ DESCRIBE((byte) 8),
+
+ /**
+ * CLUSTER_ACTION operation.
+ */
+ CLUSTER_ACTION((byte) 9),
+
+ /**
+ * DESCRIBE_CONFIGS operation.
+ */
+ DESCRIBE_CONFIGS((byte) 10),
+
+ /**
+ * ALTER_CONFIGS operation.
+ */
+ ALTER_CONFIGS((byte) 11),
+
+ /**
+ * IDEMPOTENT_WRITE operation.
+ */
+ IDEMPOTENT_WRITE((byte) 12);
+
+ private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new HashMap<>();
+
+ static {
+ for (AclOperation operation : AclOperation.values()) {
+ CODE_TO_VALUE.put(operation.code, operation);
+ }
+ }
+
+ /**
+ * Parse the given string as an ACL operation.
+ *
+ * @param str The string to parse.
+ *
+ * @return The AclOperation, or UNKNOWN if the string could not be matched.
+ */
+ public static AclOperation fromString(String str) throws IllegalArgumentException {
+ try {
+ return AclOperation.valueOf(str.toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException e) {
+ return UNKNOWN;
+ }
+ }
+
+ public static AclOperation fromCode(byte code) {
+ AclOperation operation = CODE_TO_VALUE.get(code);
+ if (operation == null) {
+ return UNKNOWN;
+ }
+ return operation;
+ }
+
+ private final byte code;
+
+ AclOperation(byte code) {
+ this.code = code;
+ }
+
+ public byte code() {
+ return code;
+ }
+
+ public boolean unknown() {
+ return this == UNKNOWN;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java b/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
new file mode 100644
index 0000000..8c77938
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import java.util.HashMap;
+import java.util.Locale;
+
+/**
+ * Represents whether an ACL grants or denies permissions.
+ *
+ * Each constant carries a byte code, exposed through code() and resolved back to a constant
+ * by fromCode(byte). Both lookup methods, fromString and fromCode, answer with UNKNOWN
+ * instead of throwing or returning null when the input matches no constant.
+ */
+public enum AclPermissionType {
+ /**
+ * Represents any AclPermissionType which this client cannot understand,
+ * perhaps because this client is too old.
+ */
+ UNKNOWN((byte) 0),
+
+ /**
+ * In a filter, matches any AclPermissionType.
+ */
+ ANY((byte) 1),
+
+ /**
+ * Disallows access.
+ */
+ DENY((byte) 2),
+
+ /**
+ * Grants access.
+ */
+ ALLOW((byte) 3);
+ /**
+ * Lookup table from byte code to constant. The static initializer below adds one
+ * entry per value returned by AclPermissionType.values(); fromCode(byte) reads it.
+ */
+ private final static HashMap<Byte, AclPermissionType> CODE_TO_VALUE = new HashMap<>();
+
+ static {
+ for (AclPermissionType permissionType : AclPermissionType.values()) {
+ CODE_TO_VALUE.put(permissionType.code, permissionType);
+ }
+ }
+
+ /**
+ * Parse the given string as an ACL permission.
+ *
+ * The string is upper-cased with Locale.ROOT before it is matched against the constant
+ * names, so lower- or mixed-case spellings such as allow or Deny are accepted. A null
+ * argument is not handled: the call to str.toUpperCase fails with a NullPointerException.
+ *
+ * @param str The string to parse.
+ *
+ * @return The AclPermissionType, or UNKNOWN if the string could not be matched.
+ */
+ public static AclPermissionType fromString(String str) {
+ try {
+ return AclPermissionType.valueOf(str.toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException e) { // no constant has this name
+ return UNKNOWN;
+ }
+ }
+ /**
+ * Look up the permission type registered in CODE_TO_VALUE for the given byte code.
+ *
+ * @param code The byte code to look up.
+ *
+ * @return The matching AclPermissionType, or UNKNOWN if the table has no entry for this code.
+ */
+ public static AclPermissionType fromCode(byte code) {
+ AclPermissionType permissionType = CODE_TO_VALUE.get(code);
+ if (permissionType == null) {
+ return UNKNOWN;
+ }
+ return permissionType;
+ }
+ /** Byte code of this constant; assigned once by the constructor and returned by code(). */
+ private final byte code;
+
+ AclPermissionType(byte code) {
+ this.code = code;
+ }
+ /**
+ * Return the byte code that was passed to this constant at construction.
+ *
+ * @return The byte code of this permission type.
+ */
+ public byte code() {
+ return code;
+ }
+ /**
+ * Tell whether this constant is the UNKNOWN placeholder.
+ *
+ * @return true if this is UNKNOWN, false for ANY, DENY and ALLOW.
+ */
+ public boolean unknown() {
+ return this == UNKNOWN;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
new file mode 100644
index 0000000..5395671
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.config;
+/**
+ * Identifies a resource by a (type, name) pair. The class is final, both fields are final and
+ * there are no setters, so an instance never changes after construction.
+ *
+ * NOTE(review): presumably this names the entity whose configuration is described or altered;
+ * confirm against the callers, which are not visible from this class.
+ *
+ * NOTE(review): the constructor stores its arguments unchecked, while hashCode() dereferences
+ * both fields and equals() dereferences name. A null type or name is therefore accepted at
+ * construction and only fails later, with a NullPointerException, when the instance is hashed
+ * or compared.
+ */
+public final class ConfigResource {
+ /** The kinds of resource that can be named: BROKER, TOPIC and UNKNOWN. */
+ public enum Type {
+ BROKER, TOPIC, UNKNOWN;
+ }
+
+ private final Type type;
+ private final String name;
+ /**
+ * Create a resource identifier from the given type and name.
+ *
+ * @param type The kind of resource; stored as given, without a null check.
+ * @param name The resource name; stored as given, without a null check.
+ */
+ public ConfigResource(Type type, String name) {
+ this.type = type;
+ this.name = name;
+ }
+ /**
+ * Return the resource type.
+ *
+ * @return The type passed to the constructor.
+ */
+ public Type type() {
+ return type;
+ }
+ /**
+ * Return the resource name.
+ *
+ * @return The name passed to the constructor.
+ */
+ public String name() {
+ return name;
+ }
+ /**
+ * Two resources are equal when the other object is also a ConfigResource and both the
+ * type (compared by identity, as it is an enum) and the name (compared with equals) match.
+ *
+ * @param o The object to compare with; null and objects of another class are never equal.
+ *
+ * @return true if o denotes the same type and name.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ConfigResource that = (ConfigResource) o;
+
+ return type == that.type && name.equals(that.name);
+ }
+ /**
+ * Combine the hash codes of type and name (31 * type hash + name hash), using the same
+ * two fields that equals() compares.
+ *
+ * @return The hash code of this resource.
+ */
+ @Override
+ public int hashCode() {
+ int result = type.hashCode();
+ result = 31 * result + name.hashCode();
+ return result;
+ }
+ /**
+ * Render the resource as ConfigResource{type=..., name=...}, with the name wrapped in
+ * single quotes.
+ *
+ * @return The string form of this resource.
+ */
+ @Override
+ public String toString() {
+ return "ConfigResource{type=" + type + ", name='" + name + "'}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
new file mode 100755
index 0000000..554c97b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.config;
+
+/**
+ * Keys that can be used to configure a topic. These keys are useful when creating or reconfiguring a
+ * topic using the AdminClient.
+ *
+ * The intended pattern is for broker configs to include a `log.` prefix. For example, to set the default broker
+ * cleanup policy, one would set log.cleanup.policy instead of cleanup.policy. Unfortunately, there are many cases
+ * where this pattern is not followed.
+ *
+ * Layout of this class: every key is declared as a pair of String constants, NAME_CONFIG holding the
+ * property name and NAME_DOC holding its human-readable description (several descriptions embed HTML
+ * markup such as anchors and line breaks). CLEANUP_POLICY_COMPACT and CLEANUP_POLICY_DELETE are the two
+ * values quoted in the cleanup.policy description. The class declares nothing but these constants.
+ */
+// This is a public API, so we should not remove or alter keys without a discussion and a deprecation period.
+// Eventually this should replace LogConfig.scala.
+public class TopicConfig {
+ public static final String SEGMENT_BYTES_CONFIG = "segment.bytes";
+ public static final String SEGMENT_BYTES_DOC = "This configuration controls the segment file size for " +
+ "the log. Retention and cleaning is always done a file at a time so a larger segment size means " +
+ "fewer files but less granular control over retention.";
+
+ public static final String SEGMENT_MS_CONFIG = "segment.ms";
+ public static final String SEGMENT_MS_DOC = "This configuration controls the period of time after " +
+ "which Kafka will force the log to roll even if the segment file isn't full to ensure that retention " +
+ "can delete or compact old data.";
+
+ public static final String SEGMENT_JITTER_MS_CONFIG = "segment.jitter.ms";
+ public static final String SEGMENT_JITTER_MS_DOC = "The maximum random jitter subtracted from the scheduled " +
+ "segment roll time to avoid thundering herds of segment rolling";
+
+ public static final String SEGMENT_INDEX_BYTES_CONFIG = "segment.index.bytes";
+ public static final String SEGMENT_INDEX_BYTES_DOC = "This configuration controls the size of the index that " +
+ "maps offsets to file positions. We preallocate this index file and shrink it only after log " +
+ "rolls. You generally should not need to change this setting.";
+ // NOTE(review): the last sentence of the description below talks about a per-topic override and links to the topic-config section, which reads like broker-level wording - confirm it is intended for a topic key.
+ public static final String FLUSH_MESSAGES_INTERVAL_CONFIG = "flush.messages";
+ public static final String FLUSH_MESSAGES_INTERVAL_DOC = "This setting allows specifying an interval at " +
+ "which we will force an fsync of data written to the log. For example if this was set to 1 " +
+ "we would fsync after every message; if it were 5 we would fsync after every five messages. " +
+ "In general we recommend you not set this and use replication for durability and allow the " +
+ "operating system's background flush capabilities as it is more efficient. This setting can " +
+ "be overridden on a per-topic basis (see <a href=\"#topic-config\">the per-topic configuration section</a>).";
+
+ public static final String FLUSH_MS_CONFIG = "flush.ms";
+ public static final String FLUSH_MS_DOC = "This setting allows specifying a time interval at which we will " +
+ "force an fsync of data written to the log. For example if this was set to 1000 " +
+ "we would fsync after 1000 ms had passed. In general we recommend you not set " +
+ "this and use replication for durability and allow the operating system's background " +
+ "flush capabilities as it is more efficient.";
+
+ public static final String RETENTION_BYTES_CONFIG = "retention.bytes";
+ public static final String RETENTION_BYTES_DOC = "This configuration controls the maximum size a log can grow " +
+ "to before we will discard old log segments to free up space if we are using the " +
+ "\"delete\" retention policy. By default there is no size limit only a time limit.";
+
+ public static final String RETENTION_MS_CONFIG = "retention.ms";
+ public static final String RETENTION_MS_DOC = "This configuration controls the maximum time we will retain a " +
+ "log before we will discard old log segments to free up space if we are using the " +
+ "\"delete\" retention policy. This represents an SLA on how soon consumers must read " +
+ "their data.";
+ // NOTE(review): the description below is missing an article before the word largest; it is runtime text, so it is left unchanged here.
+ public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
+ public static final String MAX_MESSAGE_BYTES_DOC = "This is largest message size Kafka will allow to be " +
+ "appended. Note that if you increase this size you must also increase your consumer's fetch size so " +
+ "they can fetch messages this large.";
+ /*
+ * NOTE(review): this is the only description constant whose name ends in _DOCS rather than _DOC.
+ * The name is public, so aligning it would need a deprecation period. The description text also
+ * spells the possessive pronoun with an apostrophe; it is runtime text and is left unchanged here.
+ */
+ public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes";
+ public static final String INDEX_INTERVAL_BYTES_DOCS = "This setting controls how frequently " +
+ "Kafka adds an index entry to it's offset index. The default setting ensures that we index a " +
+ "message roughly every 4096 bytes. More indexing allows reads to jump closer to the exact " +
+ "position in the log but makes the index larger. You probably don't need to change this.";
+
+ public static final String FILE_DELETE_DELAY_MS_CONFIG = "file.delete.delay.ms";
+ public static final String FILE_DELETE_DELAY_MS_DOC = "The time to wait before deleting a file from the " +
+ "filesystem";
+
+ public static final String DELETE_RETENTION_MS_CONFIG = "delete.retention.ms";
+ public static final String DELETE_RETENTION_MS_DOC = "The amount of time to retain delete tombstone markers " +
+ "for <a href=\"#compaction\">log compacted</a> topics. This setting also gives a bound " +
+ "on the time in which a consumer must complete a read if they begin from offset 0 " +
+ "to ensure that they get a valid snapshot of the final stage (otherwise delete " +
+ "tombstones may be collected before they complete their scan).";
+
+ public static final String MIN_COMPACTION_LAG_MS_CONFIG = "min.compaction.lag.ms";
+ public static final String MIN_COMPACTION_LAG_MS_DOC = "The minimum time a message will remain " +
+ "uncompacted in the log. Only applicable for logs that are being compacted.";
+
+ public static final String MIN_CLEANABLE_DIRTY_RATIO_CONFIG = "min.cleanable.dirty.ratio";
+ public static final String MIN_CLEANABLE_DIRTY_RATIO_DOC = "This configuration controls how frequently " +
+ "the log compactor will attempt to clean the log (assuming <a href=\"#compaction\">log " +
+ "compaction</a> is enabled). By default we will avoid cleaning a log where more than " +
+ "50% of the log has been compacted. This ratio bounds the maximum space wasted in " +
+ "the log by duplicates (at 50% at most 50% of the log could be duplicates). A " +
+ "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " +
+ "space in the log.";
+
+ public static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
+ public static final String CLEANUP_POLICY_COMPACT = "compact";
+ public static final String CLEANUP_POLICY_DELETE = "delete";
+ public static final String CLEANUP_POLICY_DOC = "A string that is either \"" + CLEANUP_POLICY_DELETE +
+ "\" or \"" + CLEANUP_POLICY_COMPACT + "\". This string designates the retention policy to use on " +
+ "old log segments. The default policy (\"delete\") will discard old segments when their retention " +
+ "time or size limit has been reached. The \"compact\" setting will enable <a href=\"#compaction\">log " +
+ "compaction</a> on the topic.";
+
+ public static final String UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG = "unclean.leader.election.enable";
+ public static final String UNCLEAN_LEADER_ELECTION_ENABLE_DOC = "Indicates whether to enable replicas " +
+ "not in the ISR set to be elected as leader as a last resort, even though doing so may result in data " +
+ "loss.";
+
+ public static final String MIN_IN_SYNC_REPLICAS_CONFIG = "min.insync.replicas";
+ public static final String MIN_IN_SYNC_REPLICAS_DOC = "When a producer sets acks to \"all\" (or \"-1\"), " +
+ "this configuration specifies the minimum number of replicas that must acknowledge " +
+ "a write for the write to be considered successful. If this minimum cannot be met, " +
+ "then the producer will raise an exception (either NotEnoughReplicas or " +
+ "NotEnoughReplicasAfterAppend).<br>When used together, min.insync.replicas and acks " +
+ "allow you to enforce greater durability guarantees. A typical scenario would be to " +
+ "create a topic with a replication factor of 3, set min.insync.replicas to 2, and " +
+ "produce with acks of \"all\". This will ensure that the producer raises an exception " +
+ "if a majority of replicas do not receive a write.";
+ // NOTE(review): in the description below the codec name lz4 is not quoted like the other two codec names; it is runtime text, so it is left unchanged here.
+ public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
+ public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. " +
+ "This configuration accepts the standard compression codecs ('gzip', 'snappy', lz4). It additionally " +
+ "accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " +
+ "original compression codec set by the producer.";
+
+ public static final String PREALLOCATE_CONFIG = "preallocate";
+ public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " +
+ "creating a new log segment.";
+
+ public static final String MESSAGE_FORMAT_VERSION_CONFIG = "message.format.version";
+ public static final String MESSAGE_FORMAT_VERSION_DOC = "Specify the message format version the broker " +
+ "will use to append messages to the logs. The value should be a valid ApiVersion. Some examples are: " +
+ "0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format " +
+ "version, the user is certifying that all the existing messages on disk are smaller or equal than the " +
+ "specified version. Setting this value incorrectly will cause consumers with older versions to break as " +
+ "they will receive messages with a format that they don't understand.";
+
+ public static final String MESSAGE_TIMESTAMP_TYPE_CONFIG = "message.timestamp.type";
+ public static final String MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether the timestamp in the message is " +
+ "message create time or log append time. The value should be either `CreateTime` or `LogAppendTime`";
+
+ public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG = "message.timestamp.difference.max.ms";
+ public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC = "The maximum difference allowed between " +
+ "the timestamp when a broker receives a message and the timestamp specified in the message. If " +
+ "message.timestamp.type=CreateTime, a message will be rejected if the difference in timestamp " +
+ "exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
index f792bbd..757b5af 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -17,11 +17,11 @@
package org.apache.kafka.common.requests;
-import org.apache.kafka.clients.admin.AccessControlEntry;
-import org.apache.kafka.clients.admin.AclBinding;
-import org.apache.kafka.clients.admin.Resource;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index 8a9ee19..246b5e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -16,11 +16,11 @@
*/
package org.apache.kafka.common.requests;
-import org.apache.kafka.clients.admin.AccessControlEntryFilter;
-import org.apache.kafka.clients.admin.AclBindingFilter;
-import org.apache.kafka.clients.admin.ResourceFilter;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.resource.ResourceFilter;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;