You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/15 00:46:31 UTC
[5/5] kafka git commit: KAFKA-2066; Use client-side FetchRequest/FetchResponse on server
KAFKA-2066; Use client-side FetchRequest/FetchResponse on server
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #2069 from hachikuji/KAFKA-2066
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3b4c3479
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3b4c3479
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3b4c3479
Branch: refs/heads/trunk
Commit: 3b4c347949c02b1e2b1dd473deda0f8d2304d027
Parents: 1f1d450
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Nov 14 16:31:04 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Nov 14 16:31:04 2016 -0800
----------------------------------------------------------------------
checkstyle/import-control.xml | 2 +
.../org/apache/kafka/clients/ClientRequest.java | 65 +++---
.../apache/kafka/clients/ClientResponse.java | 56 +++--
.../apache/kafka/clients/InFlightRequests.java | 46 ++--
.../org/apache/kafka/clients/KafkaClient.java | 1 -
.../kafka/clients/ManualMetadataUpdater.java | 11 +-
.../apache/kafka/clients/MetadataUpdater.java | 10 +-
.../org/apache/kafka/clients/NetworkClient.java | 164 ++++++++------
.../consumer/internals/AbstractCoordinator.java | 33 +--
.../consumer/internals/ConsumerCoordinator.java | 12 -
.../internals/ConsumerNetworkClient.java | 30 ++-
.../clients/consumer/internals/Fetcher.java | 11 +-
.../clients/producer/internals/Sender.java | 68 +++---
.../kafka/common/network/ByteBufferSend.java | 5 +-
.../kafka/common/network/NetworkSend.java | 4 +-
.../apache/kafka/common/protocol/ApiKeys.java | 2 +-
.../apache/kafka/common/protocol/Protocol.java | 21 +-
.../kafka/common/protocol/types/ArrayOf.java | 13 +-
.../kafka/common/protocol/types/Schema.java | 18 +-
.../kafka/common/protocol/types/Struct.java | 16 +-
.../kafka/common/protocol/types/Type.java | 48 ++++
.../apache/kafka/common/record/FileRecords.java | 126 +++++++++++
.../kafka/common/record/LogInputStream.java | 35 +++
.../kafka/common/record/MemoryRecords.java | 186 ++++------------
.../org/apache/kafka/common/record/Records.java | 15 ++
.../kafka/common/record/RecordsIterator.java | 170 ++++++++++++++
.../kafka/common/requests/AbstractRequest.java | 8 +-
.../requests/AbstractRequestResponse.java | 9 +-
.../kafka/common/requests/AbstractResponse.java | 86 ++++++++
.../common/requests/ApiVersionsRequest.java | 2 +-
.../common/requests/ApiVersionsResponse.java | 2 +-
.../requests/ControlledShutdownRequest.java | 2 +-
.../requests/ControlledShutdownResponse.java | 2 +-
.../common/requests/CreateTopicsRequest.java | 2 +-
.../common/requests/CreateTopicsResponse.java | 2 +-
.../common/requests/DeleteTopicsRequest.java | 2 +-
.../common/requests/DeleteTopicsResponse.java | 2 +-
.../common/requests/DescribeGroupsRequest.java | 2 +-
.../common/requests/DescribeGroupsResponse.java | 2 +-
.../kafka/common/requests/FetchRequest.java | 17 +-
.../kafka/common/requests/FetchResponse.java | 163 ++++++++++----
.../requests/GroupCoordinatorRequest.java | 2 +-
.../requests/GroupCoordinatorResponse.java | 2 +-
.../kafka/common/requests/HeartbeatRequest.java | 2 +-
.../common/requests/HeartbeatResponse.java | 2 +-
.../kafka/common/requests/JoinGroupRequest.java | 2 +-
.../common/requests/JoinGroupResponse.java | 2 +-
.../common/requests/LeaderAndIsrRequest.java | 2 +-
.../common/requests/LeaderAndIsrResponse.java | 2 +-
.../common/requests/LeaveGroupRequest.java | 2 +-
.../common/requests/LeaveGroupResponse.java | 2 +-
.../common/requests/ListGroupsRequest.java | 2 +-
.../common/requests/ListGroupsResponse.java | 2 +-
.../common/requests/ListOffsetRequest.java | 2 +-
.../common/requests/ListOffsetResponse.java | 2 +-
.../kafka/common/requests/MetadataRequest.java | 2 +-
.../kafka/common/requests/MetadataResponse.java | 2 +-
.../common/requests/OffsetCommitRequest.java | 8 +-
.../common/requests/OffsetCommitResponse.java | 2 +-
.../common/requests/OffsetFetchRequest.java | 10 +-
.../common/requests/OffsetFetchResponse.java | 2 +-
.../kafka/common/requests/ProduceRequest.java | 31 +--
.../kafka/common/requests/ProduceResponse.java | 8 +-
.../kafka/common/requests/RecordsSend.java | 77 +++++++
.../kafka/common/requests/RequestSend.java | 55 -----
.../kafka/common/requests/ResponseSend.java | 41 ----
.../common/requests/SaslHandshakeRequest.java | 2 +-
.../common/requests/SaslHandshakeResponse.java | 2 +-
.../common/requests/StopReplicaRequest.java | 2 +-
.../common/requests/StopReplicaResponse.java | 2 +-
.../kafka/common/requests/SyncGroupRequest.java | 2 +-
.../common/requests/SyncGroupResponse.java | 2 +-
.../common/requests/UpdateMetadataRequest.java | 2 +-
.../common/requests/UpdateMetadataResponse.java | 2 +-
.../authenticator/SaslClientAuthenticator.java | 57 +++--
.../authenticator/SaslServerAuthenticator.java | 71 +++---
.../kafka/common/utils/AbstractIterator.java | 4 +-
.../org/apache/kafka/clients/MockClient.java | 101 ++++-----
.../apache/kafka/clients/NetworkClientTest.java | 31 ++-
.../clients/consumer/KafkaConsumerTest.java | 64 +++---
.../internals/AbstractCoordinatorTest.java | 64 +++---
.../internals/ConsumerCoordinatorTest.java | 85 ++++---
.../internals/ConsumerNetworkClientTest.java | 10 +-
.../clients/consumer/internals/FetcherTest.java | 40 ++--
.../clients/producer/internals/SenderTest.java | 40 ++--
.../common/requests/RequestResponseTest.java | 174 +++++++++++----
.../authenticator/SaslAuthenticatorTest.java | 54 ++---
.../distributed/WorkerCoordinatorTest.java | 41 ++--
.../main/scala/kafka/admin/AdminClient.scala | 18 +-
.../src/main/scala/kafka/api/FetchRequest.scala | 3 +-
.../main/scala/kafka/api/FetchResponse.scala | 3 +-
.../kafka/api/GenericRequestAndHeader.scala | 5 +-
.../kafka/api/GenericResponseAndHeader.scala | 5 +-
.../main/scala/kafka/api/ProducerRequest.scala | 4 +-
.../scala/kafka/api/RequestOrResponse.scala | 2 +-
.../controller/ControllerChannelManager.scala | 59 +++--
.../kafka/controller/KafkaController.scala | 8 +-
.../kafka/controller/TopicDeletionManager.scala | 7 +-
.../main/scala/kafka/log/FileMessageSet.scala | 35 +--
.../kafka/message/ByteBufferMessageSet.scala | 17 +-
.../main/scala/kafka/message/MessageSet.scala | 12 +-
.../scala/kafka/network/RequestChannel.scala | 45 ++--
.../main/scala/kafka/server/DelayedFetch.scala | 9 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 219 +++++++++----------
.../main/scala/kafka/server/KafkaServer.scala | 58 +++--
.../kafka/server/ReplicaFetcherThread.scala | 24 +-
.../scala/kafka/server/ReplicaManager.scala | 31 +--
.../kafka/server/ReplicationQuotaManager.scala | 2 +-
.../kafka/utils/NetworkClientBlockingOps.scala | 14 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 20 +-
.../kafka/message/BaseMessageSetTestCases.scala | 4 +-
.../unit/kafka/network/SocketServerTest.scala | 7 +-
.../kafka/server/CreateTopicsRequestTest.scala | 2 +-
.../unit/kafka/server/EdgeCaseRequestTest.scala | 7 +-
.../unit/kafka/server/FetchRequestTest.scala | 10 +-
.../unit/kafka/server/LeaderElectionTest.scala | 4 +-
.../unit/kafka/server/ProduceRequestTest.scala | 6 +-
.../kafka/server/ReplicaManagerQuotasTest.scala | 11 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 9 +-
.../unit/kafka/server/SimpleFetchTest.scala | 15 +-
120 files changed, 1870 insertions(+), 1394 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7716f43..58525ad 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -81,11 +81,13 @@
<subpackage name="protocol">
<allow pkg="org.apache.kafka.common.errors" />
<allow pkg="org.apache.kafka.common.protocol.types" />
+ <allow pkg="org.apache.kafka.common.record" />
</subpackage>
<subpackage name="record">
<allow pkg="net.jpountz" />
<allow pkg="org.apache.kafka.common.record" />
+ <allow pkg="org.apache.kafka.common.network" />
</subpackage>
<subpackage name="requests">
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 117b0bf..de6e506 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -12,56 +12,50 @@
*/
package org.apache.kafka.clients;
-import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.RequestHeader;
/**
* A request being sent to the server. This holds both the network send as well as the client-level metadata.
*/
public final class ClientRequest {
+ private final String destination;
+ private final RequestHeader header;
+ private final AbstractRequest body;
private final long createdTimeMs;
private final boolean expectResponse;
- private final RequestSend request;
private final RequestCompletionHandler callback;
- private final boolean isInitiatedByNetworkClient;
- private long sendTimeMs;
/**
+ * @param destination The brokerId to send the request to
* @param createdTimeMs The unix timestamp in milliseconds for the time at which this request was created.
* @param expectResponse Should we expect a response message or is this request complete once it is sent?
- * @param request The request
+ * @param header The request's header
+ * @param body The request's body
* @param callback A callback to execute when the response has been received (or null if no callback is necessary)
*/
- public ClientRequest(long createdTimeMs, boolean expectResponse, RequestSend request,
+ public ClientRequest(String destination,
+ long createdTimeMs,
+ boolean expectResponse,
+ RequestHeader header,
+ AbstractRequest body,
RequestCompletionHandler callback) {
- this(createdTimeMs, expectResponse, request, callback, false);
- }
-
- /**
- * @param createdTimeMs The unix timestamp in milliseconds for the time at which this request was created.
- * @param expectResponse Should we expect a response message or is this request complete once it is sent?
- * @param request The request
- * @param callback A callback to execute when the response has been received (or null if no callback is necessary)
- * @param isInitiatedByNetworkClient Is request initiated by network client, if yes, its
- * response will be consumed by network client
- */
- public ClientRequest(long createdTimeMs, boolean expectResponse, RequestSend request,
- RequestCompletionHandler callback, boolean isInitiatedByNetworkClient) {
+ this.destination = destination;
this.createdTimeMs = createdTimeMs;
this.callback = callback;
- this.request = request;
+ this.header = header;
+ this.body = body;
this.expectResponse = expectResponse;
- this.isInitiatedByNetworkClient = isInitiatedByNetworkClient;
}
@Override
public String toString() {
return "ClientRequest(expectResponse=" + expectResponse +
", callback=" + callback +
- ", request=" + request +
- (isInitiatedByNetworkClient ? ", isInitiatedByNetworkClient" : "") +
+ ", header=" + header +
+ ", body=" + body +
", createdTimeMs=" + createdTimeMs +
- ", sendTimeMs=" + sendTimeMs +
")";
}
@@ -69,12 +63,16 @@ public final class ClientRequest {
return expectResponse;
}
- public RequestSend request() {
- return request;
+ public RequestHeader header() {
+ return header;
}
- public boolean hasCallback() {
- return callback != null;
+ public AbstractRequest body() {
+ return body;
+ }
+
+ public String destination() {
+ return destination;
}
public RequestCompletionHandler callback() {
@@ -85,15 +83,4 @@ public final class ClientRequest {
return createdTimeMs;
}
- public boolean isInitiatedByNetworkClient() {
- return isInitiatedByNetworkClient;
- }
-
- public long sendTimeMs() {
- return sendTimeMs;
- }
-
- public void setSendTimeMs(long sendTimeMs) {
- this.sendTimeMs = sendTimeMs;
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
index 3b6f955..3cd8f1a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
@@ -12,30 +12,45 @@
*/
package org.apache.kafka.clients;
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestHeader;
/**
- * A response from the server. Contains both the body of the response as well as the correlated request that was
- * originally sent.
+ * A response from the server. Contains both the body of the response as well as the correlated request
+ * metadata that was originally sent.
*/
public class ClientResponse {
+ private final RequestHeader requestHeader;
+ private final RequestCompletionHandler callback;
+ private final String destination;
private final long receivedTimeMs;
+ private final long latencyMs;
private final boolean disconnected;
- private final ClientRequest request;
- private final Struct responseBody;
+ private final AbstractResponse responseBody;
/**
- * @param request The original request
+ * @param requestHeader The header of the corresponding request
+ * @param callback The callback to be invoked
+ * @param createdTimeMs The unix timestamp when the corresponding request was created
+ * @param destination The node the corresponding request was sent to
* @param receivedTimeMs The unix timestamp when this response was received
* @param disconnected Whether the client disconnected before fully reading a response
* @param responseBody The response contents (or null) if we disconnected or no response was expected
*/
- public ClientResponse(ClientRequest request, long receivedTimeMs, boolean disconnected, Struct responseBody) {
- super();
+ public ClientResponse(RequestHeader requestHeader,
+ RequestCompletionHandler callback,
+ String destination,
+ long createdTimeMs,
+ long receivedTimeMs,
+ boolean disconnected,
+ AbstractResponse responseBody) {
+ this.requestHeader = requestHeader;
+ this.callback = callback;
+ this.destination = destination;
this.receivedTimeMs = receivedTimeMs;
+ this.latencyMs = receivedTimeMs - createdTimeMs;
this.disconnected = disconnected;
- this.request = request;
this.responseBody = responseBody;
}
@@ -47,11 +62,15 @@ public class ClientResponse {
return disconnected;
}
- public ClientRequest request() {
- return request;
+ public RequestHeader requestHeader() {
+ return requestHeader;
}
- public Struct responseBody() {
+ public String destination() {
+ return destination;
+ }
+
+ public AbstractResponse responseBody() {
return responseBody;
}
@@ -60,16 +79,23 @@ public class ClientResponse {
}
public long requestLatencyMs() {
- return receivedTimeMs() - this.request.createdTimeMs();
+ return latencyMs;
+ }
+
+ public void onComplete() {
+ if (callback != null)
+ callback.onComplete(this);
}
@Override
public String toString() {
return "ClientResponse(receivedTimeMs=" + receivedTimeMs +
+ ", latencyMs=" +
+ latencyMs +
", disconnected=" +
disconnected +
- ", request=" +
- request +
+ ", requestHeader=" +
+ requestHeader +
", responseBody=" +
responseBody +
")";
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index 91b9dba..f4f753e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -26,7 +26,7 @@ import java.util.Map;
final class InFlightRequests {
private final int maxInFlightRequestsPerConnection;
- private final Map<String, Deque<ClientRequest>> requests = new HashMap<>();
+ private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();
public InFlightRequests(int maxInFlightRequestsPerConnection) {
this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
@@ -35,11 +35,12 @@ final class InFlightRequests {
/**
* Add the given request to the queue for the connection it was directed to
*/
- public void add(ClientRequest request) {
- Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
+ public void add(NetworkClient.InFlightRequest request) {
+ String destination = request.destination;
+ Deque<NetworkClient.InFlightRequest> reqs = this.requests.get(destination);
if (reqs == null) {
reqs = new ArrayDeque<>();
- this.requests.put(request.request().destination(), reqs);
+ this.requests.put(destination, reqs);
}
reqs.addFirst(request);
}
@@ -47,8 +48,8 @@ final class InFlightRequests {
/**
* Get the request queue for the given node
*/
- private Deque<ClientRequest> requestQueue(String node) {
- Deque<ClientRequest> reqs = requests.get(node);
+ private Deque<NetworkClient.InFlightRequest> requestQueue(String node) {
+ Deque<NetworkClient.InFlightRequest> reqs = requests.get(node);
if (reqs == null || reqs.isEmpty())
throw new IllegalStateException("Response from server for which there are no in-flight requests.");
return reqs;
@@ -57,7 +58,7 @@ final class InFlightRequests {
/**
* Get the oldest request (the one that will be completed next) for the given node
*/
- public ClientRequest completeNext(String node) {
+ public NetworkClient.InFlightRequest completeNext(String node) {
return requestQueue(node).pollLast();
}
@@ -65,7 +66,7 @@ final class InFlightRequests {
* Get the last request we sent to the given node (but don't remove it from the queue)
* @param node The node id
*/
- public ClientRequest lastSent(String node) {
+ public NetworkClient.InFlightRequest lastSent(String node) {
return requestQueue(node).peekFirst();
}
@@ -74,20 +75,20 @@ final class InFlightRequests {
* @param node The node the request was sent to
* @return The request
*/
- public ClientRequest completeLastSent(String node) {
+ public NetworkClient.InFlightRequest completeLastSent(String node) {
return requestQueue(node).pollFirst();
}
/**
* Can we send more requests to this node?
- *
+ *
* @param node Node in question
* @return true iff we have no requests still being sent to the given node
*/
public boolean canSendMore(String node) {
- Deque<ClientRequest> queue = requests.get(node);
+ Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
- (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
+ (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
/**
@@ -96,7 +97,7 @@ final class InFlightRequests {
* @return The request count.
*/
public int inFlightRequestCount(String node) {
- Deque<ClientRequest> queue = requests.get(node);
+ Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
return queue == null ? 0 : queue.size();
}
@@ -105,19 +106,19 @@ final class InFlightRequests {
*/
public int inFlightRequestCount() {
int total = 0;
- for (Deque<ClientRequest> deque : this.requests.values())
+ for (Deque<NetworkClient.InFlightRequest> deque : this.requests.values())
total += deque.size();
return total;
}
/**
* Clear out all the in-flight requests for the given node and return them
- *
+ *
* @param node The node
* @return All the in-flight requests for that node that have been removed
*/
- public Iterable<ClientRequest> clearAll(String node) {
- Deque<ClientRequest> reqs = requests.get(node);
+ public Iterable<NetworkClient.InFlightRequest> clearAll(String node) {
+ Deque<NetworkClient.InFlightRequest> reqs = requests.get(node);
if (reqs == null) {
return Collections.emptyList();
} else {
@@ -126,7 +127,7 @@ final class InFlightRequests {
}
/**
- * Returns a list of nodes with pending inflight request, that need to be timed out
+ * Returns a list of nodes with pending in-flight request, that need to be timed out
*
* @param now current time in milliseconds
* @param requestTimeout max time to wait for the request to be completed
@@ -134,13 +135,13 @@ final class InFlightRequests {
*/
public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
List<String> nodeIds = new LinkedList<>();
- for (Map.Entry<String, Deque<ClientRequest>> requestEntry : requests.entrySet()) {
+ for (Map.Entry<String, Deque<NetworkClient.InFlightRequest>> requestEntry : requests.entrySet()) {
String nodeId = requestEntry.getKey();
- Deque<ClientRequest> deque = requestEntry.getValue();
+ Deque<NetworkClient.InFlightRequest> deque = requestEntry.getValue();
if (!deque.isEmpty()) {
- ClientRequest request = deque.peekLast();
- long timeSinceSend = now - request.sendTimeMs();
+ NetworkClient.InFlightRequest request = deque.peekLast();
+ long timeSinceSend = now - request.sendTimeMs;
if (timeSinceSend > requestTimeout)
nodeIds.add(nodeId);
}
@@ -148,4 +149,5 @@ final class InFlightRequests {
return nodeIds;
}
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 8c6e39a..f171d13 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -66,7 +66,6 @@ public interface KafkaClient extends Closeable {
/**
* Queue up the given request for sending. Requests can only be sent on ready connections.
- *
* @param request The request
* @param now The current timestamp
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index efbe664..1c9fa79 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -14,7 +14,8 @@
package org.apache.kafka.clients;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestHeader;
import java.util.ArrayList;
import java.util.List;
@@ -60,13 +61,13 @@ public class ManualMetadataUpdater implements MetadataUpdater {
}
@Override
- public boolean maybeHandleDisconnection(ClientRequest request) {
- return false;
+ public void handleDisconnection(String destination) {
+ // Do nothing
}
@Override
- public boolean maybeHandleCompletedReceive(ClientRequest request, long now, Struct body) {
- return false;
+ public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, AbstractResponse body) {
+ // Do nothing
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index 4669a68..21c50bd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -14,7 +14,8 @@
package org.apache.kafka.clients;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestHeader;
import java.util.List;
@@ -41,7 +42,7 @@ interface MetadataUpdater {
* be 0 if an update has been started as a result of this call).
*
* If the implementation relies on `NetworkClient` to send requests, the completed receive will be passed to
- * `maybeHandleCompletedReceive`.
+ * `handleCompletedMetadataResponse`.
*
* The semantics of `needed` and `possible` are implementation-dependent and may take into account a number of
* factors like node availability, how long since the last metadata update, etc.
@@ -53,8 +54,9 @@ interface MetadataUpdater {
*
* This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
* requests with special handling for disconnections of such requests.
+ * @param destination The id of the node that the disconnected request was sent to
*/
- boolean maybeHandleDisconnection(ClientRequest request);
+ void handleDisconnection(String destination);
/**
* Handles the completed response to a metadata request that was sent internally on behalf of this updater.
@@ -62,7 +64,7 @@ interface MetadataUpdater {
* This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
* requests with special handling for completed receives of such requests.
*/
- boolean maybeHandleCompletedReceive(ClientRequest request, long now, Struct body);
+ void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, AbstractResponse body);
/**
* Schedules an update of the current cluster metadata info. A subsequent call to `maybeUpdate` would trigger the
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 29c6d6f..124810d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -21,10 +21,10 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -173,8 +173,9 @@ public class NetworkClient implements KafkaClient {
@Override
public void close(String nodeId) {
selector.close(nodeId);
- for (ClientRequest request : inFlightRequests.clearAll(nodeId))
- metadataUpdater.maybeHandleDisconnection(request);
+ for (InFlightRequest request : inFlightRequests.clearAll(nodeId))
+ if (request.isInternalMetadataRequest)
+ metadataUpdater.handleDisconnection(request.destination);
connectionStates.remove(nodeId);
}
@@ -230,22 +231,38 @@ public class NetworkClient implements KafkaClient {
/**
* Queue up the given request for sending. Requests can only be sent out to ready nodes.
- *
* @param request The request
* @param now The current timestamp
*/
@Override
public void send(ClientRequest request, long now) {
- String nodeId = request.request().destination();
+ doSend(request, false, now);
+ }
+
+ private void sendInternalMetadataRequest(MetadataRequest metadataRequest, String nodeConnectionId, long now) {
+ ClientRequest clientRequest = new ClientRequest(nodeConnectionId, now, true,
+ nextRequestHeader(ApiKeys.METADATA), metadataRequest, null);
+ doSend(clientRequest, true, now);
+ }
+
+ private void doSend(ClientRequest request, boolean isInternalMetadataRequest, long now) {
+ String nodeId = request.destination();
if (!canSendRequest(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
- doSend(request, now);
- }
- private void doSend(ClientRequest request, long now) {
- request.setSendTimeMs(now);
- this.inFlightRequests.add(request);
- selector.send(request.request());
+ Send send = request.body().toSend(nodeId, request.header());
+ InFlightRequest inFlightRequest = new InFlightRequest(
+ request.header(),
+ request.createdTimeMs(),
+ request.destination(),
+ request.callback(),
+ request.expectResponse(),
+ isInternalMetadataRequest,
+ send,
+ now);
+
+ this.inFlightRequests.add(inFlightRequest);
+ selector.send(inFlightRequest.send);
}
/**
@@ -277,12 +294,10 @@ public class NetworkClient implements KafkaClient {
// invoke callbacks
for (ClientResponse response : responses) {
- if (response.request().hasCallback()) {
- try {
- response.request().callback().onComplete(response);
- } catch (Exception e) {
- log.error("Uncaught error in request completion:", e);
- }
+ try {
+ response.onComplete();
+ } catch (Exception e) {
+ log.error("Uncaught error in request completion:", e);
}
}
@@ -376,14 +391,14 @@ public class NetworkClient implements KafkaClient {
return found;
}
- public static Struct parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
+ public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
// Always expect the response version id to be the same as the request version id
short apiKey = requestHeader.apiKey();
short apiVer = requestHeader.apiVersion();
Struct responseBody = ProtoUtils.responseSchema(apiKey, apiVer).read(responseBuffer);
correlate(requestHeader, responseHeader);
- return responseBody;
+ return AbstractResponse.getResponse(apiKey, responseBody);
}
/**
@@ -395,10 +410,12 @@ public class NetworkClient implements KafkaClient {
*/
private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
connectionStates.disconnected(nodeId, now);
- for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) {
+ for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
- if (!metadataUpdater.maybeHandleDisconnection(request))
- responses.add(new ClientResponse(request, now, true, null));
+ if (request.isInternalMetadataRequest)
+ metadataUpdater.handleDisconnection(request.destination);
+ else
+ responses.add(request.disconnected(now));
}
}
@@ -432,10 +449,10 @@ public class NetworkClient implements KafkaClient {
private void handleCompletedSends(List<ClientResponse> responses, long now) {
// if no response is expected then when the send is completed, return it
for (Send send : this.selector.completedSends()) {
- ClientRequest request = this.inFlightRequests.lastSent(send.destination());
- if (!request.expectResponse()) {
+ InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
+ if (!request.expectResponse) {
this.inFlightRequests.completeLastSent(send.destination());
- responses.add(new ClientResponse(request, now, false, null));
+ responses.add(request.completed(null, now));
}
}
}
@@ -449,10 +466,12 @@ public class NetworkClient implements KafkaClient {
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
- ClientRequest req = inFlightRequests.completeNext(source);
- Struct body = parseResponse(receive.payload(), req.request().header());
- if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
- responses.add(new ClientResponse(req, now, false, body));
+ InFlightRequest req = inFlightRequests.completeNext(source);
+ AbstractResponse body = parseResponse(receive.payload(), req.header);
+ if (req.isInternalMetadataRequest)
+ metadataUpdater.handleCompletedMetadataResponse(req.header, now, body);
+ else
+ responses.add(req.completed(body, now));
}
}
@@ -558,33 +577,23 @@ public class NetworkClient implements KafkaClient {
}
@Override
- public boolean maybeHandleDisconnection(ClientRequest request) {
- ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
-
- if (requestKey == ApiKeys.METADATA && request.isInitiatedByNetworkClient()) {
- Cluster cluster = metadata.fetch();
- if (cluster.isBootstrapConfigured()) {
- int nodeId = Integer.parseInt(request.request().destination());
- Node node = cluster.nodeById(nodeId);
- if (node != null)
- log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port());
- }
-
- metadataFetchInProgress = false;
- return true;
+ public void handleDisconnection(String destination) {
+ Cluster cluster = metadata.fetch();
+ if (cluster.isBootstrapConfigured()) {
+ int nodeId = Integer.parseInt(destination);
+ Node node = cluster.nodeById(nodeId);
+ if (node != null)
+ log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port());
}
- return false;
+ metadataFetchInProgress = false;
}
@Override
- public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
- short apiKey = req.request().header().apiKey();
- if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
- handleResponse(req.request().header(), body, now);
- return true;
- }
- return false;
+ public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, AbstractResponse response) {
+ if (!(response instanceof MetadataResponse))
+ throw new IllegalStateException("Unexpected response type in metadata handler: " + response);
+ handleMetadataResponse(requestHeader, (MetadataResponse) response, now);
}
@Override
@@ -592,9 +601,8 @@ public class NetworkClient implements KafkaClient {
this.metadata.requestUpdate();
}
- private void handleResponse(RequestHeader header, Struct body, long now) {
+ private void handleMetadataResponse(RequestHeader header, MetadataResponse response, long now) {
this.metadataFetchInProgress = false;
- MetadataResponse response = new MetadataResponse(body);
Cluster cluster = response.cluster();
// check if any topics metadata failed to get updated
Map<String, Errors> errors = response.errors();
@@ -612,14 +620,6 @@ public class NetworkClient implements KafkaClient {
}
/**
- * Create a metadata request for the given topics
- */
- private ClientRequest request(long now, String node, MetadataRequest metadata) {
- RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
- return new ClientRequest(now, true, send, null, true);
- }
-
- /**
* Return true if there's at least one connection establishment is currently underway
*/
private boolean isAnyNodeConnecting() {
@@ -644,9 +644,9 @@ public class NetworkClient implements KafkaClient {
metadataRequest = MetadataRequest.allTopics();
else
metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
- ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
+
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
- doSend(clientRequest, now);
+ sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
return requestTimeoutMs;
}
@@ -674,4 +674,42 @@ public class NetworkClient implements KafkaClient {
}
+ static class InFlightRequest {
+ final RequestHeader header;
+ final String destination;
+ final RequestCompletionHandler callback;
+ final boolean expectResponse;
+ final boolean isInternalMetadataRequest; // used to flag metadata fetches which are triggered internally by NetworkClient
+ final Send send;
+ final long sendTimeMs;
+ final long createdTimeMs;
+
+ public InFlightRequest(RequestHeader header,
+ long createdTimeMs,
+ String destination,
+ RequestCompletionHandler callback,
+ boolean expectResponse,
+ boolean isInternalMetadataRequest,
+ Send send,
+ long sendTimeMs) {
+ this.header = header;
+ this.destination = destination;
+ this.callback = callback;
+ this.expectResponse = expectResponse;
+ this.isInternalMetadataRequest = isInternalMetadataRequest;
+ this.send = send;
+ this.sendTimeMs = sendTimeMs;
+ this.createdTimeMs = createdTimeMs;
+ }
+
+ public ClientResponse completed(AbstractResponse response, long timeMs) {
+ return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, false, response);
+ }
+
+ public ClientResponse disconnected(long timeMs) {
+ return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, true, null);
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 59319ef..c205273 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -393,17 +393,11 @@ public abstract class AbstractCoordinator implements Closeable {
}
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
-
- @Override
- public JoinGroupResponse parse(ClientResponse response) {
- return new JoinGroupResponse(response.responseBody());
- }
-
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = Errors.forCode(joinResponse.errorCode());
if (error == Errors.NONE) {
- log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
+ log.debug("Received successful join group response for group {}: {}", groupId, joinResponse);
sensors.joinLatency.record(response.requestLatencyMs());
synchronized (AbstractCoordinator.this) {
@@ -483,12 +477,6 @@ public abstract class AbstractCoordinator implements Closeable {
}
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
-
- @Override
- public SyncGroupResponse parse(ClientResponse response) {
- return new SyncGroupResponse(response.responseBody());
- }
-
@Override
public void handle(SyncGroupResponse syncResponse,
RequestFuture<ByteBuffer> future) {
@@ -540,7 +528,7 @@ public abstract class AbstractCoordinator implements Closeable {
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
log.debug("Received group coordinator response {}", resp);
- GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
+ GroupCoordinatorResponse groupCoordinatorResponse = (GroupCoordinatorResponse) resp.responseBody();
// use MAX_VALUE - node.id as the coordinator id to mimic separate connections
// for the coordinator in the underlying network client layer
// TODO: this needs to be better handled in KAFKA-1935
@@ -653,12 +641,6 @@ public abstract class AbstractCoordinator implements Closeable {
}
private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
-
- @Override
- public LeaveGroupResponse parse(ClientResponse response) {
- return new LeaveGroupResponse(response.responseBody());
- }
-
@Override
public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) {
Errors error = Errors.forCode(leaveResponse.errorCode());
@@ -680,12 +662,6 @@ public abstract class AbstractCoordinator implements Closeable {
}
private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
-
- @Override
- public HeartbeatResponse parse(ClientResponse response) {
- return new HeartbeatResponse(response.responseBody());
- }
-
@Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
sensors.heartbeatLatency.record(response.requestLatencyMs());
@@ -722,8 +698,6 @@ public abstract class AbstractCoordinator implements Closeable {
protected abstract class CoordinatorResponseHandler<R, T> extends RequestFutureAdapter<ClientResponse, T> {
protected ClientResponse response;
- public abstract R parse(ClientResponse response);
-
public abstract void handle(R response, RequestFuture<T> future);
@Override
@@ -735,10 +709,11 @@ public abstract class AbstractCoordinator implements Closeable {
}
@Override
+ @SuppressWarnings("unchecked")
public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
try {
this.response = clientResponse;
- R responseObj = parse(clientResponse);
+ R responseObj = (R) clientResponse.responseBody();
handle(responseObj, future);
} catch (RuntimeException e) {
if (!future.isDone())
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 2621c09..4889872 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -12,7 +12,6 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -624,11 +623,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
@Override
- public OffsetCommitResponse parse(ClientResponse response) {
- return new OffsetCommitResponse(response.responseBody());
- }
-
- @Override
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
sensors.commitLatency.record(response.requestLatencyMs());
Set<String> unauthorizedTopics = new HashSet<>();
@@ -718,12 +712,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
-
- @Override
- public OffsetFetchResponse parse(ClientResponse response) {
- return new OffsetFetchResponse(response.responseBody());
- }
-
@Override
public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 2495b23..d9baa56 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,14 +97,14 @@ public class ConsumerNetworkClient implements Closeable {
}
private RequestFuture<ClientResponse> send(Node node,
- ApiKeys api,
- short version,
- AbstractRequest request) {
+ ApiKeys api,
+ short version,
+ AbstractRequest request) {
long now = time.milliseconds();
RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
RequestHeader header = client.nextRequestHeader(api, version);
- RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
- put(node, new ClientRequest(now, true, send, completionHandler));
+ ClientRequest clientRequest = new ClientRequest(node.idString(), now, true, header, request, completionHandler);
+ put(node, clientRequest);
// wakeup the client in case it is blocking in poll so that we can send the queued request
client.wakeup();
@@ -336,9 +335,9 @@ public class ConsumerNetworkClient implements Closeable {
// coordinator failures traversing the unsent list again.
iterator.remove();
for (ClientRequest request : requestEntry.getValue()) {
- RequestFutureCompletionHandler handler =
- (RequestFutureCompletionHandler) request.callback();
- handler.onComplete(new ClientResponse(request, now, true, null));
+ RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
+ handler.onComplete(new ClientResponse(request.header(), request.callback(), request.destination(),
+ request.createdTimeMs(), now, true, null));
}
}
}
@@ -369,8 +368,8 @@ public class ConsumerNetworkClient implements Closeable {
synchronized (this) {
List<ClientRequest> unsentRequests = unsent.remove(node);
if (unsentRequests != null) {
- for (ClientRequest request : unsentRequests) {
- RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
+ for (ClientRequest unsentRequest : unsentRequests) {
+ RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) unsentRequest.callback();
handler.onFailure(e);
}
}
@@ -468,12 +467,11 @@ public class ConsumerNetworkClient implements Closeable {
if (e != null) {
future.raise(e);
} else if (response.wasDisconnected()) {
- ClientRequest request = response.request();
- RequestSend send = request.request();
- ApiKeys api = ApiKeys.forId(send.header().apiKey());
- int correlation = send.header().correlationId();
+ RequestHeader requestHeader = response.requestHeader();
+ ApiKeys api = ApiKeys.forId(requestHeader.apiKey());
+ int correlation = requestHeader.correlationId();
log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
- api, request, correlation, send.destination());
+ api, requestHeader, correlation, response.destination());
future.raise(DisconnectException.INSTANCE);
} else {
future.complete(response);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index bfc1a0b..703ea29 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -41,7 +41,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.LogEntry;
-import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
@@ -155,7 +154,7 @@ public class Fetcher<K, V> {
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
- FetchResponse response = new FetchResponse(resp.responseBody());
+ FetchResponse response = (FetchResponse) resp.responseBody();
if (!matchesRequestedPartitions(request, response)) {
// obviously we expect the broker to always send us valid responses, so this check
// is mainly for test cases where mock fetch responses must be manually crafted.
@@ -256,7 +255,7 @@ public class Fetcher<K, V> {
throw future.exception();
if (future.succeeded()) {
- MetadataResponse response = new MetadataResponse(future.value().responseBody());
+ MetadataResponse response = (MetadataResponse) future.value().responseBody();
Cluster cluster = response.cluster();
Set<String> unauthorizedTopics = cluster.unauthorizedTopics();
@@ -549,7 +548,7 @@ public class Fetcher<K, V> {
.compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetAndTimestamp>>() {
@Override
public void onSuccess(ClientResponse response, RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future) {
- ListOffsetResponse lor = new ListOffsetResponse(response.responseBody());
+ ListOffsetResponse lor = (ListOffsetResponse) response.responseBody();
log.trace("Received ListOffsetResponse {} from broker {}", lor, node);
handleListOffsetResponse(timestampsToSearch, lor, future);
}
@@ -673,10 +672,8 @@ public class Fetcher<K, V> {
return null;
}
- ByteBuffer buffer = partition.recordSet;
- MemoryRecords records = MemoryRecords.readableRecords(buffer);
List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
- for (LogEntry logEntry : records) {
+ for (LogEntry logEntry : partition.records) {
// Skip the messages earlier than current position.
if (logEntry.offset() >= position) {
parsed.add(parseRecord(tp, logEntry));
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index c71bb67..7555b71 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -12,20 +12,13 @@
*/
package org.apache.kafka.clients.producer.internals;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
@@ -34,7 +27,6 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -42,15 +34,22 @@ import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
-import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
/**
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
* requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
@@ -210,19 +209,17 @@ public class Sender implements Runnable {
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
sensors.updateProduceRequestMetrics(batches);
- List<ClientRequest> requests = createProduceRequests(batches, now);
+
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
- if (result.readyNodes.size() > 0) {
+ if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
- log.trace("Created {} produce requests: {}", requests.size(), requests);
pollTimeout = 0;
}
- for (ClientRequest request : requests)
- client.send(request, now);
+ sendProduceRequests(batches, now);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
@@ -254,20 +251,16 @@ public class Sender implements Runnable {
* Handle a produce response
*/
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
- int correlationId = response.request().request().header().correlationId();
+ int correlationId = response.requestHeader().correlationId();
if (response.wasDisconnected()) {
- log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()
- .request()
- .destination());
+ log.trace("Cancelled request {} due to node {} being disconnected", response, response.destination());
for (RecordBatch batch : batches.values())
completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, Record.NO_TIMESTAMP, correlationId, now);
} else {
- log.trace("Received produce response from node {} with correlation id {}",
- response.request().request().destination(),
- correlationId);
+ log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
// if we have a response, parse it
if (response.hasResponse()) {
- ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
+ ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
TopicPartition tp = entry.getKey();
ProduceResponse.PartitionResponse partResp = entry.getValue();
@@ -275,7 +268,7 @@ public class Sender implements Runnable {
RecordBatch batch = batches.get(tp);
completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
}
- this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
+ this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
this.sensors.recordThrottleTime(produceResponse.getThrottleTime());
} else {
// this is the acks = 0 case, just complete all requests
@@ -339,35 +332,36 @@ public class Sender implements Runnable {
/**
* Transfer the record batches into a list of produce requests on a per-node basis
*/
- private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
- List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
+ private void sendProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
- requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
- return requests;
+ sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue());
}
/**
* Create a produce request from the given record batches
*/
- private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
- Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
- final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
+ private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
+ Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
+ final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
for (RecordBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
- produceRecordsByPartition.put(tp, batch.records.buffer());
+ produceRecordsByPartition.put(tp, batch.records);
recordsByPartition.put(tp, batch);
}
- ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
- RequestSend send = new RequestSend(Integer.toString(destination),
- this.client.nextRequestHeader(ApiKeys.PRODUCE),
- request.toStruct());
+
+ ProduceRequest produceRequest = new ProduceRequest(acks, timeout, produceRecordsByPartition);
+ RequestHeader header = this.client.nextRequestHeader(ApiKeys.PRODUCE);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
- return new ClientRequest(now, acks != 0, send, callback);
+ String nodeId = Integer.toString(destination);
+ ClientRequest clientRequest = new ClientRequest(nodeId, now, acks != 0, header, produceRequest, callback);
+
+ client.send(clientRequest, now);
+ log.trace("Sent produce request to {}: {}", nodeId, produceRequest);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
index 9e213ec..3683283 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
@@ -29,11 +29,10 @@ public class ByteBufferSend implements Send {
private boolean pending = false;
public ByteBufferSend(String destination, ByteBuffer... buffers) {
- super();
this.destination = destination;
this.buffers = buffers;
- for (int i = 0; i < buffers.length; i++)
- remaining += buffers[i].remaining();
+ for (ByteBuffer buffer : buffers)
+ remaining += buffer.remaining();
this.size = remaining;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
index 49964b0..5e4bf2c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
@@ -29,8 +29,8 @@ public class NetworkSend extends ByteBufferSend {
private static ByteBuffer[] sizeDelimit(ByteBuffer[] buffers) {
int size = 0;
- for (int i = 0; i < buffers.length; i++)
- size += buffers[i].remaining();
+ for (ByteBuffer buffer : buffers)
+ size += buffer.remaining();
ByteBuffer[] delimited = new ByteBuffer[buffers.length + 1];
delimited[0] = ByteBuffer.allocate(4);
delimited[0].putInt(size);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 6178b80..e07c3c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -63,7 +63,7 @@ public enum ApiKeys {
/** an english description of the api--this is for debugging and can change */
public final String name;
- private ApiKeys(int id, String name) {
+ ApiKeys(int id, String name) {
this.id = (short) id;
this.name = name;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 9e21f3b..cd4e6e3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -31,6 +31,7 @@ import static org.apache.kafka.common.protocol.types.Type.BYTES;
import static org.apache.kafka.common.protocol.types.Type.INT16;
import static org.apache.kafka.common.protocol.types.Type.INT32;
import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.RECORDS;
import static org.apache.kafka.common.protocol.types.Type.STRING;
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
@@ -135,7 +136,7 @@ public class Protocol {
public static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
- new Field("record_set", BYTES)))));
+ new Field("record_set", RECORDS)))));
public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
INT16,
@@ -500,14 +501,15 @@ public class Protocol {
new ArrayOf(FETCH_REQUEST_TOPIC_V0),
"Topics to fetch in the order provided."));
- public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("error_code", INT16),
- new Field("high_watermark",
- INT64,
- "Last committed offset."),
- new Field("record_set", BYTES));
+ public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("error_code", INT16),
+ new Field("high_watermark",
+ INT64,
+ "Last committed offset."));
+ public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V0),
+ new Field("record_set", RECORDS));
public static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
new Field("partition_responses",
@@ -515,6 +517,7 @@ public class Protocol {
public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+
public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms",
INT32,
"Duration in milliseconds for which the request was throttled" +
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
index 207f108..1ba8e44 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
@@ -54,8 +54,9 @@ public class ArrayOf extends Type {
Object[] objs = (Object[]) o;
int size = objs.length;
buffer.putInt(size);
- for (int i = 0; i < size; i++)
- type.write(buffer, objs[i]);
+
+ for (Object obj : objs)
+ type.write(buffer, obj);
}
@Override
@@ -81,8 +82,8 @@ public class ArrayOf extends Type {
return size;
Object[] objs = (Object[]) o;
- for (int i = 0; i < objs.length; i++)
- size += type.sizeOf(objs[i]);
+ for (Object obj : objs)
+ size += type.sizeOf(obj);
return size;
}
@@ -102,8 +103,8 @@ public class ArrayOf extends Type {
return null;
Object[] array = (Object[]) item;
- for (int i = 0; i < array.length; i++)
- type.validate(array[i]);
+ for (Object obj : array)
+ type.validate(obj);
return array;
} catch (ClassCastException e) {
throw new SchemaException("Not an Object[].");
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index e8dce31..efbfd14 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -31,7 +31,7 @@ public class Schema extends Type {
*/
public Schema(Field... fs) {
this.fields = new Field[fs.length];
- this.fieldsByName = new HashMap<String, Field>();
+ this.fieldsByName = new HashMap<>();
for (int i = 0; i < this.fields.length; i++) {
Field field = fs[i];
if (fieldsByName.containsKey(field.name))
@@ -47,13 +47,12 @@ public class Schema extends Type {
@Override
public void write(ByteBuffer buffer, Object o) {
Struct r = (Struct) o;
- for (int i = 0; i < fields.length; i++) {
- Field f = fields[i];
+ for (Field field : fields) {
try {
- Object value = f.type().validate(r.get(f));
- f.type.write(buffer, value);
+ Object value = field.type().validate(r.get(field));
+ field.type.write(buffer, value);
} catch (Exception e) {
- throw new SchemaException("Error writing field '" + f.name +
+ throw new SchemaException("Error writing field '" + field.name +
"': " +
(e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
}
@@ -85,8 +84,8 @@ public class Schema extends Type {
public int sizeOf(Object o) {
int size = 0;
Struct r = (Struct) o;
- for (int i = 0; i < fields.length; i++)
- size += fields[i].type.sizeOf(r.get(fields[i]));
+ for (Field field : fields)
+ size += field.type.sizeOf(r.get(field));
return size;
}
@@ -146,8 +145,7 @@ public class Schema extends Type {
public Struct validate(Object item) {
try {
Struct struct = (Struct) item;
- for (int i = 0; i < this.fields.length; i++) {
- Field field = this.fields[i];
+ for (Field field : fields) {
try {
field.type.validate(struct.get(field));
} catch (SchemaException e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 212d701..0165ce6 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -12,6 +12,8 @@
*/
package org.apache.kafka.common.protocol.types;
+import org.apache.kafka.common.record.Records;
+
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -108,6 +110,10 @@ public class Struct {
return (Byte) get(name);
}
+    /**
+     * Look up the value stored under the given field name and return it as {@link Records}.
+     *
+     * @param name the name of the field to read
+     * @return the field value cast to {@link Records}
+     * @throws ClassCastException if the stored value is not a {@link Records} instance
+     */
+    public Records getRecords(String name) {
+        Object value = get(name);
+        return (Records) value;
+    }
+
public Short getShort(Field field) {
return (Short) get(field);
}
@@ -272,16 +278,6 @@ public class Struct {
this.schema.validate(this);
}
- /**
- * Create a byte buffer containing the serialized form of the values in this struct. This method can choose to break
- * the struct into multiple ByteBuffers if need be.
- */
- public ByteBuffer[] toBytes() {
- ByteBuffer buffer = ByteBuffer.allocate(sizeOf());
- writeTo(buffer);
- return new ByteBuffer[] {buffer};
- }
-
@Override
public String toString() {
StringBuilder b = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 43b4a37..e2f7baf 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -18,6 +18,9 @@ package org.apache.kafka.common.protocol.types;
import java.nio.ByteBuffer;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.Utils;
/**
@@ -424,5 +427,50 @@ public abstract class Type {
}
};
+    /**
+     * Type for a field holding a set of log records. The value may be null. In-memory records are
+     * serialized through {@code NULLABLE_BYTES}; file-backed records are rejected by
+     * {@code write} because they must be written to the channel directly.
+     */
+    public static final Type RECORDS = new Type() {
+        @Override
+        public boolean isNullable() {
+            return true;
+        }
+
+        /**
+         * Write the records into the buffer.
+         *
+         * @param buffer the destination buffer
+         * @param o null, or a {@link MemoryRecords} instance
+         * @throws IllegalArgumentException if {@code o} is a {@link FileRecords}
+         * @throws ClassCastException if {@code o} is any other non-{@link MemoryRecords} object
+         */
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            // This type is nullable: validate() passes null through and sizeOf(null) reserves 4 bytes,
+            // so a null value must be encoded rather than dereferenced.
+            // NOTE(review): relies on NULLABLE_BYTES.write accepting null and emitting a 4-byte
+            // marker, consistent with sizeOf(null) == 4 - confirm against its definition in this file.
+            if (o == null) {
+                NULLABLE_BYTES.write(buffer, null);
+                return;
+            }
+
+            if (o instanceof FileRecords)
+                throw new IllegalArgumentException("FileRecords must be written to the channel directly");
+            MemoryRecords records = (MemoryRecords) o;
+            // duplicate() has its own position/limit, so the records' buffer is left untouched.
+            NULLABLE_BYTES.write(buffer, records.buffer().duplicate());
+        }
+
+        /**
+         * Read a nullable byte block from the buffer and wrap it as readable in-memory records.
+         */
+        @Override
+        public Object read(ByteBuffer buffer) {
+            ByteBuffer recordsBuffer = (ByteBuffer) NULLABLE_BYTES.read(buffer);
+            // NOTE(review): recordsBuffer may be null here if a null value was encoded; verify that
+            // MemoryRecords.readableRecords and its callers tolerate a null buffer.
+            return MemoryRecords.readableRecords(recordsBuffer);
+        }
+
+        /**
+         * @return 4 for null, otherwise 4 plus the size in bytes of the records
+         */
+        @Override
+        public int sizeOf(Object o) {
+            if (o == null)
+                return 4;
+
+            Records records = (Records) o;
+            return 4 + records.sizeInBytes();
+        }
+
+        @Override
+        public String toString() {
+            return "RECORDS";
+        }
+
+        /**
+         * @return the item itself if it is null or a {@link Records} instance
+         * @throws SchemaException if the item is any other object
+         */
+        @Override
+        public Records validate(Object item) {
+            if (item == null)
+                return null;
+
+            if (item instanceof Records)
+                return (Records) item;
+
+            throw new SchemaException(item + " is not an instance of " + Records.class.getName());
+        }
+    };
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
new file mode 100644
index 0000000..bdae08d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.network.TransportLayer;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+
+/**
+ * File-backed record set: a view over the byte range {@code [start, end)} of a file channel.
+ */
+public class FileRecords implements Records {
+    // Used only to report the file path in error messages.
+    private final File file;
+    private final FileChannel channel;
+    // Byte positions in the channel delimiting this record set.
+    private final long start;
+    private final long end;
+    // Size in bytes, fixed at construction time.
+    private final long size;
+
+    /**
+     * @param file the backing file
+     * @param channel the channel the records are read from
+     * @param start the starting byte position in the channel
+     * @param end the ending byte position in the channel
+     * @param isSlice if true the size is exactly {@code end - start}; otherwise {@code end} is first
+     *                capped at the channel's current size
+     * @throws IOException if the channel size cannot be read
+     */
+    public FileRecords(File file,
+                       FileChannel channel,
+                       int start,
+                       int end,
+                       boolean isSlice) throws IOException {
+        this.file = file;
+        this.channel = channel;
+        this.start = start;
+        this.end = end;
+
+        if (isSlice)
+            this.size = end - start;
+        else
+            this.size = Math.min(channel.size(), end) - start;
+    }
+
+    @Override
+    public int sizeInBytes() {
+        return (int) size;
+    }
+
+    /**
+     * Transfer up to {@code length} bytes of this record set, starting {@code offset} bytes past its
+     * start, to the destination channel.
+     *
+     * @param destChannel the channel to write to
+     * @param offset the offset relative to the start of this record set
+     * @param length the maximum number of bytes to transfer
+     * @return the value returned by the underlying transfer call
+     * @throws KafkaException if the file has shrunk since construction or the offset is beyond the size
+     * @throws IOException if the channel size cannot be read or the transfer fails
+     */
+    @Override
+    public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException {
+        // Recompute the size to detect a file that shrank after this object was created.
+        long newSize = Math.min(channel.size(), end) - start;
+        if (newSize < size)
+            throw new KafkaException(String.format("Size of FileRecords %s has been truncated during write: old size %d, new size %d", file.getAbsolutePath(), size, newSize));
+
+        if (offset > size)
+            throw new KafkaException(String.format("The requested offset %d is out of range. The size of this FileRecords is %d.", offset, size));
+
+        long position = start + offset;
+        long count = Math.min(length, this.size - offset);
+        if (destChannel instanceof TransportLayer) {
+            TransportLayer tl = (TransportLayer) destChannel;
+            return tl.transferFrom(this.channel, position, count);
+        } else {
+            return this.channel.transferTo(position, count, destChannel);
+        }
+    }
+
+    @Override
+    public RecordsIterator iterator() {
+        return new RecordsIterator(new FileLogInputStream(channel, start, end), false);
+    }
+
+    /**
+     * Reads log entries one at a time from a byte range of a file channel.
+     */
+    private static class FileLogInputStream implements LogInputStream {
+        // Position in the channel of the next entry to read.
+        private long position;
+        protected final long end;
+        protected final FileChannel channel;
+        // Reused across calls to hold the log header of each entry.
+        private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD);
+
+        public FileLogInputStream(FileChannel channel, long start, long end) {
+            this.channel = channel;
+            this.position = start;
+            this.end = end;
+        }
+
+        /**
+         * Read the next entry.
+         *
+         * @return the next entry, or null if no complete entry remains before {@code end} or the
+         *         channel ends before the entry could be read in full
+         * @throws IOException if reading from the channel fails
+         * @throws IllegalStateException if the entry header contains a negative size
+         */
+        @Override
+        public LogEntry nextEntry() throws IOException {
+            if (position + Records.LOG_OVERHEAD >= end)
+                return null;
+
+            logHeaderBuffer.rewind();
+            readFully(channel, logHeaderBuffer, position);
+            // Still unfilled after reading to end of file: no complete header is available.
+            if (logHeaderBuffer.hasRemaining())
+                return null;
+
+            logHeaderBuffer.rewind();
+            long offset = logHeaderBuffer.getLong();
+            int size = logHeaderBuffer.getInt();
+            if (size < 0)
+                throw new IllegalStateException("Record with size " + size);
+
+            // The record would extend past the end of this range, so treat it as incomplete.
+            if (position + Records.LOG_OVERHEAD + size > end)
+                return null;
+
+            ByteBuffer recordBuffer = ByteBuffer.allocate(size);
+            readFully(channel, recordBuffer, position + Records.LOG_OVERHEAD);
+            if (recordBuffer.hasRemaining())
+                return null;
+            recordBuffer.rewind();
+
+            Record record = new Record(recordBuffer);
+            LogEntry logEntry = new LogEntry(offset, record);
+            position += logEntry.size();
+            return logEntry;
+        }
+
+        /**
+         * Read from the channel at the given position until the buffer is full or end of file is
+         * reached. A single positional read may return fewer bytes than requested, which must not
+         * be mistaken for the end of the log.
+         */
+        private static void readFully(FileChannel channel, ByteBuffer destination, long position) throws IOException {
+            long currentPosition = position;
+            while (destination.hasRemaining()) {
+                int bytesRead = channel.read(destination, currentPosition);
+                if (bytesRead < 0)
+                    break;
+                currentPosition += bytesRead;
+            }
+        }
+    }
+}