You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/01 23:24:48 UTC
[1/3] kafka git commit: KAFKA-3265;
Add a public AdminClient API in Java (KIP-117)
Repository: kafka
Updated Branches:
refs/heads/trunk c96656efb -> 4aed28d18
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 80e9191..65bec4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -83,114 +83,388 @@ import java.util.Map;
* Do not add exceptions that occur only on the client or only on the server here.
*/
public enum Errors {
- UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
- NONE(0, null),
- OFFSET_OUT_OF_RANGE(1,
- new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
- CORRUPT_MESSAGE(2,
- new CorruptRecordException("This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.")),
- UNKNOWN_TOPIC_OR_PARTITION(3,
- new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
- INVALID_FETCH_SIZE(4,
- new InvalidFetchSizeException("The requested fetch size is invalid.")),
- LEADER_NOT_AVAILABLE(5,
- new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
- NOT_LEADER_FOR_PARTITION(6,
- new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
- REQUEST_TIMED_OUT(7,
- new TimeoutException("The request timed out.")),
- BROKER_NOT_AVAILABLE(8,
- new BrokerNotAvailableException("The broker is not available.")),
- REPLICA_NOT_AVAILABLE(9,
- new ReplicaNotAvailableException("The replica is not available for the requested topic-partition")),
- MESSAGE_TOO_LARGE(10,
- new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
- STALE_CONTROLLER_EPOCH(11,
- new ControllerMovedException("The controller moved to another broker.")),
- OFFSET_METADATA_TOO_LARGE(12,
- new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
- NETWORK_EXCEPTION(13,
- new NetworkException("The server disconnected before a response was received.")),
- COORDINATOR_LOAD_IN_PROGRESS(14,
- new CoordinatorLoadInProgressException("The coordinator is loading and hence can't process requests.")),
- COORDINATOR_NOT_AVAILABLE(15,
- new CoordinatorNotAvailableException("The coordinator is not available.")),
- NOT_COORDINATOR(16,
- new NotCoordinatorException("This is not the correct coordinator.")),
- INVALID_TOPIC_EXCEPTION(17,
- new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
- RECORD_LIST_TOO_LARGE(18,
- new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")),
- NOT_ENOUGH_REPLICAS(19,
- new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
- NOT_ENOUGH_REPLICAS_AFTER_APPEND(20,
- new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")),
- INVALID_REQUIRED_ACKS(21,
- new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
- ILLEGAL_GENERATION(22,
- new IllegalGenerationException("Specified group generation id is not valid.")),
+ UNKNOWN(-1, "The server experienced an unexpected error when processing the request",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new UnknownServerException(message);
+ }
+ }),
+ NONE(0, null,
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return null;
+ }
+ }),
+ OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of offsets maintained by the server.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new OffsetOutOfRangeException(message);
+ }
+ }),
+ CORRUPT_MESSAGE(2, "This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new CorruptRecordException(message);
+ }
+ }),
+ UNKNOWN_TOPIC_OR_PARTITION(3, "This server does not host this topic-partition.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new UnknownTopicOrPartitionException(message);
+ }
+ }),
+ INVALID_FETCH_SIZE(4, "The requested fetch size is invalid.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidFetchSizeException(message);
+ }
+ }),
+ LEADER_NOT_AVAILABLE(5, "There is no leader for this topic-partition as we are in the middle of a leadership election.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new LeaderNotAvailableException(message);
+ }
+ }),
+ NOT_LEADER_FOR_PARTITION(6, "This server is not the leader for that topic-partition.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new NotLeaderForPartitionException(message);
+ }
+ }),
+ REQUEST_TIMED_OUT(7, "The request timed out.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new TimeoutException(message);
+ }
+ }),
+ BROKER_NOT_AVAILABLE(8, "The broker is not available.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new BrokerNotAvailableException(message);
+ }
+ }),
+ REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested topic-partition",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new ReplicaNotAvailableException(message);
+ }
+ }),
+ MESSAGE_TOO_LARGE(10, "The request included a message larger than the max message size the server will accept.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new RecordTooLargeException(message);
+ }
+ }),
+ STALE_CONTROLLER_EPOCH(11, "The controller moved to another broker.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new ControllerMovedException(message);
+ }
+ }),
+ OFFSET_METADATA_TOO_LARGE(12, "The metadata field of the offset request was too large.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new OffsetMetadataTooLarge(message);
+ }
+ }),
+ NETWORK_EXCEPTION(13, "The server disconnected before a response was received.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new NetworkException(message);
+ }
+ }),
+ COORDINATOR_LOAD_IN_PROGRESS(14, "The coordinator is loading and hence can't process requests.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new CoordinatorLoadInProgressException(message);
+ }
+ }),
+ COORDINATOR_NOT_AVAILABLE(15, "The coordinator is not available.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new CoordinatorNotAvailableException(message);
+ }
+ }),
+ NOT_COORDINATOR(16, "This is not the correct coordinator.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new NotCoordinatorException(message);
+ }
+ }),
+ INVALID_TOPIC_EXCEPTION(17, "The request attempted to perform an operation on an invalid topic.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidTopicException(message);
+ }
+ }),
+ RECORD_LIST_TOO_LARGE(18, "The request included message batch larger than the configured segment size on the server.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new RecordBatchTooLargeException(message);
+ }
+ }),
+ NOT_ENOUGH_REPLICAS(19, "Messages are rejected since there are fewer in-sync replicas than required.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new NotEnoughReplicasException(message);
+ }
+ }),
+ NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, "Messages are written to the log, but to fewer in-sync replicas than required.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new NotEnoughReplicasAfterAppendException(message);
+ }
+ }),
+ INVALID_REQUIRED_ACKS(21, "Produce request specified an invalid value for required acks.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidRequiredAcksException(message);
+ }
+ }),
+ ILLEGAL_GENERATION(22, "Specified group generation id is not valid.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new IllegalGenerationException(message);
+ }
+ }),
INCONSISTENT_GROUP_PROTOCOL(23,
- new InconsistentGroupProtocolException("The group member's supported protocols are incompatible with those of existing members.")),
- INVALID_GROUP_ID(24,
- new InvalidGroupIdException("The configured groupId is invalid")),
- UNKNOWN_MEMBER_ID(25,
- new UnknownMemberIdException("The coordinator is not aware of this member.")),
+ "The group member's supported protocols are incompatible with those of existing members.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InconsistentGroupProtocolException(message);
+ }
+ }),
+ INVALID_GROUP_ID(24, "The configured groupId is invalid",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidGroupIdException(message);
+ }
+ }),
+ UNKNOWN_MEMBER_ID(25, "The coordinator is not aware of this member.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new UnknownMemberIdException(message);
+ }
+ }),
INVALID_SESSION_TIMEOUT(26,
- new InvalidSessionTimeoutException("The session timeout is not within the range allowed by the broker " +
- "(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).")),
- REBALANCE_IN_PROGRESS(27,
- new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed.")),
- INVALID_COMMIT_OFFSET_SIZE(28,
- new InvalidCommitOffsetSizeException("The committing offset data size is not valid")),
- TOPIC_AUTHORIZATION_FAILED(29,
- new TopicAuthorizationException("Topic authorization failed.")),
- GROUP_AUTHORIZATION_FAILED(30,
- new GroupAuthorizationException("Group authorization failed.")),
- CLUSTER_AUTHORIZATION_FAILED(31,
- new ClusterAuthorizationException("Cluster authorization failed.")),
- INVALID_TIMESTAMP(32,
- new InvalidTimestampException("The timestamp of the message is out of acceptable range.")),
- UNSUPPORTED_SASL_MECHANISM(33,
- new UnsupportedSaslMechanismException("The broker does not support the requested SASL mechanism.")),
- ILLEGAL_SASL_STATE(34,
- new IllegalSaslStateException("Request is not valid given the current SASL state.")),
- UNSUPPORTED_VERSION(35,
- new UnsupportedVersionException("The version of API is not supported.")),
- TOPIC_ALREADY_EXISTS(36,
- new TopicExistsException("Topic with this name already exists.")),
- INVALID_PARTITIONS(37,
- new InvalidPartitionsException("Number of partitions is invalid.")),
- INVALID_REPLICATION_FACTOR(38,
- new InvalidReplicationFactorException("Replication-factor is invalid.")),
- INVALID_REPLICA_ASSIGNMENT(39,
- new InvalidReplicaAssignmentException("Replica assignment is invalid.")),
- INVALID_CONFIG(40,
- new InvalidConfigurationException("Configuration is invalid.")),
- NOT_CONTROLLER(41,
- new NotControllerException("This is not the correct controller for this cluster.")),
- INVALID_REQUEST(42,
- new InvalidRequestException("This most likely occurs because of a request being malformed by the client library or" +
- " the message was sent to an incompatible broker. See the broker logs for more details.")),
- UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
- new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")),
- POLICY_VIOLATION(44,
- new PolicyViolationException("Request parameters do not satisfy the configured policy.")),
- OUT_OF_ORDER_SEQUENCE_NUMBER(45,
- new OutOfOrderSequenceException("The broker received an out of order sequence number")),
- DUPLICATE_SEQUENCE_NUMBER(46,
- new DuplicateSequenceNumberException("The broker received a duplicate sequence number")),
- INVALID_PRODUCER_EPOCH(47,
- new ProducerFencedException("Producer attempted an operation with an old epoch")),
- INVALID_TXN_STATE(48,
- new InvalidTxnStateException("The producer attempted a transactional operation in an invalid state")),
- INVALID_PID_MAPPING(49,
- new InvalidPidMappingException("The PID mapping is invalid")),
- INVALID_TRANSACTION_TIMEOUT(50,
- new InvalidTxnTimeoutException("The transaction timeout is larger than the maximum value allowed by the broker " +
- "(as configured by max.transaction.timeout.ms).")),
- CONCURRENT_TRANSACTIONS(51,
- new ConcurrentTransactionsException("The producer attempted to update a transaction " +
- "while another concurrent operation on the same transaction was ongoing"));
+ "The session timeout is not within the range allowed by the broker " +
+ "(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidSessionTimeoutException(message);
+ }
+ }),
+ REBALANCE_IN_PROGRESS(27, "The group is rebalancing, so a rejoin is needed.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new RebalanceInProgressException(message);
+ }
+ }),
+ INVALID_COMMIT_OFFSET_SIZE(28, "The committing offset data size is not valid",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidCommitOffsetSizeException(message);
+ }
+ }),
+ TOPIC_AUTHORIZATION_FAILED(29, "Topic authorization failed.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new TopicAuthorizationException(message);
+ }
+ }),
+ GROUP_AUTHORIZATION_FAILED(30, "Group authorization failed.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new GroupAuthorizationException(message);
+ }
+ }),
+ CLUSTER_AUTHORIZATION_FAILED(31, "Cluster authorization failed.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new ClusterAuthorizationException(message);
+ }
+ }),
+ INVALID_TIMESTAMP(32, "The timestamp of the message is out of acceptable range.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidTimestampException(message);
+ }
+ }),
+ UNSUPPORTED_SASL_MECHANISM(33, "The broker does not support the requested SASL mechanism.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new UnsupportedSaslMechanismException(message);
+ }
+ }),
+ ILLEGAL_SASL_STATE(34, "Request is not valid given the current SASL state.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new IllegalSaslStateException(message);
+ }
+ }),
+ UNSUPPORTED_VERSION(35, "The version of API is not supported.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new UnsupportedVersionException(message);
+ }
+ }),
+ TOPIC_ALREADY_EXISTS(36, "Topic with this name already exists.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new TopicExistsException(message);
+ }
+ }),
+ INVALID_PARTITIONS(37, "Number of partitions is invalid.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidPartitionsException(message);
+ }
+ }),
+ INVALID_REPLICATION_FACTOR(38, "Replication-factor is invalid.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidReplicationFactorException(message);
+ }
+ }),
+ INVALID_REPLICA_ASSIGNMENT(39, "Replica assignment is invalid.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidReplicaAssignmentException(message);
+ }
+ }),
+ INVALID_CONFIG(40, "Configuration is invalid.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidConfigurationException(message);
+ }
+ }),
+ NOT_CONTROLLER(41, "This is not the correct controller for this cluster.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new NotControllerException(message);
+ }
+ }),
+ INVALID_REQUEST(42, "This most likely occurs because of a request being malformed by the " +
+ "client library or the message was sent to an incompatible broker. See the broker logs " +
+ "for more details.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidRequestException(message);
+ }
+ }),
+ UNSUPPORTED_FOR_MESSAGE_FORMAT(43, "The message format version on the broker does not support the request.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new UnsupportedForMessageFormatException(message);
+ }
+ }),
+ POLICY_VIOLATION(44, "Request parameters do not satisfy the configured policy.",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new PolicyViolationException(message);
+ }
+ }),
+ OUT_OF_ORDER_SEQUENCE_NUMBER(45, "The broker received an out of order sequence number",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new OutOfOrderSequenceException(message);
+ }
+ }),
+ DUPLICATE_SEQUENCE_NUMBER(46, "The broker received a duplicate sequence number",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new DuplicateSequenceNumberException(message);
+ }
+ }),
+ INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old epoch",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new ProducerFencedException(message);
+ }
+ }),
+ INVALID_TXN_STATE(48, "The producer attempted a transactional operation in an invalid state",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidTxnStateException(message);
+ }
+ }),
+ INVALID_PID_MAPPING(49, "The PID mapping is invalid",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidPidMappingException(message);
+ }
+ }),
+ INVALID_TRANSACTION_TIMEOUT(50, "The transaction timeout is larger than the maximum value allowed by " +
+ "the broker (as configured by max.transaction.timeout.ms).",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidTxnTimeoutException(message);
+ }
+ }),
+ CONCURRENT_TRANSACTIONS(51, "The producer attempted to update a transaction " +
+ "while another concurrent operation on the same transaction was ongoing",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new ConcurrentTransactionsException(message);
+ }
+ });
+
+ private interface ApiExceptionBuilder {
+ ApiException build(String message);
+ }
private static final Logger log = LoggerFactory.getLogger(Errors.class);
@@ -206,11 +480,13 @@ public enum Errors {
}
private final short code;
+ private final ApiExceptionBuilder builder;
private final ApiException exception;
- Errors(int code, ApiException exception) {
+ Errors(int code, String defaultExceptionString, ApiExceptionBuilder builder) {
this.code = (short) code;
- this.exception = exception;
+ this.builder = builder;
+ this.exception = builder.build(defaultExceptionString);
}
/**
@@ -221,6 +497,21 @@ public enum Errors {
}
/**
+ * Create an instance of the ApiException that contains the given error message.
+ *
+ * @param message The message string to set.
+ * @return The exception.
+ */
+ public ApiException exception(String message) {
+ if (message == null) {
+ // If no error message was specified, return an exception with the default error message.
+ return exception;
+ }
+ // Return an exception with the given error message.
+ return builder.build(message);
+ }
+
+ /**
* Returns the class name of the exception or null if this is {@code Errors.NONE}.
*/
public String exceptionName() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index 54f9764..2c2b2dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
@@ -61,6 +62,10 @@ public class CreateTopicsResponse extends AbstractResponse {
return message;
}
+ public ApiException exception() {
+ return error.exception(message);
+ }
+
@Override
public String toString() {
return "Error(error=" + error + ", message=" + message + ")";
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index bd79653..017fdf4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -260,7 +260,8 @@ public class MetadataResponse extends AbstractResponse {
}
}
- return new Cluster(this.clusterId, this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED), internalTopics);
+ return new Cluster(this.clusterId, this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
+ internalTopics, this.controller);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 1a4de98..cb0ff89 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -659,7 +659,7 @@ public class Utils {
/**
* Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
*/
- public static void closeQuietly(Closeable closeable, String name) {
+ public static void closeQuietly(AutoCloseable closeable, String name) {
if (closeable != null) {
try {
closeable.close();
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
new file mode 100644
index 0000000..36cb8e8
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.CreateTopicsResponse.Error;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * A unit test for KafkaAdminClient.
+ *
+ * See for an integration test of the KafkaAdminClient.
+ * Also see KafkaAdminClientIntegrationTest for a unit test of the admin client.
+ */
+public class KafkaAdminClientTest {
+ @Rule
+ final public Timeout globalTimeout = Timeout.millis(120000);
+
+ @Test
+ public void testGetOrCreateListValue() {
+ Map<String, List<String>> map = new HashMap<>();
+ List<String> fooList = KafkaAdminClient.getOrCreateListValue(map, "foo");
+ assertNotNull(fooList);
+ fooList.add("a");
+ fooList.add("b");
+ List<String> fooList2 = KafkaAdminClient.getOrCreateListValue(map, "foo");
+ assertEquals(fooList, fooList2);
+ assertTrue(fooList2.contains("a"));
+ assertTrue(fooList2.contains("b"));
+ List<String> barList = KafkaAdminClient.getOrCreateListValue(map, "bar");
+ assertNotNull(barList);
+ assertTrue(barList.isEmpty());
+ }
+
+ @Test
+ public void testCalcTimeoutMsRemainingAsInt() {
+ assertEquals(0, KafkaAdminClient.calcTimeoutMsRemainingAsInt(1000, 1000));
+ assertEquals(100, KafkaAdminClient.calcTimeoutMsRemainingAsInt(1000, 1100));
+ assertEquals(Integer.MAX_VALUE, KafkaAdminClient.calcTimeoutMsRemainingAsInt(0, Long.MAX_VALUE));
+ assertEquals(Integer.MIN_VALUE, KafkaAdminClient.calcTimeoutMsRemainingAsInt(Long.MAX_VALUE, 0));
+ }
+
+ @Test
+ public void testPrettyPrintException() {
+ assertEquals("Null exception.", KafkaAdminClient.prettyPrintException(null));
+ assertEquals("TimeoutException", KafkaAdminClient.prettyPrintException(new TimeoutException()));
+ assertEquals("TimeoutException: The foobar timed out.",
+ KafkaAdminClient.prettyPrintException(new TimeoutException("The foobar timed out.")));
+ }
+
+ private static Map<String, Object> newStrMap(String... vals) {
+ Map<String, Object> map = new HashMap<>();
+ map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121");
+ map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
+ if (vals.length % 2 != 0) {
+ throw new IllegalStateException();
+ }
+ for (int i = 0; i < vals.length; i += 2) {
+ map.put(vals[i], vals[i + 1]);
+ }
+ return map;
+ }
+
+ private static AdminClientConfig newConfMap(String... vals) {
+ return new AdminClientConfig(newStrMap(vals));
+ }
+
+ @Test
+ public void testGenerateClientId() {
+ Set<String> ids = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ String id = KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, ""));
+ assertTrue("Got duplicate id " + id, !ids.contains(id));
+ ids.add(id);
+ }
+ assertEquals("myCustomId",
+ KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, "myCustomId")));
+ }
+
+ private static class MockKafkaAdminClientContext implements AutoCloseable {
+ final static String CLUSTER_ID = "mockClusterId";
+ final AdminClientConfig adminClientConfig;
+ final Metadata metadata;
+ final HashMap<Integer, Node> nodes;
+ final MockClient mockClient;
+ final AdminClient client;
+ Cluster cluster;
+
+ MockKafkaAdminClientContext(Map<String, Object> config) {
+ this.adminClientConfig = new AdminClientConfig(config);
+ this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+ adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+ this.nodes = new HashMap<Integer, Node>();
+ this.nodes.put(0, new Node(0, "localhost", 8121));
+ this.nodes.put(1, new Node(1, "localhost", 8122));
+ this.nodes.put(2, new Node(2, "localhost", 8123));
+ this.mockClient = new MockClient(Time.SYSTEM, this.metadata);
+ this.client = KafkaAdminClient.create(adminClientConfig, mockClient, metadata);
+ this.cluster = new Cluster(CLUSTER_ID, nodes.values(),
+ Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
+ Collections.<String>emptySet(), nodes.get(0));
+ }
+
+ @Override
+ public void close() {
+ this.client.close();
+ }
+ }
+
+ @Test
+ public void testCloseAdminClient() throws Exception {
+ try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
+ }
+ }
+
+ private static void assertFutureError(Future<?> future, Class<? extends Throwable> exceptionClass)
+ throws InterruptedException {
+ try {
+ future.get();
+ fail("Expected a " + exceptionClass.getSimpleName() + " exception, but got success.");
+ } catch (ExecutionException ee) {
+ Throwable cause = ee.getCause();
+ assertEquals("Expected a " + exceptionClass.getSimpleName() + " exception, but got " +
+ cause.getClass().getSimpleName(),
+ exceptionClass, cause.getClass());
+ }
+ }
+
+ /**
+ * Test that the client properly times out when we don't receive any metadata.
+ */
+ @Test
+ public void testTimeoutWithoutMetadata() throws Exception {
+ try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap(
+ AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
+ ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
+ ctx.mockClient.setNode(new Node(0, "localhost", 8121));
+ ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, Error>() {{
+ put("myTopic", new Error(Errors.NONE, ""));
+ }}));
+ KafkaFuture<Void> future = ctx.client.
+ createTopics(Collections.singleton(new NewTopic("myTopic", new HashMap<Integer, List<Integer>>() {{
+ put(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2}));
+ }})), new CreateTopicsOptions().timeoutMs(1000)).all();
+ assertFutureError(future, TimeoutException.class);
+ }
+ }
+
+ @Test
+ public void testCreateTopics() throws Exception {
+ try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
+ ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
+ ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet());
+ ctx.mockClient.setNode(ctx.nodes.get(0));
+ ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, Error>() {{
+ put("myTopic", new Error(Errors.NONE, ""));
+ }}));
+ KafkaFuture<Void> future = ctx.client.
+ createTopics(Collections.singleton(new NewTopic("myTopic", new HashMap<Integer, List<Integer>>() {{
+ put(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2}));
+ }})), new CreateTopicsOptions().timeoutMs(10000)).all();
+ future.get();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
new file mode 100644
index 0000000..39868e0
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A unit test for KafkaFuture.
+ */
+public class KafkaFutureTest {
+ @Rule
+ final public Timeout globalTimeout = Timeout.millis(120000);
+
+ @Test
+ public void testCompleteFutures() throws Exception {
+ KafkaFutureImpl<Integer> future123 = new KafkaFutureImpl<>();
+ assertTrue(future123.complete(123));
+ assertEquals(Integer.valueOf(123), future123.get());
+ assertFalse(future123.complete(456));
+ assertTrue(future123.isDone());
+ assertFalse(future123.isCancelled());
+ assertFalse(future123.isCompletedExceptionally());
+
+ KafkaFuture<Integer> future456 = KafkaFuture.completedFuture(456);
+ assertEquals(Integer.valueOf(456), future456.get());
+
+ KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
+ futureFail.completeExceptionally(new RuntimeException("We require more vespene gas"));
+ try {
+ futureFail.get();
+ Assert.fail("Expected an exception");
+ } catch (ExecutionException e) {
+ assertEquals(RuntimeException.class, e.getCause().getClass());
+ Assert.assertEquals("We require more vespene gas", e.getCause().getMessage());
+ }
+ }
+
+ @Test
+ public void testCompletingFutures() throws Exception {
+ final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+ CompleterThread myThread = new CompleterThread(future, "You must construct additional pylons.");
+ assertFalse(future.isDone());
+ assertFalse(future.isCompletedExceptionally());
+ assertFalse(future.isCancelled());
+ assertEquals("I am ready", future.getNow("I am ready"));
+ myThread.start();
+ String str = future.get(5, TimeUnit.MINUTES);
+ assertEquals("You must construct additional pylons.", str);
+ assertEquals("You must construct additional pylons.", future.getNow("I am ready"));
+ assertTrue(future.isDone());
+ assertFalse(future.isCompletedExceptionally());
+ assertFalse(future.isCancelled());
+ myThread.join();
+ assertEquals(null, myThread.testException);
+ }
+
+ private static class CompleterThread<T> extends Thread {
+ private final KafkaFutureImpl<T> future;
+ private final T value;
+ Throwable testException = null;
+
+ CompleterThread(KafkaFutureImpl<T> future, T value) {
+ this.future = future;
+ this.value = value;
+ }
+
+ @Override
+ public void run() {
+ try {
+ try {
+ Thread.sleep(0, 200);
+ } catch (InterruptedException e) {
+ }
+ future.complete(value);
+ } catch (Throwable testException) {
+ this.testException = testException;
+ }
+ }
+ }
+
+ private static class WaiterThread<T> extends Thread {
+ private final KafkaFutureImpl<T> future;
+ private final T expected;
+ Throwable testException = null;
+
+ WaiterThread(KafkaFutureImpl<T> future, T expected) {
+ this.future = future;
+ this.expected = expected;
+ }
+
+ @Override
+ public void run() {
+ try {
+ T value = future.get();
+ assertEquals(expected, value);
+ } catch (Throwable testException) {
+ this.testException = testException;
+ }
+ }
+ }
+
+ @Test
+ public void testAllOfFutures() throws Exception {
+ final int numThreads = 5;
+ final List<KafkaFutureImpl<Integer>> futures = new ArrayList<>();
+ for (int i = 0; i < numThreads; i++) {
+ futures.add(new KafkaFutureImpl<Integer>());
+ }
+ KafkaFuture<Void> allFuture = KafkaFuture.allOf(futures.toArray(new KafkaFuture[0]));
+ final List<CompleterThread> completerThreads = new ArrayList<>();
+ final List<WaiterThread> waiterThreads = new ArrayList<>();
+ for (int i = 0; i < numThreads; i++) {
+ completerThreads.add(new CompleterThread<>(futures.get(i), i));
+ waiterThreads.add(new WaiterThread<>(futures.get(i), i));
+ }
+ assertFalse(allFuture.isDone());
+ for (int i = 0; i < numThreads; i++) {
+ waiterThreads.get(i).start();
+ }
+ for (int i = 0; i < numThreads - 1; i++) {
+ completerThreads.get(i).start();
+ }
+ assertFalse(allFuture.isDone());
+ completerThreads.get(numThreads - 1).start();
+ allFuture.get();
+ assertTrue(allFuture.isDone());
+ for (int i = 0; i < numThreads; i++) {
+ assertEquals(Integer.valueOf(i), futures.get(i).get());
+ }
+ for (int i = 0; i < numThreads; i++) {
+ completerThreads.get(i).join();
+ waiterThreads.get(i).join();
+ assertEquals(null, completerThreads.get(i).testException);
+ assertEquals(null, waiterThreads.get(i).testException);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 7342bbb..d297b70 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -40,6 +40,11 @@ import org.apache.kafka.common.{Cluster, Node, TopicPartition}
import scala.collection.JavaConverters._
import scala.util.Try
+/**
+ * A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
+ * and configurations. This client is deprecated, and will be replaced by KafkaAdminClient.
+ * @see KafkaAdminClient
+ */
class AdminClient(val time: Time,
val requestTimeoutMs: Int,
val retryBackoffMs: Long,
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
deleted file mode 100644
index ff9fef0..0000000
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import java.util.Collections
-import java.util.concurrent.TimeUnit
-
-import kafka.admin.AdminClient
-import kafka.admin.AdminClient.DeleteRecordsResult
-import kafka.server.KafkaConfig
-import java.lang.{Long => JLong}
-import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{Errors, ApiKeys}
-import org.apache.kafka.common.requests.DeleteRecordsRequest
-import org.junit.{After, Before, Test}
-import org.junit.Assert._
-import scala.collection.JavaConverters._
-
-class AdminClientTest extends IntegrationTestHarness with Logging {
-
- val producerCount = 1
- val consumerCount = 2
- val serverCount = 3
- val groupId = "my-test"
- val clientId = "consumer-498"
-
- val topic = "topic"
- val part = 0
- val tp = new TopicPartition(topic, part)
- val part2 = 1
- val tp2 = new TopicPartition(topic, part2)
-
- var client: AdminClient = null
-
- // configure the servers and clients
- this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
- this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
- this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
- this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
- this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
- this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
- this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
- this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
- this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
- this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
-
- @Before
- override def setUp() {
- super.setUp()
- client = AdminClient.createSimplePlaintext(this.brokerList)
- TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
- }
-
- @After
- override def tearDown() {
- client.close()
- super.tearDown()
- }
-
- @Test
- def testSeekToBeginningAfterDeleteRecords() {
- val consumer = consumers.head
- subscribeAndWaitForAssignment(topic, consumer)
-
- sendRecords(producers.head, 10, tp)
- consumer.seekToBeginning(Collections.singletonList(tp))
- assertEquals(0L, consumer.position(tp))
-
- client.deleteRecordsBefore(Map((tp, 5L))).get()
- consumer.seekToBeginning(Collections.singletonList(tp))
- assertEquals(5L, consumer.position(tp))
-
- client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
- consumer.seekToBeginning(Collections.singletonList(tp))
- assertEquals(10L, consumer.position(tp))
- }
-
- @Test
- def testConsumeAfterDeleteRecords() {
- val consumer = consumers.head
- subscribeAndWaitForAssignment(topic, consumer)
-
- sendRecords(producers.head, 10, tp)
- var messageCount = 0
- TestUtils.waitUntilTrue(() => {
- messageCount += consumer.poll(0).count()
- messageCount == 10
- }, "Expected 10 messages", 3000L)
-
- client.deleteRecordsBefore(Map((tp, 3L))).get()
- consumer.seek(tp, 1)
- messageCount = 0
- TestUtils.waitUntilTrue(() => {
- messageCount += consumer.poll(0).count()
- messageCount == 7
- }, "Expected 7 messages", 3000L)
-
- client.deleteRecordsBefore(Map((tp, 8L))).get()
- consumer.seek(tp, 1)
- messageCount = 0
- TestUtils.waitUntilTrue(() => {
- messageCount += consumer.poll(0).count()
- messageCount == 2
- }, "Expected 2 messages", 3000L)
- }
-
- @Test
- def testLogStartOffsetCheckpoint() {
- subscribeAndWaitForAssignment(topic, consumers.head)
-
- sendRecords(producers.head, 10, tp)
- assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
-
- for (i <- 0 until serverCount)
- killBroker(i)
- restartDeadBrokers()
-
- client.close()
- brokerList = TestUtils.bootstrapServers(servers, listenerName)
- client = AdminClient.createSimplePlaintext(brokerList)
-
- TestUtils.waitUntilTrue(() => {
- // Need to retry if leader is not available for the partition
- client.deleteRecordsBefore(Map((tp, 0L))).get(1000L, TimeUnit.MILLISECONDS)(tp).equals(DeleteRecordsResult(5L, null))
- }, "Expected low watermark of the partition to be 5L")
- }
-
- @Test
- def testLogStartOffsetAfterDeleteRecords() {
- subscribeAndWaitForAssignment(topic, consumers.head)
-
- sendRecords(producers.head, 10, tp)
- client.deleteRecordsBefore(Map((tp, 3L))).get()
-
- for (i <- 0 until serverCount)
- assertEquals(3, servers(i).replicaManager.getReplica(tp).get.logStartOffset)
- }
-
- @Test
- def testOffsetsForTimesAfterDeleteRecords() {
- val consumer = consumers.head
- subscribeAndWaitForAssignment(topic, consumer)
-
- sendRecords(producers.head, 10, tp)
- assertEquals(0L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
-
- client.deleteRecordsBefore(Map((tp, 5L))).get()
- assertEquals(5L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
-
- client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
- assertNull(consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp))
- }
-
- @Test
- def testDeleteRecordsWithException() {
- subscribeAndWaitForAssignment(topic, consumers.head)
-
- sendRecords(producers.head, 10, tp)
- // Should get success result
- assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
- // OffsetOutOfRangeException if offset > high_watermark
- assertEquals(DeleteRecordsResult(-1L, Errors.OFFSET_OUT_OF_RANGE.exception()), client.deleteRecordsBefore(Map((tp, 20))).get()(tp))
-
- val nonExistPartition = new TopicPartition(topic, 3)
- // UnknownTopicOrPartitionException if user tries to delete records of a non-existent partition
- assertEquals(DeleteRecordsResult(-1L, Errors.LEADER_NOT_AVAILABLE.exception()),
- client.deleteRecordsBefore(Map((nonExistPartition, 20))).get()(nonExistPartition))
- }
-
- @Test
- def testListGroups() {
- subscribeAndWaitForAssignment(topic, consumers.head)
-
- val groups = client.listAllGroupsFlattened
- assertFalse(groups.isEmpty)
- val group = groups.head
- assertEquals(groupId, group.groupId)
- assertEquals("consumer", group.protocolType)
- }
-
- @Test
- def testListAllBrokerVersionInfo() {
- subscribeAndWaitForAssignment(topic, consumers.head)
-
- val brokerVersionInfos = client.listAllBrokerVersionInfo
- val brokers = brokerList.split(",")
- assertEquals(brokers.size, brokerVersionInfos.size)
- for ((node, tryBrokerVersionInfo) <- brokerVersionInfos) {
- val hostStr = s"${node.host}:${node.port}"
- assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
- val brokerVersionInfo = tryBrokerVersionInfo.get
- assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
- }
- }
-
- @Test
- def testGetConsumerGroupSummary() {
- subscribeAndWaitForAssignment(topic, consumers.head)
-
- val group = client.describeConsumerGroup(groupId)
- assertEquals("range", group.assignmentStrategy)
- assertEquals("Stable", group.state)
- assertFalse(group.consumers.isEmpty)
-
- val member = group.consumers.get.head
- assertEquals(clientId, member.clientId)
- assertFalse(member.host.isEmpty)
- assertFalse(member.consumerId.isEmpty)
- }
-
- @Test
- def testDescribeConsumerGroup() {
- subscribeAndWaitForAssignment(topic, consumers.head)
-
- val consumerGroupSummary = client.describeConsumerGroup(groupId)
- assertEquals(1, consumerGroupSummary.consumers.get.size)
- assertEquals(List(tp, tp2), consumerGroupSummary.consumers.get.flatMap(_.assignment))
- }
-
- @Test
- def testDescribeConsumerGroupForNonExistentGroup() {
- val nonExistentGroup = "non" + groupId
- assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
- }
-
- private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
- consumer.subscribe(Collections.singletonList(topic))
- TestUtils.waitUntilTrue(() => {
- consumer.poll(0)
- !consumer.assignment.isEmpty
- }, "Expected non-empty assignment")
- }
-
- private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
- numRecords: Int,
- tp: TopicPartition) {
- val futures = (0 until numRecords).map { i =>
- val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
- debug(s"Sending this record: $record")
- producer.send(record)
- }
-
- futures.foreach(_.get)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
new file mode 100644
index 0000000..9c5468f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.util
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+
+import org.apache.kafka.common.utils.Utils
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import org.apache.kafka.clients.admin._
+import kafka.utils.{Logging, TestUtils}
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.common.KafkaFuture
+import org.apache.kafka.common.errors.TopicExistsException
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.{After, Rule, Test}
+import org.junit.rules.Timeout
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+
+/**
+ * An integration test of the KafkaAdminClient.
+ *
+ * Also see {@link org.apache.kafka.clients.admin.KafkaAdminClientTest} for a unit test of the admin client.
+ */
+class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
+
+ @Rule
+ def globalTimeout = Timeout.millis(120000)
+
+ var client: AdminClient = null
+
+ @After
+ def closeClient(): Unit = {
+ if (client != null)
+ Utils.closeQuietly(client, "AdminClient")
+ }
+
+ val brokerCount = 3
+ lazy val serverConfig = new Properties
+
+ def createConfig(): util.Map[String, Object] = {
+ val config = new util.HashMap[String, Object]
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ val securityProps: util.Map[Object, Object] =
+ TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+ securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+ config
+ }
+
+ def waitForTopics(client: AdminClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = {
+ TestUtils.waitUntilTrue(() => {
+ val topics = client.listTopics().names().get()
+ expectedPresent.forall(topicName => topics.contains(topicName)) &&
+ expectedMissing.forall(topicName => !topics.contains(topicName))
+ }, "timed out waiting for topics")
+ }
+
+ def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
+ try {
+ future.get()
+ fail("Expected CompletableFuture.get to return an exception")
+ } catch {
+ case e: ExecutionException =>
+ val cause = e.getCause()
+ assertTrue("Expected an exception of type " + clazz.getName + "; got type " +
+ cause.getClass().getName, clazz.isInstance(cause))
+ }
+ }
+
+ @Test
+ def testClose(): Unit = {
+ val client = AdminClient.create(createConfig())
+ client.close()
+ client.close() // double close has no effect
+ }
+
+ @Test
+ def testListNodes(): Unit = {
+ client = AdminClient.create(createConfig())
+ val brokerStrs = brokerList.split(",").toList.sorted
+ var nodeStrs : List[String] = null
+ do {
+ var nodes = client.describeCluster().nodes().get().asScala
+ nodeStrs = nodes.map ( node => s"${node.host}:${node.port}" ).toList.sorted
+ } while (nodeStrs.size < brokerStrs.size)
+ assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(","))
+ client.close()
+ }
+
+ @Test
+ def testCreateDeleteTopics(): Unit = {
+ client = AdminClient.create(createConfig())
+ val newTopics : List[NewTopic] = List(
+ new NewTopic("mytopic", 1, 1),
+ new NewTopic("mytopic2", 1, 1))
+ client.createTopics(newTopics.asJava,
+ new CreateTopicsOptions().validateOnly(true)).all().get()
+ waitForTopics(client, List(), List("mytopic", "mytopic2"))
+
+ client.createTopics(newTopics.asJava).all().get()
+ waitForTopics(client, List("mytopic", "mytopic2"), List())
+
+ val results = client.createTopics(newTopics.asJava).results()
+ assert(results.containsKey("mytopic"))
+ assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException])
+ assert(results.containsKey("mytopic2"))
+ assertFutureExceptionTypeEquals(results.get("mytopic2"), classOf[TopicExistsException])
+
+ val deleteTopics : Set[String] = Set("mytopic", "mytopic2")
+ client.deleteTopics(deleteTopics.asJava).all().get()
+ waitForTopics(client, List(), List("mytopic", "mytopic2"))
+
+ client.close()
+ }
+
+ @Test
+ def testGetAllBrokerVersions(): Unit = {
+ client = AdminClient.create(createConfig())
+ val nodes = client.describeCluster().nodes().get()
+ val nodesToVersions = client.apiVersions(nodes).all().get()
+ val brokers = brokerList.split(",")
+ assert(brokers.size == nodesToVersions.size())
+ for ((node, brokerVersionInfo) <- nodesToVersions.asScala) {
+ val hostStr = s"${node.host}:${node.port}"
+ assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
+ assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+ }
+ client.close()
+ }
+
+ override def generateConfigs() = {
+ val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+ trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
+ cfgs.foreach { config =>
+ config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
+ config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
+ config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value)
+ config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
+ config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true");
+ }
+ cfgs.foreach(_.putAll(serverConfig))
+ cfgs.map(KafkaConfig.fromProps)
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
new file mode 100644
index 0000000..434a47f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.util.Collections
+import java.util.concurrent.TimeUnit
+
+import kafka.admin.AdminClient
+import kafka.admin.AdminClient.DeleteRecordsResult
+import kafka.server.KafkaConfig
+import java.lang.{Long => JLong}
+import kafka.utils.{Logging, TestUtils}
+import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{Errors, ApiKeys}
+import org.apache.kafka.common.requests.DeleteRecordsRequest
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+import scala.collection.JavaConverters._
+
+/**
+ * Tests for the deprecated Scala AdminClient.
+ */
+class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
+
+ val producerCount = 1
+ val consumerCount = 2
+ val serverCount = 3
+ val groupId = "my-test"
+ val clientId = "consumer-498"
+
+ val topic = "topic"
+ val part = 0
+ val tp = new TopicPartition(topic, part)
+ val part2 = 1
+ val tp2 = new TopicPartition(topic, part2)
+
+ var client: AdminClient = null
+
+ // configure the servers and clients
+ this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+ this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+ this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+ this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
+ this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+ this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+ this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
+ this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
+
+ @Before
+ override def setUp() {
+ super.setUp()
+ client = AdminClient.createSimplePlaintext(this.brokerList)
+ TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
+ }
+
+ @After
+ override def tearDown() {
+ client.close()
+ super.tearDown()
+ }
+
+ @Test
+ def testSeekToBeginningAfterDeleteRecords() {
+ val consumer = consumers.head
+ subscribeAndWaitForAssignment(topic, consumer)
+
+ sendRecords(producers.head, 10, tp)
+ consumer.seekToBeginning(Collections.singletonList(tp))
+ assertEquals(0L, consumer.position(tp))
+
+ client.deleteRecordsBefore(Map((tp, 5L))).get()
+ consumer.seekToBeginning(Collections.singletonList(tp))
+ assertEquals(5L, consumer.position(tp))
+
+ client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
+ consumer.seekToBeginning(Collections.singletonList(tp))
+ assertEquals(10L, consumer.position(tp))
+ }
+
+ @Test
+ def testConsumeAfterDeleteRecords() {
+ val consumer = consumers.head
+ subscribeAndWaitForAssignment(topic, consumer)
+
+ sendRecords(producers.head, 10, tp)
+ var messageCount = 0
+ TestUtils.waitUntilTrue(() => {
+ messageCount += consumer.poll(0).count()
+ messageCount == 10
+ }, "Expected 10 messages", 3000L)
+
+ client.deleteRecordsBefore(Map((tp, 3L))).get()
+ consumer.seek(tp, 1)
+ messageCount = 0
+ TestUtils.waitUntilTrue(() => {
+ messageCount += consumer.poll(0).count()
+ messageCount == 7
+ }, "Expected 7 messages", 3000L)
+
+ client.deleteRecordsBefore(Map((tp, 8L))).get()
+ consumer.seek(tp, 1)
+ messageCount = 0
+ TestUtils.waitUntilTrue(() => {
+ messageCount += consumer.poll(0).count()
+ messageCount == 2
+ }, "Expected 2 messages", 3000L)
+ }
+
+ @Test
+ def testLogStartOffsetCheckpoint() {
+ subscribeAndWaitForAssignment(topic, consumers.head)
+
+ sendRecords(producers.head, 10, tp)
+ assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
+
+ for (i <- 0 until serverCount)
+ killBroker(i)
+ restartDeadBrokers()
+
+ client.close()
+ brokerList = TestUtils.bootstrapServers(servers, listenerName)
+ client = AdminClient.createSimplePlaintext(brokerList)
+
+ TestUtils.waitUntilTrue(() => {
+ // Need to retry if leader is not available for the partition
+ client.deleteRecordsBefore(Map((tp, 0L))).get(1000L, TimeUnit.MILLISECONDS)(tp).equals(DeleteRecordsResult(5L, null))
+ }, "Expected low watermark of the partition to be 5L")
+ }
+
+ @Test
+ def testLogStartOffsetAfterDeleteRecords() {
+ subscribeAndWaitForAssignment(topic, consumers.head)
+
+ sendRecords(producers.head, 10, tp)
+ client.deleteRecordsBefore(Map((tp, 3L))).get()
+
+ for (i <- 0 until serverCount)
+ assertEquals(3, servers(i).replicaManager.getReplica(tp).get.logStartOffset)
+ }
+
+ @Test
+ def testOffsetsForTimesAfterDeleteRecords() {
+ val consumer = consumers.head
+ subscribeAndWaitForAssignment(topic, consumer)
+
+ sendRecords(producers.head, 10, tp)
+ assertEquals(0L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
+
+ client.deleteRecordsBefore(Map((tp, 5L))).get()
+ assertEquals(5L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
+
+ client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
+ assertNull(consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp))
+ }
+
+ @Test
+ def testDeleteRecordsWithException() {
+ subscribeAndWaitForAssignment(topic, consumers.head)
+
+ sendRecords(producers.head, 10, tp)
+ // Should get success result
+ assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
+ // OffsetOutOfRangeException if offset > high_watermark
+ assertEquals(DeleteRecordsResult(-1L, Errors.OFFSET_OUT_OF_RANGE.exception()), client.deleteRecordsBefore(Map((tp, 20))).get()(tp))
+
+ val nonExistPartition = new TopicPartition(topic, 3)
+ // UnknownTopicOrPartitionException if user tries to delete records of a non-existent partition
+ assertEquals(DeleteRecordsResult(-1L, Errors.LEADER_NOT_AVAILABLE.exception()),
+ client.deleteRecordsBefore(Map((nonExistPartition, 20))).get()(nonExistPartition))
+ }
+
+ @Test
+ def testListGroups() {
+ subscribeAndWaitForAssignment(topic, consumers.head)
+
+ val groups = client.listAllGroupsFlattened
+ assertFalse(groups.isEmpty)
+ val group = groups.head
+ assertEquals(groupId, group.groupId)
+ assertEquals("consumer", group.protocolType)
+ }
+
+ @Test
+ def testListAllBrokerVersionInfo() {
+ subscribeAndWaitForAssignment(topic, consumers.head)
+
+ val brokerVersionInfos = client.listAllBrokerVersionInfo
+ val brokers = brokerList.split(",")
+ assertEquals(brokers.size, brokerVersionInfos.size)
+ for ((node, tryBrokerVersionInfo) <- brokerVersionInfos) {
+ val hostStr = s"${node.host}:${node.port}"
+ assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
+ val brokerVersionInfo = tryBrokerVersionInfo.get
+ assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+ }
+ }
+
+ @Test
+ def testGetConsumerGroupSummary() {
+ subscribeAndWaitForAssignment(topic, consumers.head)
+
+ val group = client.describeConsumerGroup(groupId)
+ assertEquals("range", group.assignmentStrategy)
+ assertEquals("Stable", group.state)
+ assertFalse(group.consumers.isEmpty)
+
+ val member = group.consumers.get.head
+ assertEquals(clientId, member.clientId)
+ assertFalse(member.host.isEmpty)
+ assertFalse(member.consumerId.isEmpty)
+ }
+
+ @Test
+ def testDescribeConsumerGroup() {
+ subscribeAndWaitForAssignment(topic, consumers.head)
+
+ val consumerGroupSummary = client.describeConsumerGroup(groupId)
+ assertEquals(1, consumerGroupSummary.consumers.get.size)
+ assertEquals(List(tp, tp2), consumerGroupSummary.consumers.get.flatMap(_.assignment))
+ }
+
+ @Test
+ def testDescribeConsumerGroupForNonExistentGroup() {
+ val nonExistentGroup = "non" + groupId
+ assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
+ }
+
+ private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
+ consumer.subscribe(Collections.singletonList(topic))
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(0)
+ !consumer.assignment.isEmpty
+ }, "Expected non-empty assignment")
+ }
+
+ private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
+ numRecords: Int,
+ tp: TopicPartition) {
+ val futures = (0 until numRecords).map { i =>
+ val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
+ debug(s"Sending this record: $record")
+ producer.send(record)
+ }
+
+ futures.foreach(_.get)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
new file mode 100644
index 0000000..f20ed0f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import java.io.File
+
+import org.apache.kafka.common.protocol.SecurityProtocol
+import kafka.server.KafkaConfig
+
+class SaslSslAdminClientIntegrationTest extends KafkaAdminClientIntegrationTest with SaslTestHarness {
+ override protected val zkSaslEnabled = true
+ this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ override protected def securityProtocol = SecurityProtocol.SASL_SSL
+ override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 02b5fe3..e826c7f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -561,6 +561,9 @@ object TestUtils extends Logging {
def consumerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "consumer", saslProperties)
+ def adminClientSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
+ securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "admin-client", saslProperties)
+
/**
* Create a new consumer with a few pre-configured properties.
*/
[2/3] kafka git commit: KAFKA-3265;
Add a public AdminClient API in Java (KIP-117)
Posted by ij...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
new file mode 100644
index 0000000..cf37aa7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -0,0 +1,1065 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.BrokerNotAvailableException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.DeleteTopicsRequest;
+import org.apache.kafka.common.requests.DeleteTopicsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+
+/**
+ * An administrative client for Kafka which supports managing and inspecting topics, brokers,
+ * and configurations.
+ */
+@InterfaceStability.Unstable
+public class KafkaAdminClient extends AdminClient {
+ private static final Logger log = LoggerFactory.getLogger(KafkaAdminClient.class);
+
+ /**
+ * The maximum number of times to retry a call before failing it.
+ */
+ private static final int MAX_CALL_RETRIES = 5;
+
+ /**
+ * The next integer to use to name a KafkaAdminClient which the user hasn't specified an explicit name for.
+ */
+ private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+
+ /**
+ * The prefix to use for the JMX metrics for this class
+ */
+ private static final String JMX_PREFIX = "kafka.admin.client";
+
+ /**
+ * The default timeout to use for an operation.
+ */
+ private final int defaultTimeoutMs;
+
+ /**
+ * The name of this AdminClient instance.
+ */
+ private final String clientId;
+
+ /**
+ * Provides the time.
+ */
+ private final Time time;
+
+ /**
+ * The cluster metadata used by the KafkaClient.
+ */
+ private final Metadata metadata;
+
+ /**
+ * The metrics for this KafkaAdminClient.
+ */
+ private final Metrics metrics;
+
+ /**
+ * The network client to use.
+ */
+ private final KafkaClient client;
+
+ /**
+ * The runnable used in the service thread for this admin client.
+ */
+ private final AdminClientRunnable runnable;
+
+ /**
+ * The network service thread for this admin client.
+ */
+ private final Thread thread;
+
+ /**
+ * True if this client is closed.
+ */
+ private volatile boolean closed = false;
+
+ /**
+ * Get or create a list value from a map.
+ *
+ * @param map The map to get or create the element from.
+ * @param key The key.
+ * @param <K> The key type.
+ * @param <V> The value type.
+ * @return The list value.
+ */
+ static <K, V> List<V> getOrCreateListValue(Map<K, List<V>> map, K key) {
+ List<V> list = map.get(key);
+ if (list != null)
+ return list;
+ list = new LinkedList<>();
+ map.put(key, list);
+ return list;
+ }
+
+ /**
+ * Send an exception to every element in a collection of KafkaFutureImpls.
+ *
+ * @param futures The collection of KafkaFutureImpl objects.
+ * @param exc The exception
+ * @param <T> The KafkaFutureImpl result type.
+ */
+ private static <T> void completeAllExceptionally(Collection<KafkaFutureImpl<T>> futures, Throwable exc) {
+ for (KafkaFutureImpl<?> future : futures) {
+ future.completeExceptionally(exc);
+ }
+ }
+
+ /**
+ * Get the current time remaining before a deadline as an integer.
+ *
+ * @param now The current time in milliseconds.
+ * @param deadlineMs The deadline time in milliseconds.
+ * @return The time delta in milliseconds.
+ */
+ static int calcTimeoutMsRemainingAsInt(long now, long deadlineMs) {
+ long deltaMs = deadlineMs - now;
+ if (deltaMs > Integer.MAX_VALUE)
+ deltaMs = Integer.MAX_VALUE;
+ else if (deltaMs < Integer.MIN_VALUE)
+ deltaMs = Integer.MIN_VALUE;
+ return (int) deltaMs;
+ }
+
+ /**
+ * Generate the client id based on the configuration.
+ *
+ * @param config The configuration
+ *
+ * @return The client id
+ */
+ static String generateClientId(AdminClientConfig config) {
+ String clientId = config.getString(AdminClientConfig.CLIENT_ID_CONFIG);
+ if (!clientId.isEmpty())
+ return clientId;
+ return "adminclient-" + ADMIN_CLIENT_ID_SEQUENCE.getAndIncrement();
+ }
+
+ /**
+ * Get the deadline for a particular call.
+ *
+ * @param now The current time in milliseconds.
+ * @param optionTimeoutMs The timeout option given by the user.
+ *
+ * @return The deadline in milliseconds.
+ */
+ private long calcDeadlineMs(long now, Integer optionTimeoutMs) {
+ if (optionTimeoutMs != null)
+ return now + Math.max(0, optionTimeoutMs);
+ return now + defaultTimeoutMs;
+ }
+
+ /**
+ * Pretty-print an exception.
+ *
+ * @param throwable The exception.
+ *
+ * @return A compact human-readable string.
+ */
+ static String prettyPrintException(Throwable throwable) {
+ if (throwable == null)
+ return "Null exception.";
+ if (throwable.getMessage() != null) {
+ return throwable.getClass().getSimpleName() + ": " + throwable.getMessage();
+ }
+ return throwable.getClass().getSimpleName();
+ }
+
+ static KafkaAdminClient create(AdminClientConfig config) {
+ Metadata metadata = null;
+ Metrics metrics = null;
+ NetworkClient networkClient = null;
+ Time time = Time.SYSTEM;
+ String clientId = generateClientId(config);
+ ChannelBuilder channelBuilder = null;
+ Selector selector = null;
+ ApiVersions apiVersions = new ApiVersions();
+
+ try {
+ metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+ config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+ List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
+ MetricsReporter.class);
+ Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
+ MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
+ .timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
+ .recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+ .tags(metricTags);
+ reporters.add(new JmxReporter(JMX_PREFIX));
+ metrics = new Metrics(metricConfig, reporters, time);
+ String metricGrpPrefix = "admin-client";
+ channelBuilder = ClientUtils.createChannelBuilder(config);
+ selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+ metrics, time, metricGrpPrefix, channelBuilder);
+ networkClient = new NetworkClient(
+ selector,
+ metadata,
+ clientId,
+ 100,
+ config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
+ config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
+ config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
+ config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG),
+ time,
+ true,
+ apiVersions);
+ channelBuilder = null;
+ return new KafkaAdminClient(config, clientId, time, metadata, metrics, networkClient);
+ } catch (Throwable exc) {
+ closeQuietly(metrics, "Metrics");
+ closeQuietly(networkClient, "NetworkClient");
+ closeQuietly(selector, "Selector");
+ closeQuietly(channelBuilder, "ChannelBuilder");
+ throw new KafkaException("Failed create new KafkaAdminClient", exc);
+ }
+ }
+
+ static KafkaAdminClient create(AdminClientConfig config, KafkaClient client, Metadata metadata) {
+ Metrics metrics = null;
+ Time time = Time.SYSTEM;
+ String clientId = generateClientId(config);
+
+ try {
+ metrics = new Metrics(new MetricConfig(), new LinkedList<MetricsReporter>(), time);
+ return new KafkaAdminClient(config, clientId, time, metadata, metrics, client);
+ } catch (Throwable exc) {
+ closeQuietly(metrics, "Metrics");
+ throw new KafkaException("Failed create new KafkaAdminClient", exc);
+ }
+ }
+
+ private KafkaAdminClient(AdminClientConfig config, String clientId, Time time, Metadata metadata,
+ Metrics metrics, KafkaClient client) {
+ this.defaultTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
+ this.clientId = clientId;
+ this.time = time;
+ this.metadata = metadata;
+ List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+ config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+ this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
+ this.metrics = metrics;
+ this.client = client;
+ this.runnable = new AdminClientRunnable();
+ String threadName = "kafka-admin-client-thread" + (clientId.length() > 0 ? " | " + clientId : "");
+ this.thread = new KafkaThread(threadName, runnable, false);
+ config.logUnused();
+ log.debug("Created Kafka admin client {}", this.clientId);
+ thread.start();
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ client.wakeup(); // Wake the thread, if it is blocked inside poll().
+ try {
+ // Wait for the thread to be joined.
+ thread.join();
+ log.debug("{}: closed.", clientId);
+ } catch (InterruptedException e) {
+ log.debug("{}: interrupted while joining I/O thread", clientId, e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * An interface for providing a node for a call.
+ */
+ private interface NodeProvider {
+ Node provide();
+ }
+
+ /**
+ * Provides a constant node which is known at construction time.
+ */
+ private static class ConstantAdminNodeProvider implements NodeProvider {
+ private final Node node;
+
+ ConstantAdminNodeProvider(Node node) {
+ this.node = node;
+ }
+
+ @Override
+ public Node provide() {
+ return node;
+ }
+ }
+
+ /**
+ * Provides the controller node.
+ */
+ private class ControllerNodeProvider implements NodeProvider {
+ @Override
+ public Node provide() {
+ return metadata.fetch().controller();
+ }
+ }
+
+ /**
+ * Provides the least loaded node.
+ */
+ private class LeastLoadedNodeProvider implements NodeProvider {
+ @Override
+ public Node provide() {
+ return client.leastLoadedNode(time.milliseconds());
+ }
+ }
+
+ private abstract class Call {
+ private final String callName;
+ private final long deadlineMs;
+ private final NodeProvider nodeProvider;
+ private int tries = 0;
+
+ Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
+ this.callName = callName;
+ this.deadlineMs = deadlineMs;
+ this.nodeProvider = nodeProvider;
+ }
+
+ /**
+ * Handle a failure.
+ *
+ * Depending on what the exception is and how many times we have already tried, we may choose to
+ * fail the Call, or retry it. It is important to print the stack traces here in some cases,
+ * since they are not necessarily preserved in ApiVersionException objects.
+ *
+ * @param now The current time in milliseconds.
+ * @param throwable The failure exception.
+ */
+ final void fail(long now, Throwable throwable) {
+ // If this is an UnsupportedVersionException that we can retry, do so.
+ if ((throwable instanceof UnsupportedVersionException) &&
+ handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
+ log.trace("{} attempting protocol downgrade.", this);
+ runnable.call(this, now);
+ return;
+ }
+ tries++;
+ // If the call has timed out, fail.
+ if (calcTimeoutMsRemainingAsInt(now, deadlineMs) < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("{} timed out at {} after {} attempt(s)", this, now, tries,
+ new Exception(prettyPrintException(throwable)));
+ }
+ handleFailure(throwable);
+ return;
+ }
+ // If the exception is not retryable, fail.
+ if (!(throwable instanceof RetriableException)) {
+ if (log.isDebugEnabled()) {
+ log.debug("{} failed with non-retriable exception after {} attempt(s)", this, tries,
+ new Exception(prettyPrintException(throwable)));
+ }
+ handleFailure(throwable);
+ return;
+ }
+ // If we are out of retries, fail.
+ if (tries > MAX_CALL_RETRIES) {
+ if (log.isDebugEnabled()) {
+ log.debug("{} failed after {} attempt(s)", this, tries,
+ new Exception(prettyPrintException(throwable)));
+ }
+ handleFailure(throwable);
+ return;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("{} failed: {}. Beginning retry #{}",
+ this, prettyPrintException(throwable), tries);
+ }
+ runnable.call(this, now);
+ }
+
+ /**
+ * Create an AbstractRequest.Builder for this Call.
+ *
+ * @param timeoutMs The timeout in milliseconds.
+ *
+ * @return The AbstractRequest builder.
+ */
+ abstract AbstractRequest.Builder createRequest(int timeoutMs);
+
+ /**
+ * Process the call response.
+ *
+ * @param abstractResponse The AbstractResponse.
+ *
+ * @return True if the response has been processed; false to re-submit the request.
+ */
+ abstract void handleResponse(AbstractResponse abstractResponse);
+
+ /**
+ * Handle a failure. This will only be called if the failure exception was not
+ * retryable, or if we hit a timeout.
+ *
+ * @param throwable The exception.
+ */
+ abstract void handleFailure(Throwable throwable);
+
+ /**
+ * Handle an UnsupportedVersionException.
+ *
+ * @param exception The exception.
+ *
+ * @return True if the exception can be handled; false otherwise.
+ */
+ boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "Call(callName=" + callName + ", deadlineMs=" + deadlineMs + ")";
+ }
+ }
+
+ private final class AdminClientRunnable implements Runnable {
+ /**
+ * Pending calls. Protected by the object monitor.
+ */
+ private List<Call> newCalls = new LinkedList<>();
+
+ /**
+ * Check if the AdminClient metadata is ready.
+ * We need to know who the controller is, and have a non-empty view of the cluster.
+ *
+ * @param prevMetadataVersion The previous metadata version which wasn't usable.
+ * @return null if the metadata is usable; the current metadata
+ * version otherwise
+ */
+ private Integer checkMetadataReady(Integer prevMetadataVersion) {
+ if (prevMetadataVersion != null) {
+ if (prevMetadataVersion == metadata.version())
+ return prevMetadataVersion;
+ }
+ Cluster cluster = metadata.fetch();
+ if (cluster.nodes().isEmpty()) {
+ log.trace("{}: metadata is not ready yet. No cluster nodes found.", clientId);
+ return metadata.requestUpdate();
+ }
+ if (cluster.controller() == null) {
+ log.trace("{}: metadata is not ready yet. No controller found.", clientId);
+ return metadata.requestUpdate();
+ }
+ if (prevMetadataVersion != null) {
+ log.trace("{}: metadata is now ready.", clientId);
+ }
+ return null;
+ }
+
+ /**
+ * Time out a list of calls.
+ *
+ * @param now The current time in milliseconds.
+ * @param calls The collection of calls. Must be sorted from oldest to newest.
+ */
+ private int timeoutCalls(long now, Collection<Call> calls) {
+ int numTimedOut = 0;
+ for (Iterator<Call> iter = calls.iterator(); iter.hasNext(); ) {
+ Call call = iter.next();
+ if (calcTimeoutMsRemainingAsInt(now, call.deadlineMs) < 0) {
+ call.fail(now, new TimeoutException());
+ iter.remove();
+ numTimedOut++;
+ }
+ }
+ return numTimedOut;
+ }
+
+ /**
+ * Time out the elements in the newCalls list which are expired.
+ *
+ * @param now The current time in milliseconds.
+ */
+ private synchronized void timeoutNewCalls(long now) {
+ int numTimedOut = timeoutCalls(now, newCalls);
+ if (numTimedOut > 0) {
+ log.debug("{}: timed out {} new calls.", clientId, numTimedOut);
+ }
+ }
+
+ /**
+ * Time out calls which have been assigned to nodes.
+ *
+ * @param now The current time in milliseconds.
+ * @param callsToSend A map of nodes to the calls they need to handle.
+ */
+ private void timeoutCallsToSend(long now, Map<Node, List<Call>> callsToSend) {
+ int numTimedOut = 0;
+ for (List<Call> callList : callsToSend.values()) {
+ numTimedOut += timeoutCalls(now, callList);
+ }
+ if (numTimedOut > 0)
+ log.debug("{}: timed out {} call(s) with assigned nodes.", clientId, numTimedOut);
+ }
+
+ /**
+ * Choose nodes for the calls in the callsToSend list.
+ *
+ * This function holds the lock for the minimum amount of time, to avoid blocking
+ * users of AdminClient who will also take the lock to add new calls.
+ *
+ * @param now The current time in milliseconds.
+ * @param callsToSend A map of nodes to the calls they need to handle.
+ *
+ * @return The new calls we need to process.
+ */
+ private void chooseNodesForNewCalls(long now, Map<Node, List<Call>> callsToSend) {
+ List<Call> newCallsToAdd = null;
+ synchronized (this) {
+ if (newCalls.isEmpty()) {
+ return;
+ }
+ newCallsToAdd = newCalls;
+ newCalls = new LinkedList<>();
+ }
+ for (Call call : newCallsToAdd) {
+ chooseNodeForNewCall(now, callsToSend, call);
+ }
+ }
+
+ /**
+ * Choose a node for a new call.
+ *
+ * @param now The current time in milliseconds.
+ * @param callsToSend A map of nodes to the calls they need to handle.
+ * @param call The call.
+ */
+ private void chooseNodeForNewCall(long now, Map<Node, List<Call>> callsToSend, Call call) {
+ Node node = call.nodeProvider.provide();
+ if (node == null) {
+ call.fail(now, new BrokerNotAvailableException(
+ String.format("Error choosing node for %s: no node found.", call.callName)));
+ return;
+ }
+ log.trace("{}: assigned {} to {}", clientId, call, node);
+ getOrCreateListValue(callsToSend, node).add(call);
+ }
+
+ /**
+ * Send the calls which are ready.
+ *
+ * @param now The current time in milliseconds.
+ * @param callsToSend The calls to send, by node.
+ * @param correlationIdToCalls A map of correlation IDs to calls.
+ * @param callsInFlight A map of nodes to the calls they have in flight.
+ *
+ * @return The minimum timeout we need for poll().
+ */
+ private long sendEligibleCalls(long now, Map<Node, List<Call>> callsToSend,
+ Map<Integer, Call> correlationIdToCalls, Map<String, List<Call>> callsInFlight) {
+ long pollTimeout = Long.MAX_VALUE;
+ for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator();
+ iter.hasNext(); ) {
+ Map.Entry<Node, List<Call>> entry = iter.next();
+ List<Call> calls = entry.getValue();
+ if (calls.isEmpty()) {
+ iter.remove();
+ continue;
+ }
+ Node node = entry.getKey();
+ if (!client.ready(node, now)) {
+ long nodeTimeout = client.connectionDelay(node, now);
+ pollTimeout = Math.min(pollTimeout, nodeTimeout);
+ log.trace("{}: client is not ready to send to {}. Must delay {} ms", clientId, node, nodeTimeout);
+ continue;
+ }
+ Call call = calls.remove(0);
+ int timeoutMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
+ AbstractRequest.Builder<?> requestBuilder = null;
+ try {
+ requestBuilder = call.createRequest(timeoutMs);
+ } catch (Throwable throwable) {
+ call.fail(now, new KafkaException(String.format(
+ "Internal error sending %s to %s.", call.callName, node)));
+ continue;
+ }
+ ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true);
+ log.trace("{}: sending {} to {}. correlationId={}", clientId, requestBuilder, node,
+ clientRequest.correlationId());
+ client.send(clientRequest, now);
+ getOrCreateListValue(callsInFlight, node.idString()).add(call);
+ correlationIdToCalls.put(clientRequest.correlationId(), call);
+ }
+ return pollTimeout;
+ }
+
+ /**
+ * Time out expired calls that are in flight.
+ *
+ * Calls that are in flight may have been partially or completely sent over the wire. They may
+ * even be in the process of being processed by the remote server. At the moment, our only option
+ * to time them out is to close the entire connection.
+ *
+ * @param now The current time in milliseconds.
+ * @param callsInFlight A map of nodes to the calls they have in flight.
+ */
+ private void timeoutCallsInFlight(long now, Map<String, List<Call>> callsInFlight) {
+ int numTimedOut = 0;
+ for (Map.Entry<String, List<Call>> entry : callsInFlight.entrySet()) {
+ List<Call> contexts = entry.getValue();
+ if (contexts.isEmpty())
+ continue;
+ String nodeId = entry.getKey();
+ // We assume that the first element in the list is the earliest. So it should be the
+ // only one we need to check the timeout for.
+ Call call = contexts.get(0);
+ if (calcTimeoutMsRemainingAsInt(now, call.deadlineMs) < 0) {
+ log.debug("{}: Closing connection to {} to time out {}", clientId, nodeId, call);
+ client.close(nodeId);
+ numTimedOut++;
+ // We don't remove anything from the callsInFlight data structure. Because the connection
+ // has been closed, the calls should be returned by the next client#poll(),
+ // and handled at that point.
+ }
+ }
+ if (numTimedOut > 0)
+ log.debug("{}: timed out {} call(s) in flight.", clientId, numTimedOut);
+ }
+
+ /**
+ * Handle responses from the server.
+ *
+ * @param now The current time in milliseconds.
+ * @param responses The latest responses from KafkaClient.
+ * @param correlationIdToCall A map of correlation IDs to calls.
+ * @param callsInFlight A map of nodes to the calls they have in flight.
+ **/
+ private void handleResponses(long now, List<ClientResponse> responses, Map<String, List<Call>> callsInFlight,
+ Map<Integer, Call> correlationIdToCall) {
+ for (ClientResponse response : responses) {
+ int correlationId = response.requestHeader().correlationId();
+
+ Call call = correlationIdToCall.get(correlationId);
+ if (call == null) {
+ // If the server returns information about a correlation ID we didn't use yet,
+ // an internal server error has occurred. Close the connection and log an error message.
+ log.error("Internal server error on {}: server returned information about unknown " +
+ "correlation ID {}", response.destination(), correlationId);
+ client.close(response.destination());
+ continue;
+ }
+
+ // Stop tracking this call.
+ correlationIdToCall.remove(correlationId);
+ getOrCreateListValue(callsInFlight, response.requestHeader().clientId()).remove(call);
+
+ // Handle the result of the call. This may involve retrying the call, if we got a
+ // retryible exception.
+ if (response.versionMismatch() != null) {
+ call.fail(now, response.versionMismatch());
+ } else if (response.wasDisconnected()) {
+ call.fail(now, new DisconnectException(String.format(
+ "Cancelled %s request with correlation id %s due to node %s being disconnected",
+ call.callName, correlationId, response.destination())));
+ } else {
+ try {
+ call.handleResponse(response.responseBody());
+ if (log.isTraceEnabled())
+ log.trace("{}: {} got response {}", clientId, call, response.responseBody());
+ } catch (Throwable t) {
+ if (log.isTraceEnabled())
+ log.trace("{}: {} handleResponse failed with {}", clientId, call, prettyPrintException(t));
+ call.fail(now, t);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ /**
+ * Maps nodes to calls that we want to send.
+ */
+ Map<Node, List<Call>> callsToSend = new HashMap<>();
+
+ /**
+ * Maps node ID strings to calls that have been sent.
+ */
+ Map<String, List<Call>> callsInFlight = new HashMap<>();
+
+ /**
+ * Maps correlation IDs to calls that have been sent.
+ */
+ Map<Integer, Call> correlationIdToCalls = new HashMap<>();
+
+ /**
+ * The previous metadata version which wasn't usable, or null if there is none.
+ */
+ Integer prevMetadataVersion = null;
+
+ long now = time.milliseconds();
+ log.trace("{} thread starting", clientId);
+ while (true) {
+ // Check if the AdminClient is shutting down.
+ if (closed)
+ break;
+
+ // Handle timeouts.
+ timeoutNewCalls(now);
+ timeoutCallsToSend(now, callsToSend);
+ timeoutCallsInFlight(now, callsInFlight);
+
+ // Handle new calls and metadata update requests.
+ prevMetadataVersion = checkMetadataReady(prevMetadataVersion);
+ long pollTimeout = 1200000;
+ if (prevMetadataVersion == null) {
+ chooseNodesForNewCalls(now, callsToSend);
+ pollTimeout = Math.min(pollTimeout,
+ sendEligibleCalls(now, callsToSend, correlationIdToCalls, callsInFlight));
+ }
+
+ // Wait for network responses.
+ log.trace("{}: entering KafkaClient#poll(timeout={})", clientId, pollTimeout);
+ List<ClientResponse> responses = client.poll(pollTimeout, now);
+ log.trace("{}: KafkaClient#poll retrieved {} response(s)", clientId, responses.size());
+
+ // Update the current time and handle the latest responses.
+ now = time.milliseconds();
+ handleResponses(now, responses, callsInFlight, correlationIdToCalls);
+ }
+ int numTimedOut = 0;
+ synchronized (this) {
+ numTimedOut += timeoutCalls(Long.MAX_VALUE, newCalls);
+ }
+ numTimedOut += timeoutCalls(Long.MAX_VALUE, correlationIdToCalls.values());
+ if (numTimedOut > 0) {
+ log.debug("{}: timed out {} remaining operations.", clientId, numTimedOut);
+ }
+ closeQuietly(client, "KafkaClient");
+ closeQuietly(metrics, "Metrics");
+ log.debug("{}: exiting AdminClientRunnable thread.", clientId);
+ }
+
+ void call(Call call, long now) {
+ if (log.isDebugEnabled()) {
+ log.debug("{}: queueing {} with a timeout {} ms from now.",
+ clientId, call, call.deadlineMs - now);
+ }
+ synchronized (this) {
+ newCalls.add(call);
+ }
+ client.wakeup();
+ }
+ }
+
+ @Override
+ public CreateTopicResults createTopics(final Collection<NewTopic> newTopics,
+ final CreateTopicsOptions options) {
+ final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size());
+ for (NewTopic newTopic : newTopics) {
+ topicFutures.put(newTopic.name(), new KafkaFutureImpl<Void>());
+ }
+ final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size());
+ for (NewTopic newTopic : newTopics) {
+ topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails());
+ }
+ final long now = time.milliseconds();
+ runnable.call(new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),
+ new ControllerNodeProvider()) {
+
+ @Override
+ public AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new CreateTopicsRequest.Builder(topicsMap, timeoutMs, options.validateOnly());
+ }
+
+ @Override
+ public void handleResponse(AbstractResponse abstractResponse) {
+ CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse;
+ // Handle server responses for particular topics.
+ for (Map.Entry<String, CreateTopicsResponse.Error> entry : response.errors().entrySet()) {
+ KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
+ if (future == null) {
+ log.warn("Server response mentioned unknown topic {}", entry.getKey());
+ } else {
+ ApiException exception = entry.getValue().exception();
+ if (exception != null) {
+ future.completeExceptionally(exception);
+ } else {
+ future.complete(null);
+ }
+ }
+ }
+ // The server should send back a response for every topic. But do a sanity check anyway.
+ for (Map.Entry<String, KafkaFutureImpl<Void>> entry : topicFutures.entrySet()) {
+ KafkaFutureImpl<Void> future = entry.getValue();
+ if (!future.isDone()) {
+ future.completeExceptionally(new ApiException("The server response did not " +
+ "contain a reference to node " + entry.getKey()));
+ }
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ completeAllExceptionally(topicFutures.values(), throwable);
+ }
+ }, now);
+ return new CreateTopicResults(new HashMap<String, KafkaFuture<Void>>(topicFutures));
+ }
+
+ @Override
+ public DeleteTopicResults deleteTopics(final Collection<String> topicNames,
+ DeleteTopicsOptions options) {
+ final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
+ for (String topicName : topicNames) {
+ topicFutures.put(topicName, new KafkaFutureImpl<Void>());
+ }
+ final long now = time.milliseconds();
+ runnable.call(new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()),
+ new ControllerNodeProvider()) {
+
+ @Override
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new DeleteTopicsRequest.Builder(new HashSet<>(topicNames), timeoutMs);
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse;
+ // Handle server responses for particular topics.
+ for (Map.Entry<String, Errors> entry : response.errors().entrySet()) {
+ KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
+ if (future == null) {
+ log.warn("Server response mentioned unknown topic {}", entry.getKey());
+ } else {
+ ApiException exception = entry.getValue().exception();
+ if (exception != null) {
+ future.completeExceptionally(exception);
+ } else {
+ future.complete(null);
+ }
+ }
+ }
+ // The server should send back a response for every topic. But do a sanity check anyway.
+ for (Map.Entry<String, KafkaFutureImpl<Void>> entry : topicFutures.entrySet()) {
+ KafkaFutureImpl<Void> future = entry.getValue();
+ if (!future.isDone()) {
+ future.completeExceptionally(new ApiException("The server response did not " +
+ "contain a reference to node " + entry.getKey()));
+ }
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ completeAllExceptionally(topicFutures.values(), throwable);
+ }
+ }, now);
+ return new DeleteTopicResults(new HashMap<String, KafkaFuture<Void>>(topicFutures));
+ }
+
+ @Override
+ public ListTopicsResults listTopics(final ListTopicsOptions options) {
+ final KafkaFutureImpl<Map<String, TopicListing>> topicListingFuture = new KafkaFutureImpl<>();
+ final long now = time.milliseconds();
+ runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()),
+ new LeastLoadedNodeProvider()) {
+
+ @Override
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ return MetadataRequest.Builder.allTopics();
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ MetadataResponse response = (MetadataResponse) abstractResponse;
+ Cluster cluster = response.cluster();
+ Map<String, TopicListing> topicListing = new HashMap<>();
+ for (String topicName : cluster.topics()) {
+ boolean internal = cluster.internalTopics().contains(topicName);
+ if (!internal || options.listInternal())
+ topicListing.put(topicName, new TopicListing(topicName, internal));
+ }
+ topicListingFuture.complete(topicListing);
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ topicListingFuture.completeExceptionally(throwable);
+ }
+ }, now);
+ return new ListTopicsResults(topicListingFuture);
+ }
+
+ @Override
+ public DescribeTopicsResults describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options) {
+ final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
+ for (String topicName : topicNames) {
+ topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>());
+ }
+ final long now = time.milliseconds();
+ runnable.call(new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
+ new ControllerNodeProvider()) {
+
+ @Override
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new DeleteTopicsRequest.Builder(new HashSet<>(topicNames), timeoutMs);
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ MetadataResponse response = (MetadataResponse) abstractResponse;
+ // Handle server responses for particular topics.
+ for (Map.Entry<String, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) {
+ String topicName = entry.getKey();
+ KafkaFutureImpl<TopicDescription> future = entry.getValue();
+ Errors topicError = response.errors().get(topicName);
+ if (topicError != null) {
+ future.completeExceptionally(topicError.exception());
+ continue;
+ }
+ Cluster cluster = response.cluster();
+ if (!cluster.topics().contains(topicName)) {
+ future.completeExceptionally(new InvalidTopicException("Topic " + topicName + " not found."));
+ continue;
+ }
+ boolean isInternal = cluster.internalTopics().contains(topicName);
+ TreeMap<Integer, TopicPartitionInfo> partitions = new TreeMap<>();
+ List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName);
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
+ partitionInfo.partition(), partitionInfo.leader(), Arrays.asList(partitionInfo.replicas()),
+ Arrays.asList(partitionInfo.inSyncReplicas()));
+ partitions.put(partitionInfo.partition(), topicPartitionInfo);
+ }
+ TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions);
+ future.complete(topicDescription);
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ completeAllExceptionally(topicFutures.values(), throwable);
+ }
+ }, now);
+ return new DescribeTopicsResults(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures));
+ }
+
+ @Override
+ public DescribeClusterResults describeCluster(DescribeClusterOptions options) {
+ final KafkaFutureImpl<Collection<Node>> describeClusterFuture = new KafkaFutureImpl<>();
+ final long now = time.milliseconds();
+ runnable.call(new Call("listNodes", calcDeadlineMs(now, options.timeoutMs()),
+ new LeastLoadedNodeProvider()) {
+
+ @Override
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new MetadataRequest.Builder(Collections.<String>emptyList());
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ MetadataResponse response = (MetadataResponse) abstractResponse;
+ describeClusterFuture.complete(response.brokers());
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ describeClusterFuture.completeExceptionally(throwable);
+ }
+ }, now);
+ return new DescribeClusterResults(describeClusterFuture);
+ }
+
+ @Override
+ public ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options) {
+ final long now = time.milliseconds();
+ final long deadlineMs = calcDeadlineMs(now, options.timeoutMs());
+ Map<Node, KafkaFuture<NodeApiVersions>> nodeFutures = new HashMap<>();
+ for (final Node node : nodes) {
+ final KafkaFutureImpl<NodeApiVersions> nodeFuture = new KafkaFutureImpl<>();
+ nodeFutures.put(node, nodeFuture);
+ runnable.call(new Call("apiVersions", deadlineMs, new ConstantAdminNodeProvider(node)) {
+ @Override
+ public AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new ApiVersionsRequest.Builder();
+ }
+
+ @Override
+ public void handleResponse(AbstractResponse abstractResponse) {
+ ApiVersionsResponse response = (ApiVersionsResponse) abstractResponse;
+ nodeFuture.complete(new NodeApiVersions(response.apiVersions()));
+ }
+
+ @Override
+ public void handleFailure(Throwable throwable) {
+ nodeFuture.completeExceptionally(throwable);
+ }
+ }, now);
+ }
+ return new ApiVersionsResults(nodeFutures);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
new file mode 100644
index 0000000..02a0a40
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for listTopics.
+ */
+@InterfaceStability.Unstable
+public class ListTopicsOptions {
+ private Integer timeoutMs = null;
+ private boolean listInternal = false;
+
+ public ListTopicsOptions timeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ return this;
+ }
+
+ public Integer timeoutMs() {
+ return timeoutMs;
+ }
+
+ /**
+ * Set whether we should list internal topics.
+ *
+ * @param listInternal Whether we should list internal topics. null means to use
+ * the default.
+ * @return This ListTopicsOptions object.
+ */
+ public ListTopicsOptions listInternal(boolean listInternal) {
+ this.listInternal = listInternal;
+ return this;
+ }
+
+ public boolean listInternal() {
+ return listInternal;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
new file mode 100644
index 0000000..7e9448d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * The result of the listTopics call.
+ */
+@InterfaceStability.Unstable
+public class ListTopicsResults {
+ final KafkaFuture<Map<String, TopicListing>> future;
+
+ ListTopicsResults(KafkaFuture<Map<String, TopicListing>> future) {
+ this.future = future;
+ }
+
+ /**
+ * Return a future which yields a map of topic names to TopicListing objects.
+ */
+ public KafkaFuture<Map<String, TopicListing>> namesToDescriptions() {
+ return future;
+ }
+
+ /**
+ * Return a future which yields a collection of TopicListing objects.
+ */
+ public KafkaFuture<Collection<TopicListing>> descriptions() {
+ return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() {
+ @Override
+ public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) {
+ return namesToDescriptions.values();
+ }
+ });
+ }
+
+ /**
+ * Return a future which yields a collection of topic names.
+ */
+ public KafkaFuture<Collection<String>> names() {
+ return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<String>>() {
+ @Override
+ public Collection<String> apply(Map<String, TopicListing> namesToDescriptions) {
+ return namesToDescriptions.keySet();
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
new file mode 100644
index 0000000..a1f6fb5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A request to create a new topic through the AdminClient API.
+ */
+public class NewTopic {
+ private final String name;
+ private final int numPartitions;
+ private final short replicationFactor;
+ private final Map<Integer, List<Integer>> replicasAssignments;
+ private Map<String, String> configs = null;
+
+ /**
+ * Create a new topic with a fixed replication factor and number of partitions.
+ */
+ public NewTopic(String name, int numPartitions, short replicationFactor) {
+ this.name = name;
+ this.numPartitions = numPartitions;
+ this.replicationFactor = replicationFactor;
+ this.replicasAssignments = null;
+ }
+
+ /**
+ * A request to create a new topic with a specific replica assignment configuration.
+ */
+ public NewTopic(String name, Map<Integer, List<Integer>> replicasAssignments) {
+ this.name = name;
+ this.numPartitions = -1;
+ this.replicationFactor = -1;
+ this.replicasAssignments = replicasAssignments;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Set the configuration to use on the new topic.
+ *
+ * @param configs The configuration map.
+ * @return This NewTopic object.
+ */
+ public NewTopic configs(Map<String, String> configs) {
+ this.configs = configs;
+ return this;
+ }
+
+ TopicDetails convertToTopicDetails() {
+ if (replicasAssignments != null) {
+ if (configs != null) {
+ return new TopicDetails(replicasAssignments, configs);
+ } else {
+ return new TopicDetails(replicasAssignments);
+ }
+ } else {
+ if (configs != null) {
+ return new TopicDetails(numPartitions, replicationFactor, configs);
+ } else {
+ return new TopicDetails(numPartitions, replicationFactor);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
new file mode 100644
index 0000000..2fc4442
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.NavigableMap;
+
+/**
+ * A detailed description of a single topic in the cluster.
+ */
+public class TopicDescription {
+ private final String name;
+ private final boolean internal;
+ private final NavigableMap<Integer, TopicPartitionInfo> partitions;
+
+ TopicDescription(String name, boolean internal,
+ NavigableMap<Integer, TopicPartitionInfo> partitions) {
+ this.name = name;
+ this.internal = internal;
+ this.partitions = partitions;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public boolean internal() {
+ return internal;
+ }
+
+ public NavigableMap<Integer, TopicPartitionInfo> partitions() {
+ return partitions;
+ }
+
+ @Override
+ public String toString() {
+ return "(name=" + name + ", internal=" + internal + ", partitions=" +
+ Utils.mkString(partitions, "[", "]", "=", ",") + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
new file mode 100644
index 0000000..4c25551
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * A listing of a topic in the cluster.
+ */
+public class TopicListing {
+ private final String name;
+ private final boolean internal;
+
+ TopicListing(String name, boolean internal) {
+ this.name = name;
+ this.internal = internal;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public boolean internal() {
+ return internal;
+ }
+
+ @Override
+ public String toString() {
+ return "(name=" + name + ", internal=" + internal + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
new file mode 100644
index 0000000..b304802
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.List;
+
+public class TopicPartitionInfo {
+ private final int partition;
+ private final Node leader;
+ private final List<Node> replicas;
+ private final List<Node> isr;
+
+ TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) {
+ this.partition = partition;
+ this.leader = leader;
+ this.replicas = replicas;
+ this.isr = isr;
+ }
+
+ public int partition() {
+ return partition;
+ }
+
+ public Node leader() {
+ return leader;
+ }
+
+ public List<Node> replicas() {
+ return replicas;
+ }
+
+ public List<Node> isr() {
+ return isr;
+ }
+
+ public String toString() {
+ return "(partition=" + partition + ", leader=" + leader + ", replicas=" +
+ Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index ba1d2af..6619b4c 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -37,6 +37,7 @@ public final class Cluster {
private final List<Node> nodes;
private final Set<String> unauthorizedTopics;
private final Set<String> internalTopics;
+ private final Node controller;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
@@ -54,7 +55,7 @@ public final class Cluster {
public Cluster(Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics) {
- this(null, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet());
+ this(null, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet(), null);
}
@@ -68,7 +69,21 @@ public final class Cluster {
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
Set<String> internalTopics) {
- this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics);
+ this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics, null);
+ }
+
+ /**
+ * Create a new cluster with the given id, nodes and partitions
+ * @param nodes The nodes in the cluster
+ * @param partitions Information about a subset of the topic-partitions this cluster hosts
+ */
+ public Cluster(String clusterId,
+ Collection<Node> nodes,
+ Collection<PartitionInfo> partitions,
+ Set<String> unauthorizedTopics,
+ Set<String> internalTopics,
+ Node controller) {
+ this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics, controller);
}
private Cluster(String clusterId,
@@ -76,7 +91,8 @@ public final class Cluster {
Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
- Set<String> internalTopics) {
+ Set<String> internalTopics,
+ Node controller) {
this.isBootstrapConfigured = isBootstrapConfigured;
this.clusterResource = new ClusterResource(clusterId);
// make a randomized, unmodifiable copy of the nodes
@@ -130,6 +146,7 @@ public final class Cluster {
this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
this.internalTopics = Collections.unmodifiableSet(internalTopics);
+ this.controller = controller;
}
/**
@@ -137,7 +154,7 @@ public final class Cluster {
*/
public static Cluster empty() {
return new Cluster(null, new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(),
- Collections.<String>emptySet());
+ Collections.<String>emptySet(), null);
}
/**
@@ -150,7 +167,7 @@ public final class Cluster {
int nodeId = -1;
for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
- return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet());
+ return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet(), null);
}
/**
@@ -160,7 +177,7 @@ public final class Cluster {
Map<TopicPartition, PartitionInfo> combinedPartitions = new HashMap<>(this.partitionsByTopicPartition);
combinedPartitions.putAll(partitions);
return new Cluster(clusterResource.clusterId(), this.nodes, combinedPartitions.values(),
- new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.internalTopics));
+ new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.internalTopics), this.controller);
}
/**
@@ -265,6 +282,10 @@ public final class Cluster {
return clusterResource;
}
+ public Node controller() {
+ return controller;
+ }
+
@Override
public String toString() {
return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
new file mode 100644
index 0000000..3c51fbe
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A flexible future which supports call chaining and other asynchronous programming patterns.
+ */
+public abstract class KafkaFuture<T> implements Future<T> {
+ /**
+ * A function which takes objects of type A and returns objects of type B.
+ */
+ public static abstract class Function<A, B> {
+ public abstract B apply(A a);
+ }
+
+ /**
+ * A consumer of two different types of object.
+ */
+ public static abstract class BiConsumer<A, B> {
+ public abstract void accept(A a, B b);
+ }
+
+ private static class AllOfAdapter<R> extends BiConsumer<R, Throwable> {
+ private int remainingResponses;
+ private KafkaFuture future;
+
+ public AllOfAdapter(int remainingResponses, KafkaFuture future) {
+ this.remainingResponses = remainingResponses;
+ this.future = future;
+ }
+
+ @Override
+ public synchronized void accept(R newValue, Throwable exception) {
+ if (remainingResponses <= 0)
+ return;
+ if (exception != null) {
+ remainingResponses = 0;
+ future.completeExceptionally(exception);
+ } else {
+ remainingResponses--;
+ if (remainingResponses <= 0)
+ future.complete(null);
+ }
+ }
+ }
+
+ /**
+ * Returns a new KafkaFuture that is already completed with the given value.
+ */
+ public static <U> KafkaFuture<U> completedFuture(U value) {
+ KafkaFuture<U> future = new KafkaFutureImpl<U>();
+ future.complete(value);
+ return future;
+ }
+
+ /**
+ * Returns a new KafkaFuture that is completed when all the given futures have completed. If
+ * any future throws an exception, the returned future returns it. If multiple futures throw
+ * an exception, which one gets returned is arbitrarily chosen.
+ */
+ public static KafkaFuture<Void> allOf(KafkaFuture<?>... futures) {
+ KafkaFuture<Void> allOfFuture = new KafkaFutureImpl<Void>();
+ AllOfAdapter allOfWaiter = new AllOfAdapter(futures.length, allOfFuture);
+ for (KafkaFuture<?> future : futures) {
+ future.addWaiter(allOfWaiter);
+ }
+ return allOfFuture;
+ }
+
+ /**
+ * Returns a new KafkaFuture that, when this future completes normally, is executed with this
+ * futures's result as the argument to the supplied function.
+ */
+ public abstract <R> KafkaFuture<R> thenApply(Function<T, R> function);
+
+ protected abstract void addWaiter(BiConsumer<? super T, ? super Throwable> action);
+
+ /**
+ * If not already completed, sets the value returned by get() and related methods to the given
+ * value.
+ */
+ protected abstract boolean complete(T newValue);
+
+ /**
+ * If not already completed, causes invocations of get() and related methods to throw the given
+ * exception.
+ */
+ protected abstract boolean completeExceptionally(Throwable newException);
+
+ /**
+ * If not already completed, completes this future with a CancellationException. Dependent
+ * futures that have not already completed will also complete exceptionally, with a
+ * CompletionException caused by this CancellationException.
+ */
+ @Override
+ public abstract boolean cancel(boolean mayInterruptIfRunning);
+
+ /**
+ * Waits if necessary for this future to complete, and then returns its result.
+ */
+ @Override
+ public abstract T get() throws InterruptedException, ExecutionException;
+
+ /**
+ * Waits if necessary for at most the given time for this future to complete, and then returns
+ * its result, if available.
+ */
+ @Override
+ public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
+ TimeoutException;
+
+ /**
+ * Returns the result value (or throws any encountered exception) if completed, else returns
+ * the given valueIfAbsent.
+ */
+ public abstract T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException;
+
+ /**
+ * Returns true if this CompletableFuture was cancelled before it completed normally.
+ */
+ @Override
+ public abstract boolean isCancelled();
+
+ /**
+ * Returns true if this CompletableFuture completed exceptionally, in any way.
+ */
+ public abstract boolean isCompletedExceptionally();
+
+ /**
+ * Returns true if completed in any fashion: normally, exceptionally, or via cancellation.
+ */
+ @Override
+ public abstract boolean isDone();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
new file mode 100644
index 0000000..01355c6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * A flexible future which supports call chaining and other asynchronous programming patterns.
+ * This will eventually become a thin shim on top of Java 8's CompletableFuture.
+ */
+public class KafkaFutureImpl<T> extends KafkaFuture<T> {
+ /**
+ * A convenience method that throws the current exception, wrapping it if needed.
+ *
+ * In general, KafkaFuture throws CancellationException and InterruptedException directly, and
+ * wraps all other exceptions in an ExecutionException.
+ */
+ private static void wrapAndThrow(Throwable t) throws InterruptedException, ExecutionException {
+ if (t instanceof CancellationException) {
+ throw (CancellationException) t;
+ } else if (t instanceof InterruptedException) {
+ throw (InterruptedException) t;
+ } else {
+ throw new ExecutionException(t);
+ }
+ }
+
+ private static class Applicant<A, B> extends BiConsumer<A, Throwable> {
+ private final Function<A, B> function;
+ private final KafkaFutureImpl<B> future;
+
+ Applicant(Function<A, B> function, KafkaFutureImpl<B> future) {
+ this.function = function;
+ this.future = future;
+ }
+
+ @Override
+ public void accept(A a, Throwable exception) {
+ if (exception != null) {
+ future.completeExceptionally(exception);
+ } else {
+ try {
+ B b = function.apply(a);
+ future.complete(b);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ }
+ }
+ }
+
+ private static class SingleWaiter<R> extends BiConsumer<R, Throwable> {
+ private R value = null;
+ private Throwable exception = null;
+ private boolean done = false;
+
+ @Override
+ public synchronized void accept(R newValue, Throwable newException) {
+ this.value = newValue;
+ this.exception = newException;
+ this.done = true;
+ this.notifyAll();
+ }
+
+ synchronized R await() throws InterruptedException, ExecutionException {
+ while (true) {
+ if (exception != null)
+ wrapAndThrow(exception);
+ if (done)
+ return value;
+ this.wait();
+ }
+ }
+
+ R await(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ long startMs = System.currentTimeMillis();
+ long waitTimeMs = (unit.toMillis(timeout) > 0) ? unit.toMillis(timeout) : 1;
+ long delta = 0;
+ synchronized (this) {
+ while (true) {
+ if (exception != null)
+ wrapAndThrow(exception);
+ if (done)
+ return value;
+ if (delta > waitTimeMs) {
+ throw new TimeoutException();
+ }
+ this.wait(waitTimeMs - delta);
+ delta = System.currentTimeMillis() - startMs;
+ }
+ }
+ }
+ }
+
+ /**
+ * True if this future is done.
+ */
+ private boolean done = false;
+
+ /**
+ * The value of this future, or null. Protected by the object monitor.
+ */
+ private T value = null;
+
+ /**
+ * The exception associated with this future, or null. Protected by the object monitor.
+ */
+ private Throwable exception = null;
+
+ /**
+ * A list of objects waiting for this future to complete (either successfully or
+ * exceptionally). Protected by the object monitor.
+ */
+ private List<BiConsumer<? super T, ? super Throwable>> waiters = new ArrayList<>();
+
+ /**
+ * Returns a new KafkaFuture that, when this future completes normally, is executed with this
+ * futures's result as the argument to the supplied function.
+ */
+ @Override
+ public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
+ KafkaFutureImpl<R> future = new KafkaFutureImpl<R>();
+ addWaiter(new Applicant(function, future));
+ return future;
+ }
+
+ @Override
+ protected synchronized void addWaiter(BiConsumer<? super T, ? super Throwable> action) {
+ if (exception != null) {
+ action.accept(null, exception);
+ } else if (done) {
+ action.accept(value, null);
+ } else {
+ waiters.add(action);
+ }
+ }
+
+ @Override
+ public synchronized boolean complete(T newValue) {
+ List<BiConsumer<? super T, ? super Throwable>> oldWaiters = null;
+ synchronized (this) {
+ if (done)
+ return false;
+ value = newValue;
+ done = true;
+ oldWaiters = waiters;
+ waiters = null;
+ }
+ for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
+ waiter.accept(newValue, null);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean completeExceptionally(Throwable newException) {
+ List<BiConsumer<? super T, ? super Throwable>> oldWaiters = null;
+ synchronized (this) {
+ if (done)
+ return false;
+ exception = newException;
+ done = true;
+ oldWaiters = waiters;
+ waiters = null;
+ }
+ for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
+ waiter.accept(null, newException);
+ }
+ return true;
+ }
+
+ /**
+ * If not already completed, completes this future with a CancellationException. Dependent
+ * futures that have not already completed will also complete exceptionally, with a
+ * CompletionException caused by this CancellationException.
+ */
+ @Override
+ public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+ if (completeExceptionally(new CancellationException()))
+ return true;
+ return exception instanceof CancellationException;
+ }
+
+ /**
+ * Waits if necessary for this future to complete, and then returns its result.
+ */
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ SingleWaiter<T> waiter = new SingleWaiter<T>();
+ addWaiter(waiter);
+ return waiter.await();
+ }
+
+ /**
+ * Waits if necessary for at most the given time for this future to complete, and then returns
+ * its result, if available.
+ */
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
+ TimeoutException {
+ SingleWaiter<T> waiter = new SingleWaiter<T>();
+ addWaiter(waiter);
+ return waiter.await(timeout, unit);
+ }
+
+ /**
+ * Returns the result value (or throws any encountered exception) if completed, else returns
+ * the given valueIfAbsent.
+ */
+ @Override
+ public synchronized T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException {
+ if (exception != null)
+ wrapAndThrow(exception);
+ if (done)
+ return value;
+ return valueIfAbsent;
+ }
+
+ /**
+ * Returns true if this CompletableFuture was cancelled before it completed normally.
+ */
+ @Override
+ public synchronized boolean isCancelled() {
+ return (exception != null) && (exception instanceof CancellationException);
+ }
+
+ /**
+ * Returns true if this CompletableFuture completed exceptionally, in any way.
+ */
+ @Override
+ public synchronized boolean isCompletedExceptionally() {
+ return exception != null;
+ }
+
+ /**
+ * Returns true if completed in any fashion: normally, exceptionally, or via cancellation.
+ */
+ @Override
+ public synchronized boolean isDone() {
+ return done;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
index 303d76f..0e5ca78 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.KafkaException;
/**
* A ChannelBuilder interface to build Channel based on configs
*/
-public interface ChannelBuilder {
+public interface ChannelBuilder extends AutoCloseable {
/**
* Configure this class with the given key-value pairs
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 8dd3ad6..312e1f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -79,7 +79,7 @@ import org.slf4j.LoggerFactory;
*
* This class is not thread safe!
*/
-public class Selector implements Selectable {
+public class Selector implements Selectable, AutoCloseable {
public static final long NO_IDLE_TIMEOUT_MS = -1;
private static final Logger log = LoggerFactory.getLogger(Selector.class);
[3/3] kafka git commit: KAFKA-3265;
Add a public AdminClient API in Java (KIP-117)
Posted by ij...@apache.org.
KAFKA-3265; Add a public AdminClient API in Java (KIP-117)
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Dan Norwood <no...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #2472 from cmccabe/KAFKA-3265
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4aed28d1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4aed28d1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4aed28d1
Branch: refs/heads/trunk
Commit: 4aed28d1897c6c5293f372cb4fc44ab363dfc365
Parents: c96656e
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Tue May 2 00:16:01 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue May 2 00:20:22 2017 +0100
----------------------------------------------------------------------
checkstyle/import-control.xml | 5 +
checkstyle/suppressions.xml | 4 +-
.../java/org/apache/kafka/clients/Metadata.java | 4 +-
.../apache/kafka/clients/admin/AdminClient.java | 186 +++
.../kafka/clients/admin/AdminClientConfig.java | 163 +++
.../kafka/clients/admin/ApiVersionsOptions.java | 37 +
.../kafka/clients/admin/ApiVersionsResults.java | 63 ++
.../kafka/clients/admin/CreateTopicResults.java | 49 +
.../clients/admin/CreateTopicsOptions.java | 47 +
.../kafka/clients/admin/DeleteTopicResults.java | 50 +
.../clients/admin/DeleteTopicsOptions.java | 37 +
.../clients/admin/DescribeClusterOptions.java | 37 +
.../clients/admin/DescribeClusterResults.java | 43 +
.../clients/admin/DescribeTopicsOptions.java | 37 +
.../clients/admin/DescribeTopicsResults.java | 68 ++
.../kafka/clients/admin/KafkaAdminClient.java | 1065 ++++++++++++++++++
.../kafka/clients/admin/ListTopicsOptions.java | 54 +
.../kafka/clients/admin/ListTopicsResults.java | 67 ++
.../apache/kafka/clients/admin/NewTopic.java | 85 ++
.../kafka/clients/admin/TopicDescription.java | 56 +
.../kafka/clients/admin/TopicListing.java | 44 +
.../kafka/clients/admin/TopicPartitionInfo.java | 58 +
.../java/org/apache/kafka/common/Cluster.java | 33 +-
.../org/apache/kafka/common/KafkaFuture.java | 155 +++
.../kafka/common/internals/KafkaFutureImpl.java | 264 +++++
.../kafka/common/network/ChannelBuilder.java | 2 +-
.../apache/kafka/common/network/Selector.java | 2 +-
.../apache/kafka/common/protocol/Errors.java | 507 +++++++--
.../common/requests/CreateTopicsResponse.java | 5 +
.../kafka/common/requests/MetadataResponse.java | 3 +-
.../org/apache/kafka/common/utils/Utils.java | 2 +-
.../clients/admin/KafkaAdminClientTest.java | 206 ++++
.../apache/kafka/common/KafkaFutureTest.java | 164 +++
.../main/scala/kafka/admin/AdminClient.scala | 5 +
.../integration/kafka/api/AdminClientTest.scala | 263 -----
.../api/KafkaAdminClientIntegrationTest.scala | 162 +++
.../kafka/api/LegacyAdminClientTest.scala | 266 +++++
.../api/SaslSslAdminClientIntegrationTest.scala | 26 +
.../test/scala/unit/kafka/utils/TestUtils.scala | 3 +
39 files changed, 3943 insertions(+), 384 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index d7851a5..d40c4d4 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -47,6 +47,7 @@
<subpackage name="common">
<disallow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common" exact-match="true" />
+ <allow pkg="org.apache.kafka.common.internals" exact-match="true" />
<allow pkg="org.apache.kafka.test" />
<subpackage name="config">
@@ -134,6 +135,10 @@
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.clients.producer" />
</subpackage>
+
+ <subpackage name="admin">
+ <allow pkg="org.apache.kafka.clients.admin" />
+ </subpackage>
</subpackage>
<subpackage name="server">
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index eae8dde..dd41f94 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -8,7 +8,7 @@
<!-- Clients -->
<suppress checks="ClassFanOutComplexity"
- files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest).java"/>
+ files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient).java"/>
<suppress checks="ClassFanOutComplexity"
files=".*/protocol/Errors.java"/>
<suppress checks="ClassFanOutComplexity"
@@ -35,7 +35,7 @@
files="DefaultRecordBatch.java"/>
<suppress checks="ClassDataAbstractionCoupling"
- files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager).java"/>
+ files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files=".*/protocol/Errors.java"/>
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 5bfdb64..9ff629d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -332,6 +332,7 @@ public final class Metadata {
Collection<PartitionInfo> partitionInfos = new ArrayList<>();
List<Node> nodes = Collections.emptyList();
Set<String> internalTopics = Collections.emptySet();
+ Node controller = null;
String clusterId = null;
if (cluster != null) {
clusterId = cluster.clusterResource().clusterId();
@@ -346,7 +347,8 @@ public final class Metadata {
}
}
nodes = cluster.nodes();
+ controller = cluster.controller();
}
- return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics);
+ return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics, controller);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
new file mode 100644
index 0000000..a97219b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * The public interface for the {@link KafkaAdminClient}, which supports managing and inspecting topics,
+ * brokers, and configurations.
+ *
+ * @see KafkaAdminClient
+ */
+@InterfaceStability.Unstable
+public abstract class AdminClient implements AutoCloseable {
+ /**
+ * Create a new AdminClient with the given configuration.
+ *
+ * @param conf The configuration.
+ * @return The new KafkaAdminClient.
+ */
+ public static AdminClient create(Map<String, Object> conf) {
+ return KafkaAdminClient.create(new AdminClientConfig(conf));
+ }
+
+ /**
+ * Close the AdminClient and release all associated resources.
+ */
+ public abstract void close();
+
+ /**
+ * Create a batch of new topics with the default options.
+ *
+ * @param newTopics The new topics to create.
+ * @return The CreateTopicsResults.
+ */
+ public CreateTopicResults createTopics(Collection<NewTopic> newTopics) {
+ return createTopics(newTopics, new CreateTopicsOptions());
+ }
+
+ /**
+ * Create a batch of new topics.
+ *
+ * It may take several seconds after AdminClient#createTopics returns
+ * success for all the brokers to become aware that the topics have been created.
+ * During this time, AdminClient#listTopics and AdminClient#describeTopics
+ * may not return information about the new topics.
+ *
+ * @param newTopics The new topics to create.
+ * @param options The options to use when creating the new topics.
+ * @return The CreateTopicsResults.
+ */
+ public abstract CreateTopicResults createTopics(Collection<NewTopic> newTopics,
+ CreateTopicsOptions options);
+
+ /**
+ * Similar to #{@link AdminClient#deleteTopics(Collection<String>, DeleteTopicsOptions),
+ * but uses the default options.
+ *
+ * @param topics The topic names to delete.
+ * @return The DeleteTopicsResults.
+ */
+ public DeleteTopicResults deleteTopics(Collection<String> topics) {
+ return deleteTopics(topics, new DeleteTopicsOptions());
+ }
+
+ /**
+ * Delete a batch of topics.
+ *
+ * It may take several seconds after AdminClient#deleteTopics returns
+ * success for all the brokers to become aware that the topics are gone.
+ * During this time, AdminClient#listTopics and AdminClient#describeTopics
+ * may continue to return information about the deleted topics.
+ *
+ * If delete.topic.enable is false on the brokers, deleteTopics will mark
+ * the topics for deletion, but not actually delete them. The futures will
+ * return successfully in this case.
+ *
+ * @param topics The topic names to delete.
+ * @param options The options to use when deleting the topics.
+ * @return The DeleteTopicsResults.
+ */
+ public abstract DeleteTopicResults deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
+
+ /**
+ * List the topics available in the cluster with the default options.
+ *
+ * @return The ListTopicsResults.
+ */
+ public ListTopicsResults listTopics() {
+ return listTopics(new ListTopicsOptions());
+ }
+
+ /**
+ * List the topics available in the cluster.
+ *
+ * @param options The options to use when listing the topics.
+ * @return The ListTopicsResults.
+ */
+ public abstract ListTopicsResults listTopics(ListTopicsOptions options);
+
+ /**
+ * Descripe an individual topic in the cluster, with the default options.
+ *
+ * See {@link AdminClient#describeTopics(Collection<String>, DescribeTopicsOptions)}
+ *
+ * @param topicNames The names of the topics to describe.
+ *
+ * @return The DescribeTopicsResults.
+ */
+ public DescribeTopicsResults describeTopics(Collection<String> topicNames) {
+ return describeTopics(topicNames, new DescribeTopicsOptions());
+ }
+
+ /**
+ * Descripe an individual topic in the cluster.
+ *
+ * Note that if auto.create.topics.enable is true on the brokers,
+ * AdminClient#describeTopic(topicName) may create a topic named topicName.
+ * There are two workarounds: either use AdminClient#listTopics and ensure
+ * that the topic is present before describing, or disable
+ * auto.create.topics.enable.
+ *
+ * @param topicNames The names of the topics to describe.
+ * @param options The options to use when describing the topic.
+ *
+ * @return The DescribeTopicsResults.
+ */
+ public abstract DescribeTopicsResults describeTopics(Collection<String> topicNames,
+ DescribeTopicsOptions options);
+
+ /**
+ * Get information about the nodes in the cluster, using the default options.
+ *
+ * @return The DescribeClusterResults.
+ */
+ public DescribeClusterResults describeCluster() {
+ return describeCluster(new DescribeClusterOptions());
+ }
+
+ /**
+ * Get information about the nodes in the cluster.
+ *
+ * @param options The options to use when getting information about the cluster.
+ * @return The DescribeClusterResults.
+ */
+ public abstract DescribeClusterResults describeCluster(DescribeClusterOptions options);
+
+ /**
+ * Get information about the api versions of nodes in the cluster with the default options.
+ * See {@link AdminClient#apiVersions(Collection<Node>, ApiVersionsOptions)}
+ *
+ * @param nodes The nodes to get information about, or null to get information about all nodes.
+ * @return The ApiVersionsResults.
+ */
+ public ApiVersionsResults apiVersions(Collection<Node> nodes) {
+ return apiVersions(nodes, new ApiVersionsOptions());
+ }
+
+ /**
+ * Get information about the api versions of nodes in the cluster.
+ *
+ * @param nodes The nodes to get information about, or null to get information about all nodes.
+ * @param options The options to use when getting api versions of the nodes.
+ * @return The ApiVersionsResults.
+ */
+ public abstract ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
new file mode 100644
index 0000000..368a42e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.metrics.Sensor;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
+/**
+ * The AdminClient configuration keys
+ */
+public class AdminClientConfig extends AbstractConfig {
+ private static final ConfigDef CONFIG;
+
+ /**
+ * <code>bootstrap.servers</code>
+ */
+ public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+ private static final String BOOTSTRAP_SERVERS_DOC = CommonClientConfigs.BOOTSTRAP_SERVERS_DOC;
+
+ /**
+ * <code>reconnect.backoff.ms</code>
+ */
+ public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
+ private static final String RECONNECT_BACKOFF_MS_DOC = CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC;
+
+ /**
+ * <code>retry.backoff.ms</code>
+ */
+ public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
+ private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to " +
+ "retry a failed request. This avoids repeatedly sending requests in a tight loop under " +
+ "some failure scenarios.";
+
+ /** <code>connections.max.idle.ms</code> */
+ public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
+ private static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC;
+
+ /** <code>request.timeout.ms</code> */
+ public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
+ private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
+
+ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+ private static final String CLIENT_ID_DOC = CommonClientConfigs.CLIENT_ID_DOC;
+
+ public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
+ private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
+
+ public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
+ private static final String SEND_BUFFER_DOC = CommonClientConfigs.SEND_BUFFER_DOC;
+
+ public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
+ private static final String RECEIVE_BUFFER_DOC = CommonClientConfigs.RECEIVE_BUFFER_DOC;
+
+ public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+ private static final String METRIC_REPORTER_CLASSES_DOC = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;
+
+ public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
+ private static final String METRICS_NUM_SAMPLES_DOC = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC;
+
+ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
+ private static final String METRICS_SAMPLE_WINDOW_MS_DOC = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC;
+
+ public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
+
+ public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+ public static final String DEFAULT_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
+ private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
+ private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
+
+ static {
+ CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
+ Type.LIST,
+ Importance.HIGH,
+ BOOTSTRAP_SERVERS_DOC)
+ .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
+ .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
+ .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, SEND_BUFFER_DOC)
+ .define(RECEIVE_BUFFER_CONFIG, Type.INT, 64 * 1024, atLeast(-1), Importance.MEDIUM, RECEIVE_BUFFER_DOC)
+ .define(RECONNECT_BACKOFF_MS_CONFIG,
+ Type.LONG,
+ 50L,
+ atLeast(0L),
+ Importance.LOW,
+ RECONNECT_BACKOFF_MS_DOC)
+ .define(RETRY_BACKOFF_MS_CONFIG,
+ Type.LONG,
+ 100L,
+ atLeast(0L),
+ Importance.LOW,
+ RETRY_BACKOFF_MS_DOC)
+ .define(REQUEST_TIMEOUT_MS_CONFIG,
+ Type.INT,
+ 120000,
+ atLeast(0),
+ Importance.MEDIUM,
+ REQUEST_TIMEOUT_MS_DOC)
+ .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
+ Type.LONG,
+ 5 * 60 * 1000,
+ Importance.MEDIUM,
+ CONNECTIONS_MAX_IDLE_MS_DOC)
+ .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+ Type.LONG,
+ 30000,
+ atLeast(0),
+ Importance.LOW,
+ METRICS_SAMPLE_WINDOW_MS_DOC)
+ .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
+ .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
+ .define(METRICS_RECORDING_LEVEL_CONFIG,
+ Type.STRING,
+ Sensor.RecordingLevel.INFO.toString(),
+ in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
+ Importance.LOW,
+ METRICS_RECORDING_LEVEL_DOC)
+ // security support
+ .define(SECURITY_PROTOCOL_CONFIG,
+ Type.STRING,
+ DEFAULT_SECURITY_PROTOCOL,
+ Importance.MEDIUM,
+ SECURITY_PROTOCOL_DOC)
+ .withClientSslSupport()
+ .withClientSaslSupport();
+ }
+
+ AdminClientConfig(Map<?, ?> props) {
+ super(CONFIG, props);
+ }
+
+ public static Set<String> configNames() {
+ return CONFIG.names();
+ }
+
+ public static void main(String[] args) {
+ System.out.println(CONFIG.toHtmlTable());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java
new file mode 100644
index 0000000..cbcd234
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for the apiVersions call.
+ */
+@InterfaceStability.Unstable
+public class ApiVersionsOptions {
+ private Integer timeoutMs = null;
+
+ public ApiVersionsOptions timeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ return this;
+ }
+
+ public Integer timeoutMs() {
+ return timeoutMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
new file mode 100644
index 0000000..456c64d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Results of the apiVersions call.
+ */
+@InterfaceStability.Unstable
+public class ApiVersionsResults {
+ private final Map<Node, KafkaFuture<NodeApiVersions>> futures;
+
+ ApiVersionsResults(Map<Node, KafkaFuture<NodeApiVersions>> futures) {
+ this.futures = futures;
+ }
+
+ public Map<Node, KafkaFuture<NodeApiVersions>> results() {
+ return futures;
+ }
+
+ public KafkaFuture<Map<Node, NodeApiVersions>> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+ thenApply(new KafkaFuture.Function<Void, Map<Node, NodeApiVersions>>() {
+ @Override
+ public Map<Node, NodeApiVersions> apply(Void v) {
+ Map<Node, NodeApiVersions> versions = new HashMap<>(futures.size());
+ for (Map.Entry<Node, KafkaFuture<NodeApiVersions>> entry : futures.entrySet()) {
+ try {
+ versions.put(entry.getKey(), entry.getValue().get());
+ } catch (InterruptedException | ExecutionException e) {
+ // This should be unreachable, because allOf ensured that all the futures
+ // completed successfully.
+ throw new RuntimeException(e);
+ }
+ }
+ return versions;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
new file mode 100644
index 0000000..03da7d0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of newTopics.
+ */
+@InterfaceStability.Unstable
+public class CreateTopicResults {
+ private final Map<String, KafkaFuture<Void>> futures;
+
+ CreateTopicResults(Map<String, KafkaFuture<Void>> futures) {
+ this.futures = futures;
+ }
+
+ /**
+ * Return a map from topic names to futures, which can be used to check the status of individual
+ * topic creations.
+ */
+ public Map<String, KafkaFuture<Void>> results() {
+ return futures;
+ }
+
+ /**
+ * Return a future which succeeds if all the topic creations succeed.
+ */
+ public KafkaFuture<Void> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
new file mode 100644
index 0000000..c1f3944
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for newTopics.
+ */
+@InterfaceStability.Unstable
+public class CreateTopicsOptions {
+ private Integer timeoutMs = null;
+ private boolean validateOnly = false;
+
+ public CreateTopicsOptions timeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ return this;
+ }
+
+ public Integer timeoutMs() {
+ return timeoutMs;
+ }
+
+ public CreateTopicsOptions validateOnly(boolean validateOnly) {
+ this.validateOnly = validateOnly;
+ return this;
+ }
+
+ public boolean validateOnly() {
+ return validateOnly;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
new file mode 100644
index 0000000..3dd4889
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of the deleteTopics call.
+ */
+@InterfaceStability.Unstable
+public class DeleteTopicResults {
+ final Map<String, KafkaFuture<Void>> futures;
+
+ DeleteTopicResults(Map<String, KafkaFuture<Void>> futures) {
+ this.futures = futures;
+ }
+
+ /**
+ * Return a map from topic names to futures which can be used to check the status of
+ * individual deletions.
+ */
+ public Map<String, KafkaFuture<Void>> results() {
+ return futures;
+ }
+
+ /**
+ * Return a future which succeeds only if all the topic deletions succeed.
+ */
+ public KafkaFuture<Void> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
new file mode 100644
index 0000000..3630968
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for deleteTopics.
+ */
+@InterfaceStability.Unstable
+public class DeleteTopicsOptions {
+ private Integer timeoutMs = null;
+
+ public DeleteTopicsOptions timeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ return this;
+ }
+
+ public Integer timeoutMs() {
+ return timeoutMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
new file mode 100644
index 0000000..604ee13
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for the describeCluster call.
+ */
+@InterfaceStability.Unstable
+public class DescribeClusterOptions {
+ private Integer timeoutMs = null;
+
+ public DescribeClusterOptions timeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ return this;
+ }
+
+ public Integer timeoutMs() {
+ return timeoutMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
new file mode 100644
index 0000000..5ee834b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * The results of the describeCluster call.
+ */
+@InterfaceStability.Unstable
+public class DescribeClusterResults {
+ private final KafkaFuture<Collection<Node>> future;
+
+ DescribeClusterResults(KafkaFuture<Collection<Node>> future) {
+ this.future = future;
+ }
+
+ /**
+ * Returns a future which yields a collection of nodes.
+ */
+ public KafkaFuture<Collection<Node>> nodes() {
+ return future;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
new file mode 100644
index 0000000..1bf6632
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for describeTopics.
+ */
+@InterfaceStability.Unstable
+public class DescribeTopicsOptions {
+ private Integer timeoutMs = null;
+
+ public DescribeTopicsOptions timeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ return this;
+ }
+
+ public Integer timeoutMs() {
+ return timeoutMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
new file mode 100644
index 0000000..630ba95
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The results of the describeTopic call.
+ */
+@InterfaceStability.Unstable
+public class DescribeTopicsResults {
+ private final Map<String, KafkaFuture<TopicDescription>> futures;
+
+ DescribeTopicsResults(Map<String, KafkaFuture<TopicDescription>> futures) {
+ this.futures = futures;
+ }
+
+ /**
+ * Return a map from topic names to futures which can be used to check the status of
+ * individual deletions.
+ */
+ public Map<String, KafkaFuture<TopicDescription>> results() {
+ return futures;
+ }
+
+ /**
+ * Return a future which succeeds only if all the topic deletions succeed.
+ */
+ public KafkaFuture<Map<String, TopicDescription>> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+ thenApply(new KafkaFuture.Function<Void, Map<String, TopicDescription>>() {
+ @Override
+ public Map<String, TopicDescription> apply(Void v) {
+ Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
+ for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
+ try {
+ descriptions.put(entry.getKey(), entry.getValue().get());
+ } catch (InterruptedException | ExecutionException e) {
+ // This should be unreachable, because allOf ensured that all the futures
+ // completed successfully.
+ throw new RuntimeException(e);
+ }
+ }
+ return descriptions;
+ }
+ });
+ }
+}