Posted to commits@kafka.apache.org by ju...@apache.org on 2015/09/02 20:55:25 UTC
[3/3] kafka git commit: KAFKA-2411; remove usage of blocking channel
KAFKA-2411; remove usage of blocking channel
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jun Rao <ju...@gmail.com>, Gwen Shapira <cs...@gmail.com>
Closes #151 from ijuma/kafka-2411-remove-usage-of-blocking-channel
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d02ca36c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d02ca36c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d02ca36c
Branch: refs/heads/trunk
Commit: d02ca36ca1cccdb6962191b97f54ce96b9d75abc
Parents: d0adf6a
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed Sep 2 11:55:08 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Sep 2 11:55:08 2015 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/ClientUtils.java | 23 +-
.../kafka/clients/ClusterConnectionStates.java | 13 +-
.../org/apache/kafka/clients/KafkaClient.java | 18 +-
.../kafka/clients/ManualMetadataUpdater.java | 76 +++++
.../apache/kafka/clients/MetadataUpdater.java | 72 ++++
.../org/apache/kafka/clients/NetworkClient.java | 318 ++++++++++++------
.../errors/BrokerNotAvailableException.java | 32 ++
.../common/errors/ControllerMovedException.java | 32 ++
.../kafka/common/network/ChannelBuilders.java | 52 +++
.../kafka/common/network/NetworkReceive.java | 10 -
.../apache/kafka/common/network/Selectable.java | 20 +-
.../apache/kafka/common/network/Selector.java | 8 +-
.../apache/kafka/common/protocol/ApiKeys.java | 4 +-
.../apache/kafka/common/protocol/Errors.java | 6 +-
.../kafka/common/protocol/ProtoUtils.java | 6 +
.../apache/kafka/common/protocol/Protocol.java | 111 ++++++-
.../kafka/common/requests/AbstractRequest.java | 6 +
.../requests/ControlledShutdownRequest.java | 69 ++++
.../requests/ControlledShutdownResponse.java | 91 ++++++
.../common/requests/LeaderAndIsrRequest.java | 212 ++++++++++++
.../common/requests/LeaderAndIsrResponse.java | 105 ++++++
.../common/requests/UpdateMetadataRequest.java | 291 +++++++++++++++++
.../common/requests/UpdateMetadataResponse.java | 59 ++++
.../org/apache/kafka/clients/MockClient.java | 10 +
.../apache/kafka/clients/NetworkClientTest.java | 23 +-
.../common/requests/RequestResponseTest.java | 119 ++++++-
.../org/apache/kafka/test/MockSelector.java | 4 +
.../kafka/api/ControlledShutdownRequest.scala | 19 +-
.../controller/ControllerChannelManager.scala | 327 ++++++++++++-------
.../kafka/controller/KafkaController.scala | 46 ++-
.../controller/PartitionStateMachine.scala | 4 +-
.../kafka/controller/ReplicaStateMachine.scala | 2 +-
.../kafka/controller/TopicDeletionManager.scala | 20 +-
.../scala/kafka/network/BlockingChannel.scala | 10 +-
.../main/scala/kafka/network/SocketServer.scala | 13 +-
.../main/scala/kafka/server/KafkaServer.scala | 184 +++++++++--
.../kafka/utils/NetworkClientBlockingOps.scala | 142 ++++++++
.../controller/ControllerFailoverTest.scala | 18 +-
.../unit/kafka/server/LeaderElectionTest.scala | 38 ++-
39 files changed, 2229 insertions(+), 384 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index ba3bcbe..e7514f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -19,10 +19,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.network.ChannelBuilder;
-import org.apache.kafka.common.network.SSLChannelBuilder;
-import org.apache.kafka.common.network.PlaintextChannelBuilder;
import org.apache.kafka.common.security.ssl.SSLFactory;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
@@ -71,25 +70,13 @@ public class ClientUtils {
/**
* @param configs client/server configs
- * returns ChannelBuilder configured channelBuilder based on the configs.
+ * @return configured ChannelBuilder based on the configs.
*/
public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) {
SecurityProtocol securityProtocol = SecurityProtocol.valueOf((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
- ChannelBuilder channelBuilder = null;
-
- switch (securityProtocol) {
- case SSL:
- channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT);
- break;
- case PLAINTEXT:
- channelBuilder = new PlaintextChannelBuilder();
- break;
- default:
- throw new ConfigException("Invalid SecurityProtocol " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
- }
-
- channelBuilder.configure(configs);
- return channelBuilder;
+ if (securityProtocol != SecurityProtocol.SSL && securityProtocol != SecurityProtocol.PLAINTEXT)
+ throw new ConfigException("Invalid SecurityProtocol " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
+ return ChannelBuilders.create(securityProtocol, SSLFactory.Mode.CLIENT, configs);
}
}
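
For illustration (not part of this commit), a minimal sketch of calling the refactored helper; the config map below is made up for the example:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.ClientUtils;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.common.network.ChannelBuilder;

    public class ChannelBuilderExample {
        public static void main(String[] args) {
            // Only the security protocol matters for the check above; real clients
            // would pass their full config map here.
            Map<String, Object> configs = new HashMap<>();
            configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");

            // Delegates to ChannelBuilders.create(securityProtocol, SSLFactory.Mode.CLIENT, configs).
            ChannelBuilder builder = ClientUtils.createChannelBuilder(configs);
        }
    }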
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 9ebda5e..6c58211 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -120,6 +120,17 @@ final class ClusterConnectionStates {
NodeConnectionState nodeState = nodeState(id);
nodeState.state = ConnectionState.DISCONNECTED;
}
+
+ /**
+ * Remove the given node from the tracked connection states. The main difference between this and `disconnected`
+ * is the impact on `connectionDelay`: it will be 0 after this call whereas `reconnectBackoffMs` will be taken
+ * into account after `disconnected` is called.
+ *
+ * @param id The connection to remove
+ */
+ public void remove(String id) {
+ nodeState.remove(id);
+ }
/**
* Get the state of a given connection
@@ -158,4 +169,4 @@ final class ClusterConnectionStates {
return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 7ab2503..f46c0d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -81,6 +81,13 @@ public interface KafkaClient extends Closeable {
public List<ClientResponse> poll(long timeout, long now);
/**
+ * Closes the connection to a particular node (if there is one).
+ *
+ * @param nodeId The id of the node
+ */
+ public void close(String nodeId);
+
+ /**
* Complete all in-flight requests for a given connection
*
* @param id The connection to complete requests for
@@ -127,8 +134,17 @@ public interface KafkaClient extends Closeable {
public RequestHeader nextRequestHeader(ApiKeys key);
/**
+ * Generate a request header for the given API key
+ *
+ * @param key The api key
+ * @param version The api version
+ * @return A request header with the appropriate client id and correlation id
+ */
+ public RequestHeader nextRequestHeader(ApiKeys key, short version);
+
+ /**
* Wake up the client if it is currently blocked waiting for I/O
*/
public void wakeup();
-}
\ No newline at end of file
+}
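
For illustration (not part of this commit), a sketch of the two methods added to the interface; `VersionedRequestExample` and its method names are made up:

    import org.apache.kafka.clients.KafkaClient;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.requests.RequestHeader;

    public class VersionedRequestExample {
        static RequestHeader controlledShutdownHeader(KafkaClient client) {
            // Pin the header to version 1; version 0 of controlled shutdown is only
            // handled by the older Scala request class.
            return client.nextRequestHeader(ApiKeys.CONTROLLED_SHUTDOWN_KEY, (short) 1);
        }

        static void dropConnection(KafkaClient client, String nodeId) {
            // close(nodeId) tears down the connection and removes it from the tracked
            // states, so connectionDelay is 0 afterwards (see ClusterConnectionStates.remove).
            client.close(nodeId);
        }
    }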
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
new file mode 100644
index 0000000..efbe664
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A simple implementation of `MetadataUpdater` that returns the cluster nodes set via the constructor or via
+ * `setNodes`.
+ *
+ * This is useful in cases where automatic metadata updates are not required. An example is controller/broker
+ * communication.
+ *
+ * This class is not thread-safe!
+ */
+public class ManualMetadataUpdater implements MetadataUpdater {
+
+ private List<Node> nodes;
+
+ public ManualMetadataUpdater() {
+ this(new ArrayList<Node>(0));
+ }
+
+ public ManualMetadataUpdater(List<Node> nodes) {
+ this.nodes = nodes;
+ }
+
+ public void setNodes(List<Node> nodes) {
+ this.nodes = nodes;
+ }
+
+ @Override
+ public List<Node> fetchNodes() {
+ return new ArrayList<>(nodes);
+ }
+
+ @Override
+ public boolean isUpdateDue(long now) {
+ return false;
+ }
+
+ @Override
+ public long maybeUpdate(long now) {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public boolean maybeHandleDisconnection(ClientRequest request) {
+ return false;
+ }
+
+ @Override
+ public boolean maybeHandleCompletedReceive(ClientRequest request, long now, Struct body) {
+ return false;
+ }
+
+ @Override
+ public void requestUpdate() {
+ // Do nothing
+ }
+}
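
For illustration (not part of this commit), a sketch of wiring a `NetworkClient` with a `ManualMetadataUpdater`, roughly what controller/broker communication needs after this change; the node, client id, and buffer sizes are made up, and the `Selectable` (typically a configured `Selector`) is assumed to be built by the caller:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.kafka.clients.ManualMetadataUpdater;
    import org.apache.kafka.clients.NetworkClient;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.network.Selectable;

    public class ManualUpdaterExample {
        static NetworkClient newControllerClient(Selectable selector) {
            List<Node> nodes = Arrays.asList(new Node(1, "broker1", 9092)); // illustrative node
            return new NetworkClient(
                    selector,
                    new ManualMetadataUpdater(nodes), // no automatic metadata refresh
                    "controller-channel",             // illustrative client id
                    1,                                // maxInFlightRequestsPerConnection
                    50L,                              // reconnectBackoffMs
                    64 * 1024,                        // socketSendBuffer
                    64 * 1024);                       // socketReceiveBuffer
        }
    }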
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
new file mode 100644
index 0000000..4669a68
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.util.List;
+
+/**
+ * The interface used by `NetworkClient` to request updates to the cluster metadata and to retrieve the cluster nodes
+ * from such metadata. This is an internal interface.
+ * <p>
+ * This class is not thread-safe!
+ */
+interface MetadataUpdater {
+
+ /**
+ * Gets the current cluster info without blocking.
+ */
+ List<Node> fetchNodes();
+
+ /**
+ * Returns true if an update to the cluster metadata info is due.
+ */
+ boolean isUpdateDue(long now);
+
+ /**
+ * Starts a cluster metadata update if needed and possible. Returns the time until the metadata update (which would
+ * be 0 if an update has been started as a result of this call).
+ *
+ * If the implementation relies on `NetworkClient` to send requests, the completed receive will be passed to
+ * `maybeHandleCompletedReceive`.
+ *
+ * The semantics of `needed` and `possible` are implementation-dependent and may take into account a number of
+ * factors like node availability, how long since the last metadata update, etc.
+ */
+ long maybeUpdate(long now);
+
+ /**
+ * If `request` is a metadata request, handles it and returns `true`. Otherwise, returns `false`.
+ *
+ * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
+ * requests with special handling for disconnections of such requests.
+ */
+ boolean maybeHandleDisconnection(ClientRequest request);
+
+ /**
+ * If `request` is a metadata request, handles it and returns `true`. Otherwise, returns `false`.
+ *
+ * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
+ * requests with special handling for completed receives of such requests.
+ */
+ boolean maybeHandleCompletedReceive(ClientRequest request, long now, Struct body);
+
+ /**
+ * Schedules an update of the current cluster metadata info. A subsequent call to `maybeUpdate` would trigger the
+ * start of the update if possible (see `maybeUpdate` for more information).
+ */
+ void requestUpdate();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index b31f7f1..0a6f952 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -48,9 +48,8 @@ public class NetworkClient implements KafkaClient {
/* the selector used to perform network i/o */
private final Selectable selector;
-
- /* the current cluster metadata */
- private final Metadata metadata;
+
+ private final MetadataUpdater metadataUpdater;
/* the state of each node's connection */
private final ClusterConnectionStates connectionStates;
@@ -73,12 +72,6 @@ public class NetworkClient implements KafkaClient {
/* the current correlation id to use when sending requests to servers */
private int correlation;
- /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
- private boolean metadataFetchInProgress;
-
- /* the last timestamp when no broker node is available to connect */
- private long lastNoNodeAvailableMs;
-
public NetworkClient(Selectable selector,
Metadata metadata,
String clientId,
@@ -86,8 +79,43 @@ public class NetworkClient implements KafkaClient {
long reconnectBackoffMs,
int socketSendBuffer,
int socketReceiveBuffer) {
+ this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
+ reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer);
+ }
+
+ public NetworkClient(Selectable selector,
+ MetadataUpdater metadataUpdater,
+ String clientId,
+ int maxInFlightRequestsPerConnection,
+ long reconnectBackoffMs,
+ int socketSendBuffer,
+ int socketReceiveBuffer) {
+ this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
+ socketSendBuffer, socketReceiveBuffer);
+ }
+
+ private NetworkClient(MetadataUpdater metadataUpdater,
+ Metadata metadata,
+ Selectable selector,
+ String clientId,
+ int maxInFlightRequestsPerConnection,
+ long reconnectBackoffMs,
+ int socketSendBuffer,
+ int socketReceiveBuffer) {
+
+ /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
+ * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
+ * super constructor is invoked.
+ */
+ if (metadataUpdater == null) {
+ if (metadata == null)
+ throw new IllegalArgumentException("`metadata` must not be null");
+ this.metadataUpdater = new DefaultMetadataUpdater(metadata);
+ } else {
+ this.metadataUpdater = metadataUpdater;
+ }
+
this.selector = selector;
- this.metadata = metadata;
this.clientId = clientId;
this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs);
@@ -95,8 +123,6 @@ public class NetworkClient implements KafkaClient {
this.socketReceiveBuffer = socketReceiveBuffer;
this.correlation = 0;
this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
- this.metadataFetchInProgress = false;
- this.lastNoNodeAvailableMs = 0;
}
/**
@@ -119,6 +145,17 @@ public class NetworkClient implements KafkaClient {
}
/**
+ * Closes the connection to a particular node (if there is one).
+ *
+ * @param nodeId The id of the node
+ */
+ @Override
+ public void close(String nodeId) {
+ selector.close(nodeId);
+ connectionStates.remove(nodeId);
+ }
+
+ /**
* Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
* disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
* connections.
@@ -154,14 +191,9 @@ public class NetworkClient implements KafkaClient {
*/
@Override
public boolean isReady(Node node, long now) {
- String nodeId = node.idString();
- if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0)
- // if we need to update our metadata now declare all requests unready to make metadata requests first
- // priority
- return false;
- else
- // otherwise we are ready if we are connected and can send more requests
- return canSendRequest(nodeId);
+ // if we need to update our metadata now declare all requests unready to make metadata requests first
+ // priority
+ return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
}
/**
@@ -193,7 +225,10 @@ public class NetworkClient implements KafkaClient {
String nodeId = request.request().destination();
if (!canSendRequest(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
+ doSend(request);
+ }
+ private void doSend(ClientRequest request) {
this.inFlightRequests.add(request);
selector.send(request.request());
}
@@ -207,16 +242,7 @@ public class NetworkClient implements KafkaClient {
*/
@Override
public List<ClientResponse> poll(long timeout, long now) {
- // should we update our metadata?
- long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
- long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
- long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
- // if there is no node available to connect, back off refreshing metadata
- long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
- waitForMetadataFetch);
- if (metadataTimeout == 0)
- maybeUpdateMetadata(now);
- // do the I/O
+ long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Math.min(timeout, metadataTimeout));
} catch (IOException e) {
@@ -224,7 +250,7 @@ public class NetworkClient implements KafkaClient {
}
// process completed actions
- List<ClientResponse> responses = new ArrayList<ClientResponse>();
+ List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, now);
handleCompletedReceives(responses, now);
handleDisconnections(responses, now);
@@ -304,6 +330,18 @@ public class NetworkClient implements KafkaClient {
}
/**
+ * Generate a request header for the given API key and version
+ *
+ * @param key The api key
+ * @param version The api version
+ * @return A request header with the appropriate client id and correlation id
+ */
+ @Override
+ public RequestHeader nextRequestHeader(ApiKeys key, short version) {
+ return new RequestHeader(key.id, version, clientId, correlation++);
+ }
+
+ /**
* Interrupt the client if it is blocked waiting on I/O.
*/
@Override
@@ -327,8 +365,9 @@ public class NetworkClient implements KafkaClient {
*
* @return The node with the fewest in-flight requests.
*/
+ @Override
public Node leastLoadedNode(long now) {
- List<Node> nodes = this.metadata.fetch().nodes();
+ List<Node> nodes = this.metadataUpdater.fetchNodes();
int inflight = Integer.MAX_VALUE;
Node found = null;
for (int i = 0; i < nodes.size(); i++) {
@@ -378,30 +417,8 @@ public class NetworkClient implements KafkaClient {
short apiKey = req.request().header().apiKey();
Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
correlate(req.request().header(), header);
- if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
- handleMetadataResponse(req.request().header(), body, now);
- } else {
- // need to add body/header to response here
+ if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
responses.add(new ClientResponse(req, now, false, body));
- }
- }
- }
-
- private void handleMetadataResponse(RequestHeader header, Struct body, long now) {
- this.metadataFetchInProgress = false;
- MetadataResponse response = new MetadataResponse(body);
- Cluster cluster = response.cluster();
- // check if any topics metadata failed to get updated
- if (response.errors().size() > 0) {
- log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
- }
- // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
- // created which means we will get errors and no nodes until it exists
- if (cluster.nodes().size() > 0) {
- this.metadata.update(cluster, now);
- } else {
- log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
- this.metadata.failedUpdate(now);
}
}
@@ -417,16 +434,13 @@ public class NetworkClient implements KafkaClient {
log.debug("Node {} disconnected.", node);
for (ClientRequest request : this.inFlightRequests.clearAll(node)) {
log.trace("Cancelled request {} due to node {} being disconnected", request, node);
- ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
- if (requestKey == ApiKeys.METADATA)
- metadataFetchInProgress = false;
- else
+ if (!metadataUpdater.maybeHandleDisconnection(request))
responses.add(new ClientResponse(request, now, true, null));
}
}
// we got a disconnect so we should probably refresh our metadata and see if that broker is dead
if (this.selector.disconnected().size() > 0)
- this.metadata.requestUpdate();
+ metadataUpdater.requestUpdate();
}
/**
@@ -449,52 +463,6 @@ public class NetworkClient implements KafkaClient {
}
/**
- * Create a metadata request for the given topics
- */
- private ClientRequest metadataRequest(long now, String node, Set<String> topics) {
- MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
- RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
- return new ClientRequest(now, true, send, null, true);
- }
-
- /**
- * Add a metadata request to the list of sends if we can make one
- */
- private void maybeUpdateMetadata(long now) {
- // Beware that the behavior of this method and the computation of timeouts for poll() are
- // highly dependent on the behavior of leastLoadedNode.
- Node node = this.leastLoadedNode(now);
- if (node == null) {
- log.debug("Give up sending metadata request since no node is available");
- // mark the timestamp for no node available to connect
- this.lastNoNodeAvailableMs = now;
- return;
- }
- String nodeConnectionId = node.idString();
-
- if (canSendRequest(nodeConnectionId)) {
- Set<String> topics = metadata.topics();
- this.metadataFetchInProgress = true;
- ClientRequest metadataRequest = metadataRequest(now, nodeConnectionId, topics);
- log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
- this.selector.send(metadataRequest.request());
- this.inFlightRequests.add(metadataRequest);
- } else if (connectionStates.canConnect(nodeConnectionId, now)) {
- // we don't have a connection to this node right now, make one
- log.debug("Initialize connection to node {} for sending metadata request", node.id());
- initiateConnect(node, now);
- // If initiateConnect failed immediately, this node will be put into blackout and we
- // should allow immediately retrying in case there is another candidate node. If it
- // is still connecting, the worst case is that we end up setting a longer timeout
- // on the next round and then wait for the response.
- } else { // connected, but can't send more OR connecting
- // In either case, we just need to wait for a network event to let us know the selected
- // connection might be usable again.
- this.lastNoNodeAvailableMs = now;
- }
- }
-
- /**
* Initiate a connection to the given node
*/
private void initiateConnect(Node node, long now) {
@@ -510,9 +478,145 @@ public class NetworkClient implements KafkaClient {
/* attempt failed, we'll try again after the backoff */
connectionStates.disconnected(nodeConnectionId);
/* maybe the problem is our metadata, update it */
- metadata.requestUpdate();
+ metadataUpdater.requestUpdate();
log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
}
}
+ class DefaultMetadataUpdater implements MetadataUpdater {
+
+ /* the current cluster metadata */
+ private final Metadata metadata;
+
+ /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
+ private boolean metadataFetchInProgress;
+
+ /* the last timestamp when no broker node is available to connect */
+ private long lastNoNodeAvailableMs;
+
+ DefaultMetadataUpdater(Metadata metadata) {
+ this.metadata = metadata;
+ this.metadataFetchInProgress = false;
+ this.lastNoNodeAvailableMs = 0;
+ }
+
+ @Override
+ public List<Node> fetchNodes() {
+ return metadata.fetch().nodes();
+ }
+
+ @Override
+ public boolean isUpdateDue(long now) {
+ return !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0;
+ }
+
+ @Override
+ public long maybeUpdate(long now) {
+ // should we update our metadata?
+ long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
+ long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
+ long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
+ // if there is no node available to connect, back off refreshing metadata
+ long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
+ waitForMetadataFetch);
+
+ if (metadataTimeout == 0) {
+ // Beware that the behavior of this method and the computation of timeouts for poll() are
+ // highly dependent on the behavior of leastLoadedNode.
+ Node node = leastLoadedNode(now);
+ maybeUpdate(now, node);
+ }
+
+ return metadataTimeout;
+ }
+
+ @Override
+ public boolean maybeHandleDisconnection(ClientRequest request) {
+ ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
+
+ if (requestKey == ApiKeys.METADATA) {
+ metadataFetchInProgress = false;
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
+ short apiKey = req.request().header().apiKey();
+ if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
+ handleResponse(req.request().header(), body, now);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void requestUpdate() {
+ this.metadata.requestUpdate();
+ }
+
+ private void handleResponse(RequestHeader header, Struct body, long now) {
+ this.metadataFetchInProgress = false;
+ MetadataResponse response = new MetadataResponse(body);
+ Cluster cluster = response.cluster();
+ // check if any topics metadata failed to get updated
+ if (response.errors().size() > 0) {
+ log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
+ }
+ // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
+ // created which means we will get errors and no nodes until it exists
+ if (cluster.nodes().size() > 0) {
+ this.metadata.update(cluster, now);
+ } else {
+ log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
+ this.metadata.failedUpdate(now);
+ }
+ }
+
+ /**
+ * Create a metadata request for the given topics
+ */
+ private ClientRequest request(long now, String node, Set<String> topics) {
+ MetadataRequest metadata = new MetadataRequest(new ArrayList<>(topics));
+ RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
+ return new ClientRequest(now, true, send, null, true);
+ }
+
+ /**
+ * Add a metadata request to the list of sends if we can make one
+ */
+ private void maybeUpdate(long now, Node node) {
+ if (node == null) {
+ log.debug("Give up sending metadata request since no node is available");
+ // mark the timestamp for no node available to connect
+ this.lastNoNodeAvailableMs = now;
+ return;
+ }
+ String nodeConnectionId = node.idString();
+
+ if (canSendRequest(nodeConnectionId)) {
+ Set<String> topics = metadata.topics();
+ this.metadataFetchInProgress = true;
+ ClientRequest metadataRequest = request(now, nodeConnectionId, topics);
+ log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
+ doSend(metadataRequest);
+ } else if (connectionStates.canConnect(nodeConnectionId, now)) {
+ // we don't have a connection to this node right now, make one
+ log.debug("Initialize connection to node {} for sending metadata request", node.id());
+ initiateConnect(node, now);
+ // If initiateConnect failed immediately, this node will be put into blackout and we
+ // should allow immediately retrying in case there is another candidate node. If it
+ // is still connecting, the worst case is that we end up setting a longer timeout
+ // on the next round and then wait for the response.
+ } else { // connected, but can't send more OR connecting
+ // In either case, we just need to wait for a network event to let us know the selected
+ // connection might be usable again.
+ this.lastNoNodeAvailableMs = now;
+ }
+ }
+
+ }
+
}
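
For illustration (not part of this commit), a simplified Java stand-in for the blocking send-and-receive pattern that the new NetworkClientBlockingOps.scala builds on top of the non-blocking client; real code also needs connection establishment and timeout handling:

    import java.util.List;

    import org.apache.kafka.clients.ClientRequest;
    import org.apache.kafka.clients.ClientResponse;
    import org.apache.kafka.clients.KafkaClient;

    public class BlockingSendExample {
        static ClientResponse sendAndReceive(KafkaClient client, ClientRequest request, long pollTimeoutMs) {
            client.send(request);
            while (true) {
                long now = System.currentTimeMillis();
                List<ClientResponse> responses = client.poll(pollTimeoutMs, now);
                for (ClientResponse response : responses) {
                    // A response is also produced on disconnection, so this loop terminates
                    // when the node goes away rather than spinning forever.
                    if (response.request() == request)
                        return response;
                }
            }
        }
    }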
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/errors/BrokerNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/BrokerNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/BrokerNotAvailableException.java
new file mode 100644
index 0000000..f78f061
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/BrokerNotAvailableException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+public class BrokerNotAvailableException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public BrokerNotAvailableException(String message) {
+ super(message);
+ }
+
+ public BrokerNotAvailableException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/errors/ControllerMovedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ControllerMovedException.java b/clients/src/main/java/org/apache/kafka/common/errors/ControllerMovedException.java
new file mode 100644
index 0000000..8dd7487
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ControllerMovedException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+public class ControllerMovedException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public ControllerMovedException(String message) {
+ super(message);
+ }
+
+ public ControllerMovedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
new file mode 100644
index 0000000..2332d3f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.ssl.SSLFactory;
+
+import java.util.Map;
+
+public class ChannelBuilders {
+
+ private ChannelBuilders() { }
+
+ /**
+ * @param securityProtocol the securityProtocol
+ * @param mode the SSL mode; it must be non-null if `securityProtocol` is `SSL` and is ignored otherwise
+ * @param configs client/server configs
+ * @return the configured `ChannelBuilder`
+ * @throws IllegalArgumentException if the `mode` invariant described above is not maintained
+ */
+ public static ChannelBuilder create(SecurityProtocol securityProtocol, SSLFactory.Mode mode, Map<String, ?> configs) {
+ ChannelBuilder channelBuilder = null;
+
+ switch (securityProtocol) {
+ case SSL:
+ if (mode == null)
+ throw new IllegalArgumentException("`mode` must be non-null if `securityProtocol` is `SSL`");
+ channelBuilder = new SSLChannelBuilder(mode);
+ break;
+ case PLAINTEXT:
+ case TRACE:
+ channelBuilder = new PlaintextChannelBuilder();
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
+ }
+
+ channelBuilder.configure(configs);
+ return channelBuilder;
+ }
+}
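
For illustration (not part of this commit), direct usage of the new factory; the empty config map is illustrative, and in practice SSL would need its configs present in the map:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.network.ChannelBuilder;
    import org.apache.kafka.common.network.ChannelBuilders;
    import org.apache.kafka.common.protocol.SecurityProtocol;
    import org.apache.kafka.common.security.ssl.SSLFactory;

    public class ChannelBuildersExample {
        public static void main(String[] args) {
            Map<String, Object> configs = new HashMap<>(); // illustrative empty configs

            // PLAINTEXT ignores the mode, so null is acceptable here.
            ChannelBuilder plaintext = ChannelBuilders.create(SecurityProtocol.PLAINTEXT, null, configs);

            // SSL requires a non-null mode, otherwise create() throws IllegalArgumentException.
            ChannelBuilder ssl = ChannelBuilders.create(SecurityProtocol.SSL, SSLFactory.Mode.SERVER, configs);
        }
    }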
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index 2a1568e..409775c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -107,14 +107,4 @@ public class NetworkReceive implements Receive {
return this.buffer;
}
- // Used only by BlockingChannel, so we may be able to get rid of this when/if we get rid of BlockingChannel
- @Deprecated
- public long readCompletely(ReadableByteChannel channel) throws IOException {
- int totalRead = 0;
- while (!complete()) {
- totalRead += readFromReadableChannel(channel);
- }
- return totalRead;
- }
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index 39eae4a..70e74bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -23,6 +23,11 @@ import java.util.List;
public interface Selectable {
/**
+ * See {@link #connect(String, InetSocketAddress, int, int) connect()}
+ */
+ public static final int USE_DEFAULT_BUFFER_SIZE = -1;
+
+ /**
* Begin establishing a socket connection to the given address, identified by the given id
* @param id The id for this connection
* @param address The address to connect to
@@ -48,7 +53,12 @@ public interface Selectable {
public void close();
/**
- * Queue the given request for sending in the subsequent {@poll(long)} calls
+ * Close the connection identified by the given id
+ */
+ public void close(String nodeId);
+
+ /**
+ * Queue the given request for sending in the subsequent {@link #poll(long) poll()} calls
* @param send The request to send
*/
public void send(Send send);
@@ -61,23 +71,23 @@ public interface Selectable {
public void poll(long timeout) throws IOException;
/**
- * The list of sends that completed on the last {@link #poll(long, List) poll()} call.
+ * The list of sends that completed on the last {@link #poll(long) poll()} call.
*/
public List<Send> completedSends();
/**
- * The list of receives that completed on the last {@link #poll(long, List) poll()} call.
+ * The list of receives that completed on the last {@link #poll(long) poll()} call.
*/
public List<NetworkReceive> completedReceives();
/**
- * The list of connections that finished disconnecting on the last {@link #poll(long, List) poll()}
+ * The list of connections that finished disconnecting on the last {@link #poll(long) poll()}
* call.
*/
public List<String> disconnected();
/**
- * The list of connections that completed their connection on the last {@link #poll(long, List) poll()}
+ * The list of connections that completed their connection on the last {@link #poll(long) poll()}
* call.
*/
public List<String> connected();
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index f49d54c..4aa5cbb 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -146,8 +146,10 @@ public class Selector implements Selectable {
socketChannel.configureBlocking(false);
Socket socket = socketChannel.socket();
socket.setKeepAlive(true);
- socket.setSendBufferSize(sendBufferSize);
- socket.setReceiveBufferSize(receiveBufferSize);
+ if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
+ socket.setSendBufferSize(sendBufferSize);
+ if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
+ socket.setReceiveBufferSize(receiveBufferSize);
socket.setTcpNoDelay(true);
try {
socketChannel.connect(address);
@@ -182,7 +184,7 @@ public class Selector implements Selectable {
*/
@Override
public void disconnect(String id) {
- KafkaChannel channel = channelForId(id);
+ KafkaChannel channel = this.channels.get(id);
if (channel != null)
channel.disconnect();
}
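
For illustration (not part of this commit), how a caller can keep the operating system's socket buffer defaults with the new sentinel; the id and address are made up, and the `Selectable` is assumed to be supplied by the caller:

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.kafka.common.network.Selectable;

    public class DefaultBufferExample {
        static void connectWithOsDefaults(Selectable selector) throws IOException {
            // USE_DEFAULT_BUFFER_SIZE (-1) skips setSendBufferSize/setReceiveBufferSize,
            // leaving the OS socket buffer defaults in place.
            selector.connect("1", new InetSocketAddress("broker1", 9092),
                    Selectable.USE_DEFAULT_BUFFER_SIZE, Selectable.USE_DEFAULT_BUFFER_SIZE);
        }
    }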
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index b39e9bb..46ddddb 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -49,7 +49,7 @@ public enum ApiKeys {
MAX_API_KEY = maxKey;
}
- /** the perminant and immutable id of an API--this can't change ever */
+ /** the permanent and immutable id of an API--this can't change ever */
public final short id;
/** an english description of the api--this is for debugging and can change */
@@ -63,4 +63,4 @@ public enum ApiKeys {
public static ApiKeys forId(int id) {
return codeToType[id];
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index e17e390..641afa1 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -20,6 +20,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.errors.*;
+import org.apache.kafka.common.errors.ControllerMovedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,11 +46,14 @@ public enum Errors {
new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
REQUEST_TIMED_OUT(7,
new TimeoutException("The request timed out.")),
- // TODO: errorCode 8 for BrokerNotAvailable
+ BROKER_NOT_AVAILABLE(8,
+ new BrokerNotAvailableException("The broker is not available.")),
REPLICA_NOT_AVAILABLE(9,
new ApiException("The replica is not available for the requested topic-partition")),
MESSAGE_TOO_LARGE(10,
new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
+ STALE_CONTROLLER_EPOCH(11,
+ new ControllerMovedException("The controller moved to another broker.")),
OFFSET_METADATA_TOO_LARGE(12,
new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
NETWORK_EXCEPTION(13,
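
For illustration (not part of this commit), the two new codes round-trip through the standard `Errors` mapping like any other error:

    import org.apache.kafka.common.errors.ControllerMovedException;
    import org.apache.kafka.common.protocol.Errors;

    public class ErrorsExample {
        public static void main(String[] args) {
            // Exception -> wire code, as done when building an error response
            // (see Errors.forException(e).code() in ControlledShutdownRequest below).
            short code = Errors.forException(new ControllerMovedException("controller moved")).code();

            // Wire code -> Errors constant, as done when interpreting a response.
            Errors error = Errors.forCode(code); // STALE_CONTROLLER_EPOCH (11)
        }
    }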
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
index c2cbbbd..85357ab 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
@@ -29,6 +29,8 @@ public class ProtoUtils {
Schema[] versions = schemas[apiKey];
if (version < 0 || version > versions.length)
throw new IllegalArgumentException("Invalid version for API key " + apiKey + ": " + version);
+ if (versions[version] == null)
+ throw new IllegalArgumentException("Unsupported version for API key " + apiKey + ": " + version);
return versions[version];
}
@@ -62,4 +64,8 @@ public class ProtoUtils {
return (Struct) currentResponseSchema(apiKey).read(buffer);
}
+ public static Struct parseResponse(int apiKey, int version, ByteBuffer buffer) {
+ return (Struct) responseSchema(apiKey, version).read(buffer);
+ }
+
}
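
For illustration (not part of this commit), a sketch of the new version-pinned parse; the buffer is assumed to hold a serialized controlled shutdown response body:

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.protocol.ProtoUtils;
    import org.apache.kafka.common.protocol.types.Struct;

    public class ParseResponseExample {
        static Struct parseControlledShutdownResponse(ByteBuffer buffer) {
            // Unlike parseResponse(apiKey, buffer), this overload does not assume the
            // latest schema version for the api key.
            return ProtoUtils.parseResponse(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, 1, buffer);
        }
    }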
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index a951e90..b72db4f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -41,7 +41,7 @@ public class Protocol {
public static final Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
new ArrayOf(STRING),
- "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics."));
+ "An array of topics to fetch metadata for. If no topics are specified fetch metadata for all topics."));
public static final Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
new Field("host", STRING, "The hostname of the broker."),
@@ -396,6 +396,25 @@ public class Protocol {
public static final Schema[] CONSUMER_METADATA_REQUEST = new Schema[] {CONSUMER_METADATA_REQUEST_V0};
public static final Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] {CONSUMER_METADATA_RESPONSE_V0};
+ /* Controlled shutdown api */
+ public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id",
+ INT32,
+ "The id of the broker for which controlled shutdown has been requested."));
+
+ public static final Schema CONTROLLED_SHUTDOWN_PARTITION_V1 = new Schema(new Field("topic", STRING),
+ new Field("partition",
+ INT32,
+ "Topic partition id."));
+
+ public static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = new Schema(new Field("error_code", INT16),
+ new Field("partitions_remaining",
+ new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V1),
+ "The partitions that the broker still leads."));
+
+ /* V0 is not supported as it would require a request header variant that does not include `clientId` */
+ public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = new Schema[] {null, CONTROLLED_SHUTDOWN_REQUEST_V1};
+ public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = new Schema[] {null, CONTROLLED_SHUTDOWN_RESPONSE_V1};
+
/* Join group api */
public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
STRING,
@@ -442,6 +461,39 @@ public class Protocol {
public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
+ /* Leader and ISR api */
+ public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 =
+ new Schema(new Field("topic", STRING, "Topic name."),
+ new Field("partition", INT32, "Topic partition id."),
+ new Field("controller_epoch", INT32, "The controller epoch."),
+ new Field("leader", INT32, "The broker id for the leader."),
+ new Field("leader_epoch", INT32, "The leader epoch."),
+ new Field("isr", new ArrayOf(INT32), "The in sync replica ids."),
+ new Field("zk_version", INT32, "The ZK version."),
+ new Field("replicas", new ArrayOf(INT32), "The replica ids."));
+
+ public static final Schema LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0 =
+ new Schema(new Field("id", INT32, "The broker id."),
+ new Field("host", STRING, "The hostname of the broker."),
+ new Field("port", INT32, "The port on which the broker accepts requests."));
+
+ public static final Schema LEADER_AND_ISR_REQUEST_V0 = new Schema(new Field("controller_id", INT32, "The controller id."),
+ new Field("controller_epoch", INT32, "The controller epoch."),
+ new Field("partition_states",
+ new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0)),
+ new Field("live_leaders", new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0)));
+
+ public static final Schema LEADER_AND_ISR_RESPONSE_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."),
+ new Field("partition", INT32, "Topic partition id."),
+ new Field("error_code", INT16, "Error code."));
+
+ public static final Schema LEADER_AND_ISR_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code."),
+ new Field("partitions",
+ new ArrayOf(LEADER_AND_ISR_RESPONSE_PARTITION_V0)));
+
+ public static final Schema[] LEADER_AND_ISR_REQUEST = new Schema[] {LEADER_AND_ISR_REQUEST_V0};
+ public static final Schema[] LEADER_AND_ISR_RESPONSE = new Schema[] {LEADER_AND_ISR_RESPONSE_V0};
+
/* Replica api */
public static final Schema STOP_REPLICA_REQUEST_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."),
new Field("partition", INT32, "Topic partition id."));
@@ -465,7 +517,50 @@ public class Protocol {
public static final Schema[] STOP_REPLICA_REQUEST = new Schema[] {STOP_REPLICA_REQUEST_V0};
public static final Schema[] STOP_REPLICA_RESPONSE = new Schema[] {STOP_REPLICA_RESPONSE_V0};
- /* an array of all requests and responses with all schema versions */
+ /* Update metadata api */
+
+ public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V0 = LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0;
+
+ public static final Schema UPDATE_METADATA_REQUEST_BROKER_V0 =
+ new Schema(new Field("id", INT32, "The broker id."),
+ new Field("host", STRING, "The hostname of the broker."),
+ new Field("port", INT32, "The port on which the broker accepts requests."));
+
+ public static final Schema UPDATE_METADATA_REQUEST_V0 = new Schema(new Field("controller_id", INT32, "The controller id."),
+ new Field("controller_epoch", INT32, "The controller epoch."),
+ new Field("partition_states",
+ new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V0)),
+ new Field("live_brokers",
+ new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V0)));
+
+ public static final Schema UPDATE_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code."));
+
+ public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V1 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V0;
+
+ public static final Schema UPDATE_METADATA_REQUEST_END_POINT_V1 =
+ // for some reason, V1 sends `port` before `host` while V0 sends `host` before `port`
+ new Schema(new Field("port", INT32, "The port on which the broker accepts requests."),
+ new Field("host", STRING, "The hostname of the broker."),
+ new Field("security_protocol_type", INT16, "The security protocol type."));
+
+ public static final Schema UPDATE_METADATA_REQUEST_BROKER_V1 =
+ new Schema(new Field("id", INT32, "The broker id."),
+ new Field("end_points", new ArrayOf(UPDATE_METADATA_REQUEST_END_POINT_V1)));
+
+ public static final Schema UPDATE_METADATA_REQUEST_V1 = new Schema(new Field("controller_id", INT32, "The controller id."),
+ new Field("controller_epoch", INT32, "The controller epoch."),
+ new Field("partition_states",
+ new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V1)),
+ new Field("live_brokers",
+ new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V1)));
+
+ public static final Schema UPDATE_METADATA_RESPONSE_V1 = UPDATE_METADATA_RESPONSE_V0;
+
+ public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1};
+ public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1};
+
+ /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
+ * particular version is not supported */
public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -477,10 +572,10 @@ public class Protocol {
REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST;
REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST;
REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
- REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
+ REQUESTS[ApiKeys.LEADER_AND_ISR.id] = LEADER_AND_ISR_REQUEST;
REQUESTS[ApiKeys.STOP_REPLICA.id] = STOP_REPLICA_REQUEST;
- REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
- REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
+ REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = UPDATE_METADATA_REQUEST;
+ REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_REQUEST;
REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
@@ -492,10 +587,10 @@ public class Protocol {
RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
RESPONSES[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_RESPONSE;
RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
- RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
+ RESPONSES[ApiKeys.LEADER_AND_ISR.id] = LEADER_AND_ISR_RESPONSE;
RESPONSES[ApiKeys.STOP_REPLICA.id] = STOP_REPLICA_RESPONSE;
- RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
- RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
+ RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = UPDATE_METADATA_RESPONSE;
+ RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_RESPONSE;
RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
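
For illustration (not part of this commit), the schemas above are plain data and can be exercised directly; the broker id is made up:

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.protocol.Protocol;
    import org.apache.kafka.common.protocol.types.Struct;

    public class SchemaExample {
        public static void main(String[] args) {
            // Build a controlled shutdown request body directly against the V1 schema.
            Struct body = new Struct(Protocol.CONTROLLED_SHUTDOWN_REQUEST_V1);
            body.set("broker_id", 2); // illustrative broker id

            // Serialize the body; on the wire it would be preceded by a request header.
            ByteBuffer buffer = ByteBuffer.allocate(body.sizeOf());
            body.writeTo(buffer);
        }
    }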
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index e316957..a696e80 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -57,6 +57,12 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return HeartbeatRequest.parse(buffer, versionId);
case STOP_REPLICA:
return StopReplicaRequest.parse(buffer, versionId);
+ case CONTROLLED_SHUTDOWN_KEY:
+ return ControlledShutdownRequest.parse(buffer, versionId);
+ case UPDATE_METADATA_KEY:
+ return UpdateMetadataRequest.parse(buffer, versionId);
+ case LEADER_AND_ISR:
+ return LeaderAndIsrRequest.parse(buffer, versionId);
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
new file mode 100644
index 0000000..57f51d8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+public class ControlledShutdownRequest extends AbstractRequest {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id);
+
+ private static final String BROKER_ID_KEY_NAME = "broker_id";
+
+ private int brokerId;
+
+ public ControlledShutdownRequest(int brokerId) {
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(BROKER_ID_KEY_NAME, brokerId);
+ this.brokerId = brokerId;
+ }
+
+ public ControlledShutdownRequest(Struct struct) {
+ super(struct);
+ brokerId = struct.getInt(BROKER_ID_KEY_NAME);
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch (versionId) {
+ case 0:
+ throw new IllegalArgumentException("Version 0 is not supported. It is only supported by " +
+ "the Scala request class for controlled shutdown");
+ case 1:
+ return new ControlledShutdownResponse(Errors.forException(e).code(), Collections.<TopicPartition>emptySet());
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)));
+ }
+ }
+
+ public int brokerId() {
+ return brokerId;
+ }
+
+ public static ControlledShutdownRequest parse(ByteBuffer buffer, int versionId) {
+ return new ControlledShutdownRequest(ProtoUtils.parseRequest(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, versionId, buffer));
+ }
+
+ public static ControlledShutdownRequest parse(ByteBuffer buffer) {
+ return new ControlledShutdownRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
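
A minimal round-trip sketch for the new class, assuming the sizeOf() and writeTo() helpers inherited from AbstractRequestResponse (the broker id below is illustrative):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.requests.ControlledShutdownRequest;

    public class ControlledShutdownRequestExample {
        public static void main(String[] args) {
            // v1 request asking broker 1 to shut down in a controlled fashion
            ControlledShutdownRequest request = new ControlledShutdownRequest(1);
            // serialize the underlying Struct into a buffer
            ByteBuffer buffer = ByteBuffer.allocate(request.sizeOf());
            request.writeTo(buffer);
            buffer.rewind();
            // parse it back using the current schema version
            ControlledShutdownRequest parsed = ControlledShutdownRequest.parse(buffer);
            System.out.println(parsed.brokerId()); // prints 1
        }
    }
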
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
new file mode 100644
index 0000000..15d600d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ControlledShutdownResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id);
+
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private static final String PARTITIONS_REMAINING_KEY_NAME = "partitions_remaining";
+
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITION_KEY_NAME = "partition";
+
+ /**
+ * Possible error codes:
+ *
+ * UNKNOWN(-1) (because an IllegalStateException may be thrown in `KafkaController.shutdownBroker`; it would be good to improve this)
+ * BROKER_NOT_AVAILABLE(8)
+ * STALE_CONTROLLER_EPOCH(11)
+ */
+ private final short errorCode;
+
+ private final Set<TopicPartition> partitionsRemaining;
+
+ public ControlledShutdownResponse(short errorCode, Set<TopicPartition> partitionsRemaining) {
+ super(new Struct(CURRENT_SCHEMA));
+
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+
+ List<Struct> partitionsRemainingList = new ArrayList<>(partitionsRemaining.size());
+ for (TopicPartition topicPartition : partitionsRemaining) {
+ Struct topicPartitionStruct = struct.instance(PARTITIONS_REMAINING_KEY_NAME);
+ topicPartitionStruct.set(TOPIC_KEY_NAME, topicPartition.topic());
+ topicPartitionStruct.set(PARTITION_KEY_NAME, topicPartition.partition());
+ partitionsRemainingList.add(topicPartitionStruct);
+ }
+ struct.set(PARTITIONS_REMAINING_KEY_NAME, partitionsRemainingList.toArray());
+
+ this.errorCode = errorCode;
+ this.partitionsRemaining = partitionsRemaining;
+ }
+
+ public ControlledShutdownResponse(Struct struct) {
+ super(struct);
+ errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ Set<TopicPartition> partitions = new HashSet<>();
+ for (Object topicPartitionObj : struct.getArray(PARTITIONS_REMAINING_KEY_NAME)) {
+ Struct topicPartition = (Struct) topicPartitionObj;
+ String topic = topicPartition.getString(TOPIC_KEY_NAME);
+ int partition = topicPartition.getInt(PARTITION_KEY_NAME);
+ partitions.add(new TopicPartition(topic, partition));
+ }
+ partitionsRemaining = partitions;
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public Set<TopicPartition> partitionsRemaining() {
+ return partitionsRemaining;
+ }
+
+ public static ControlledShutdownResponse parse(ByteBuffer buffer) {
+ return new ControlledShutdownResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+
+ public static ControlledShutdownResponse parse(ByteBuffer buffer, int version) {
+ return new ControlledShutdownResponse(ProtoUtils.parseResponse(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, version, buffer));
+ }
+
+}
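
The same round trip works for the response side; this sketch also exercises the partitions_remaining array (topic name and error code are illustrative, and the sizeOf()/writeTo() helpers are again assumed from AbstractRequestResponse):

    import java.nio.ByteBuffer;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.ControlledShutdownResponse;

    public class ControlledShutdownResponseExample {
        public static void main(String[] args) {
            Set<TopicPartition> remaining = new HashSet<>();
            remaining.add(new TopicPartition("foo", 0));
            ControlledShutdownResponse response =
                new ControlledShutdownResponse(Errors.NONE.code(), remaining);
            ByteBuffer buffer = ByteBuffer.allocate(response.sizeOf());
            response.writeTo(buffer);
            buffer.rewind();
            ControlledShutdownResponse parsed = ControlledShutdownResponse.parse(buffer);
            System.out.println(parsed.partitionsRemaining().size()); // 1
        }
    }

The parse path rebuilds the Set from the serialized partitions_remaining array, so the constructor must add each per-partition Struct to the list it hands to struct.set for the round trip to preserve the partitions.
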
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
new file mode 100644
index 0000000..6b16496
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class LeaderAndIsrRequest extends AbstractRequest {
+
+ public static class PartitionState {
+ public final int controllerEpoch;
+ public final int leader;
+ public final int leaderEpoch;
+ public final List<Integer> isr;
+ public final int zkVersion;
+ public final Set<Integer> replicas;
+
+ public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
+ this.controllerEpoch = controllerEpoch;
+ this.leader = leader;
+ this.leaderEpoch = leaderEpoch;
+ this.isr = isr;
+ this.zkVersion = zkVersion;
+ this.replicas = replicas;
+ }
+
+ }
+
+ public static final class EndPoint {
+ public final int id;
+ public final String host;
+ public final int port;
+
+ public EndPoint(int id, String host, int port) {
+ this.id = id;
+ this.host = host;
+ this.port = port;
+ }
+ }
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEADER_AND_ISR.id);
+
+ private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
+ private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch";
+ private static final String PARTITION_STATES_KEY_NAME = "partition_states";
+ private static final String LIVE_LEADERS_KEY_NAME = "live_leaders";
+
+ // partition_states key names
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String LEADER_KEY_NAME = "leader";
+ private static final String LEADER_EPOCH_KEY_NAME = "leader_epoch";
+ private static final String ISR_KEY_NAME = "isr";
+ private static final String ZK_VERSION_KEY_NAME = "zk_version";
+ private static final String REPLICAS_KEY_NAME = "replicas";
+
+ // live_leaders key names
+ private static final String END_POINT_ID_KEY_NAME = "id";
+ private static final String HOST_KEY_NAME = "host";
+ private static final String PORT_KEY_NAME = "port";
+
+ private final int controllerId;
+ private final int controllerEpoch;
+ private final Map<TopicPartition, PartitionState> partitionStates;
+ private final Set<EndPoint> liveLeaders;
+
+ public LeaderAndIsrRequest(int controllerId, int controllerEpoch, Map<TopicPartition, PartitionState> partitionStates,
+ Set<EndPoint> liveLeaders) {
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
+ struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
+
+ List<Struct> partitionStatesData = new ArrayList<>(partitionStates.size());
+ for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) {
+ Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME);
+ TopicPartition topicPartition = entry.getKey();
+ partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic());
+ partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition());
+ PartitionState partitionState = entry.getValue();
+ partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch);
+ partitionStateData.set(LEADER_KEY_NAME, partitionState.leader);
+ partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch);
+ partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
+ partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion);
+ partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray());
+ partitionStatesData.add(partitionStateData);
+ }
+ struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray());
+
+ List<Struct> leadersData = new ArrayList<>(liveLeaders.size());
+ for (EndPoint leader : liveLeaders) {
+ Struct leaderData = struct.instance(LIVE_LEADERS_KEY_NAME);
+ leaderData.set(END_POINT_ID_KEY_NAME, leader.id);
+ leaderData.set(HOST_KEY_NAME, leader.host);
+ leaderData.set(PORT_KEY_NAME, leader.port);
+ leadersData.add(leaderData);
+ }
+ struct.set(LIVE_LEADERS_KEY_NAME, leadersData.toArray());
+
+ this.controllerId = controllerId;
+ this.controllerEpoch = controllerEpoch;
+ this.partitionStates = partitionStates;
+ this.liveLeaders = liveLeaders;
+ }
+
+ public LeaderAndIsrRequest(Struct struct) {
+ super(struct);
+
+ Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
+ for (Object partitionStateDataObj : struct.getArray(PARTITION_STATES_KEY_NAME)) {
+ Struct partitionStateData = (Struct) partitionStateDataObj;
+ String topic = partitionStateData.getString(TOPIC_KEY_NAME);
+ int partition = partitionStateData.getInt(PARTITION_KEY_NAME);
+ int controllerEpoch = partitionStateData.getInt(CONTROLLER_EPOCH_KEY_NAME);
+ int leader = partitionStateData.getInt(LEADER_KEY_NAME);
+ int leaderEpoch = partitionStateData.getInt(LEADER_EPOCH_KEY_NAME);
+
+ Object[] isrArray = partitionStateData.getArray(ISR_KEY_NAME);
+ List<Integer> isr = new ArrayList<>(isrArray.length);
+ for (Object r : isrArray)
+ isr.add((Integer) r);
+
+ int zkVersion = partitionStateData.getInt(ZK_VERSION_KEY_NAME);
+
+ Object[] replicasArray = partitionStateData.getArray(REPLICAS_KEY_NAME);
+ Set<Integer> replicas = new HashSet<>(replicasArray.length);
+ for (Object r : replicasArray)
+ replicas.add((Integer) r);
+
+ PartitionState partitionState = new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
+ partitionStates.put(new TopicPartition(topic, partition), partitionState);
+ }
+
+ Set<EndPoint> leaders = new HashSet<>();
+ for (Object leadersDataObj : struct.getArray(LIVE_LEADERS_KEY_NAME)) {
+ Struct leadersData = (Struct) leadersDataObj;
+ int id = leadersData.getInt(END_POINT_ID_KEY_NAME);
+ String host = leadersData.getString(HOST_KEY_NAME);
+ int port = leadersData.getInt(PORT_KEY_NAME);
+ leaders.add(new EndPoint(id, host, port));
+ }
+
+ controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
+ controllerEpoch = struct.getInt(CONTROLLER_EPOCH_KEY_NAME);
+ this.partitionStates = partitionStates;
+ this.liveLeaders = leaders;
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ Map<TopicPartition, Short> responses = new HashMap<>(partitionStates.size());
+ for (TopicPartition partition : partitionStates.keySet()) {
+ responses.put(partition, Errors.forException(e).code());
+ }
+
+ switch (versionId) {
+ case 0:
+ return new LeaderAndIsrResponse(Errors.NONE.code(), responses);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LEADER_AND_ISR.id)));
+ }
+ }
+
+ public int controllerId() {
+ return controllerId;
+ }
+
+ public int controllerEpoch() {
+ return controllerEpoch;
+ }
+
+ public Map<TopicPartition, PartitionState> partitionStates() {
+ return partitionStates;
+ }
+
+ public Set<EndPoint> liveLeaders() {
+ return liveLeaders;
+ }
+
+ public static LeaderAndIsrRequest parse(ByteBuffer buffer, int versionId) {
+ return new LeaderAndIsrRequest(ProtoUtils.parseRequest(ApiKeys.LEADER_AND_ISR.id, versionId, buffer));
+ }
+
+ public static LeaderAndIsrRequest parse(ByteBuffer buffer) {
+ return new LeaderAndIsrRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+
+}
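
Building one of these by hand shows how the nested partition_states and live_leaders arrays fit together; all ids, epochs, hosts and ports below are illustrative:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.LeaderAndIsrRequest;

    public class LeaderAndIsrRequestExample {
        public static void main(String[] args) {
            List<Integer> isr = Arrays.asList(0, 1, 2);
            Set<Integer> replicas = new HashSet<>(isr);
            // controllerEpoch=1, leader=0, leaderEpoch=5, zkVersion=17
            LeaderAndIsrRequest.PartitionState state =
                new LeaderAndIsrRequest.PartitionState(1, 0, 5, isr, 17, replicas);
            Map<TopicPartition, LeaderAndIsrRequest.PartitionState> states = new HashMap<>();
            states.put(new TopicPartition("foo", 0), state);
            Set<LeaderAndIsrRequest.EndPoint> liveLeaders =
                Collections.singleton(new LeaderAndIsrRequest.EndPoint(0, "localhost", 9092));
            // controllerId=0, controllerEpoch=1
            LeaderAndIsrRequest request = new LeaderAndIsrRequest(0, 1, states, liveLeaders);
            System.out.println(request.partitionStates().size()); // 1
        }
    }
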
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
new file mode 100644
index 0000000..3a6f4ee
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class LeaderAndIsrResponse extends AbstractRequestResponse {
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LEADER_AND_ISR.id);
+
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ private static final String PARTITIONS_TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_PARTITION_KEY_NAME = "partition";
+ private static final String PARTITIONS_ERROR_CODE_KEY_NAME = "error_code";
+
+ /**
+ * Possible error code:
+ *
+ * STALE_CONTROLLER_EPOCH (11)
+ */
+ private final short errorCode;
+
+ private final Map<TopicPartition, Short> responses;
+
+ public LeaderAndIsrResponse(Map<TopicPartition, Short> responses) {
+ this(Errors.NONE.code(), responses);
+ }
+
+ public LeaderAndIsrResponse(short errorCode, Map<TopicPartition, Short> responses) {
+ super(new Struct(CURRENT_SCHEMA));
+
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+
+ List<Struct> responseDatas = new ArrayList<>(responses.size());
+ for (Map.Entry<TopicPartition, Short> response : responses.entrySet()) {
+ Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
+ TopicPartition partition = response.getKey();
+ partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic());
+ partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition());
+ partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue());
+ responseDatas.add(partitionData);
+ }
+
+ struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray());
+
+ this.responses = responses;
+ this.errorCode = errorCode;
+ }
+
+ public LeaderAndIsrResponse(Struct struct) {
+ super(struct);
+
+ responses = new HashMap<>();
+ for (Object responseDataObj : struct.getArray(PARTITIONS_KEY_NAME)) {
+ Struct responseData = (Struct) responseDataObj;
+ String topic = responseData.getString(PARTITIONS_TOPIC_KEY_NAME);
+ int partition = responseData.getInt(PARTITIONS_PARTITION_KEY_NAME);
+ short errorCode = responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME);
+ responses.put(new TopicPartition(topic, partition), errorCode);
+ }
+
+ errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ }
+
+ public Map<TopicPartition, Short> responses() {
+ return responses;
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public static LeaderAndIsrResponse parse(ByteBuffer buffer, int version) {
+ return new LeaderAndIsrResponse(ProtoUtils.parseResponse(ApiKeys.LEADER_AND_ISR.id, version, buffer));
+ }
+
+ public static LeaderAndIsrResponse parse(ByteBuffer buffer) {
+ return new LeaderAndIsrResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+
+}
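
And a matching sketch for the response side (partition and error values illustrative, sizeOf()/writeTo() again assumed from AbstractRequestResponse). The top-level error_code and the per-partition codes are independent fields, which is what getErrorResponse on the request relies on:

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.LeaderAndIsrResponse;

    public class LeaderAndIsrResponseExample {
        public static void main(String[] args) {
            Map<TopicPartition, Short> perPartition = new HashMap<>();
            perPartition.put(new TopicPartition("foo", 0), Errors.NONE.code());
            // the single-argument constructor defaults the top-level error_code to NONE
            LeaderAndIsrResponse response = new LeaderAndIsrResponse(perPartition);
            ByteBuffer buffer = ByteBuffer.allocate(response.sizeOf());
            response.writeTo(buffer);
            buffer.rewind();
            LeaderAndIsrResponse parsed = LeaderAndIsrResponse.parse(buffer);
            // one entry for partition foo-0 with error code 0 (NONE)
            System.out.println(parsed.responses().size() + " / " + parsed.errorCode());
        }
    }
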