You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/04 06:40:45 UTC
[2/3] kafka git commit: kafka-1928;
Move kafka.network over to using the network classes in
org.apache.kafka.common.network; patched by Gwen Shapira;
reviewed by Joel Koshy, Jay Kreps, Jiangjie Qin, Guozhang Wang and Jun Rao
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 57de058..effb1e6 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -17,17 +17,8 @@ import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.nio.channels.*;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.KafkaException;
@@ -40,20 +31,21 @@ import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A selector interface for doing non-blocking multi-connection network I/O.
+ * A selector for doing non-blocking multi-connection network I/O.
* <p>
* This class works with {@link NetworkSend} and {@link NetworkReceive} to transmit size-delimited network requests and
* responses.
* <p>
- * A connection can be added to the selector associated with an integer id by doing
+ * A connection can be added to the selector associated with a string id by doing
*
* <pre>
- * selector.connect(42, new InetSocketAddress("google.com", server.port), 64000, 64000);
+ * selector.connect("42", new InetSocketAddress("google.com", server.port), 64000, 64000);
* </pre>
*
* The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating
@@ -64,10 +56,10 @@ import org.slf4j.LoggerFactory;
*
* <pre>
* List<NetworkRequest> requestsToSend = Arrays.asList(new NetworkRequest(0, myBytes), new NetworkRequest(1, myOtherBytes));
- * selector.poll(TIMEOUT_MS, requestsToSend);
+ * nioSelector.poll(TIMEOUT_MS, requestsToSend);
* </pre>
*
- * The selector maintains several lists that are reset by each call to <code>poll()</code> which are available via
+ * The selector maintains several lists, filled in by each call to <code>poll()</code>, which are available via
* various getters. These are reset by each call to <code>poll()</code>.
*
* This class is not thread safe!
@@ -76,41 +68,59 @@ public class Selector implements Selectable {
private static final Logger log = LoggerFactory.getLogger(Selector.class);
- private final java.nio.channels.Selector selector;
- private final Map<Integer, SelectionKey> keys;
- private final List<NetworkSend> completedSends;
+ private final java.nio.channels.Selector nioSelector;
+ private final Map<String, SelectionKey> keys;
+ private final List<Send> completedSends;
private final List<NetworkReceive> completedReceives;
- private final List<Integer> disconnected;
- private final List<Integer> connected;
- private final List<Integer> failedSends;
+ private final List<String> disconnected;
+ private final List<String> connected;
+ private final List<String> failedSends;
private final Time time;
private final SelectorMetrics sensors;
private final String metricGrpPrefix;
private final Map<String, String> metricTags;
+ private final Map<String, Long> lruConnections;
+ private final long connectionsMaxIdleNanos;
+ private final int maxReceiveSize;
+ private final boolean metricsPerConnection;
+ private long currentTimeNanos;
+ private long nextIdleCloseCheckTime;
+
/**
- * Create a new selector
+ * Create a new nioSelector
*/
- public Selector(Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
+ public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
try {
- this.selector = java.nio.channels.Selector.open();
+ this.nioSelector = java.nio.channels.Selector.open();
} catch (IOException e) {
throw new KafkaException(e);
}
+ this.maxReceiveSize = maxReceiveSize; // passed to every NetworkReceive created in poll()
+ this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000; // milliseconds -> nanoseconds
this.time = time;
this.metricGrpPrefix = metricGrpPrefix;
this.metricTags = metricTags;
- this.keys = new HashMap<Integer, SelectionKey>();
- this.completedSends = new ArrayList<NetworkSend>();
+ this.keys = new HashMap<String, SelectionKey>();
+ this.completedSends = new ArrayList<Send>();
this.completedReceives = new ArrayList<NetworkReceive>();
- this.connected = new ArrayList<Integer>();
- this.disconnected = new ArrayList<Integer>();
- this.failedSends = new ArrayList<Integer>();
+ this.connected = new ArrayList<String>();
+ this.disconnected = new ArrayList<String>();
+ this.failedSends = new ArrayList<String>();
this.sensors = new SelectorMetrics(metrics);
+ // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
+ this.lruConnections = new LinkedHashMap<String, Long>(16, .75F, true);
+ currentTimeNanos = time.nanoseconds(); // read the injected clock (the one poll() uses) so idle-expiry comparisons share one time source
+ nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; // first idle check is due one full idle period after construction
+ this.metricsPerConnection = metricsPerConnection; // gates per-connection sensor registration in SelectorMetrics
+ }
+
+ public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
+ this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true);
}
/**
- * Begin connecting to the given address and add the connection to this selector associated with the given id
+ * Begin connecting to the given address and add the connection to this selector associated with the given id
* number.
* <p>
* Note that this call only initiates the connection, which will be completed on a future {@link #poll(long, List)}
@@ -123,7 +133,7 @@ public class Selector implements Selectable {
* @throws IOException if DNS resolution fails on the hostname or if the broker is down
*/
@Override
- public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
+ public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
if (this.keys.containsKey(id))
throw new IllegalStateException("There is already a connection for id " + id);
@@ -143,7 +153,18 @@ public class Selector implements Selectable {
channel.close();
throw e;
}
- SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT);
+ SelectionKey key = channel.register(this.nioSelector, SelectionKey.OP_CONNECT);
+ key.attach(new Transmissions(id));
+ this.keys.put(id, key);
+ }
+
+ /**
+ * Register the nioSelector with an existing channel
+ * Use this on server-side, when a connection is accepted by a different thread but processed by the Selector
+ * Note that we are not checking if the connection id is valid - since the connection already exists
+ */
+ public void register(String id, SocketChannel channel) throws ClosedChannelException {
+ SelectionKey key = channel.register(nioSelector, SelectionKey.OP_READ);
key.attach(new Transmissions(id));
this.keys.put(id, key);
}
@@ -153,18 +174,18 @@ public class Selector implements Selectable {
* processed until the next {@link #poll(long, List) poll()} call.
*/
@Override
- public void disconnect(int id) {
+ public void disconnect(String id) {
SelectionKey key = this.keys.get(id);
if (key != null)
key.cancel();
}
/**
- * Interrupt the selector if it is blocked waiting to do I/O.
+ * Interrupt the nioSelector if it is blocked waiting to do I/O.
*/
@Override
public void wakeup() {
- this.selector.wakeup();
+ this.nioSelector.wakeup();
}
/**
@@ -172,12 +193,14 @@ public class Selector implements Selectable {
*/
@Override
public void close() {
- for (SelectionKey key : this.selector.keys())
- close(key);
+ List<String> connections = new LinkedList<String>(keys.keySet());
+ for (String id: connections)
+ close(id);
+
try {
- this.selector.close();
+ this.nioSelector.close();
} catch (IOException e) {
- log.error("Exception closing selector:", e);
+ log.error("Exception closing nioSelector:", e);
}
}
@@ -185,7 +208,7 @@ public class Selector implements Selectable {
* Queue the given request for sending in the subsequent {@poll(long)} calls
* @param send The request to send
*/
- public void send(NetworkSend send) {
+ public void send(Send send) {
SelectionKey key = keyForId(send.destination());
Transmissions transmissions = transmissions(key);
if (transmissions.hasSend())
@@ -194,7 +217,7 @@ public class Selector implements Selectable {
try {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} catch (CancelledKeyException e) {
- close(key);
+ close(transmissions.id);
this.failedSends.add(send.destination());
}
}
@@ -220,10 +243,11 @@ public class Selector implements Selectable {
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
long endSelect = time.nanoseconds();
+ currentTimeNanos = endSelect;
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (readyKeys > 0) {
- Set<SelectionKey> keys = this.selector.selectedKeys();
+ Set<SelectionKey> keys = this.nioSelector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
@@ -232,8 +256,9 @@ public class Selector implements Selectable {
Transmissions transmissions = transmissions(key);
SocketChannel channel = channel(key);
- // register all per-broker metrics at once
- sensors.maybeRegisterNodeMetrics(transmissions.id);
+ // register all per-connection metrics at once
+ sensors.maybeRegisterConnectionMetrics(transmissions.id);
+ lruConnections.put(transmissions.id, currentTimeNanos);
try {
/* complete any connections that have finished their handshake */
@@ -247,8 +272,14 @@ public class Selector implements Selectable {
/* read from any connections that have readable data */
if (key.isReadable()) {
if (!transmissions.hasReceive())
- transmissions.receive = new NetworkReceive(transmissions.id);
- transmissions.receive.readFrom(channel);
+ transmissions.receive = new NetworkReceive(maxReceiveSize, transmissions.id);
+ try {
+ transmissions.receive.readFrom(channel);
+ } catch (InvalidReceiveException e) {
+ log.error("Invalid data received from " + transmissions.id + " closing connection", e);
+ close(transmissions.id);
+ throw e;
+ }
if (transmissions.receive.complete()) {
transmissions.receive.payload().rewind();
this.completedReceives.add(transmissions.receive);
@@ -260,7 +291,7 @@ public class Selector implements Selectable {
/* write to any sockets that have space in their buffer and for which we have data */
if (key.isWritable()) {
transmissions.send.writeTo(channel);
- if (transmissions.send.remaining() <= 0) {
+ if (transmissions.send.completed()) {
this.completedSends.add(transmissions.send);
this.sensors.recordBytesSent(transmissions.id, transmissions.send.size());
transmissions.clearSend();
@@ -270,7 +301,7 @@ public class Selector implements Selectable {
/* cancel any defunct sockets */
if (!key.isValid()) {
- close(key);
+ close(transmissions.id);
this.disconnected.add(transmissions.id);
}
} catch (IOException e) {
@@ -279,15 +310,16 @@ public class Selector implements Selectable {
log.info("Connection {} disconnected", desc);
else
log.warn("Error in I/O with connection to {}", desc, e);
- close(key);
+ close(transmissions.id);
this.disconnected.add(transmissions.id);
}
}
}
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
+ maybeCloseOldestConnection();
}
-
+
private String socketDescription(SocketChannel channel) {
Socket socket = channel.socket();
if (socket == null)
@@ -299,7 +331,7 @@ public class Selector implements Selectable {
}
@Override
- public List<NetworkSend> completedSends() {
+ public List<Send> completedSends() {
return this.completedSends;
}
@@ -309,17 +341,17 @@ public class Selector implements Selectable {
}
@Override
- public List<Integer> disconnected() {
+ public List<String> disconnected() {
return this.disconnected;
}
@Override
- public List<Integer> connected() {
+ public List<String> connected() {
return this.connected;
}
@Override
- public void mute(int id) {
+ public void mute(String id) {
mute(this.keyForId(id));
}
@@ -328,7 +360,7 @@ public class Selector implements Selectable {
}
@Override
- public void unmute(int id) {
+ public void unmute(String id) {
unmute(this.keyForId(id));
}
@@ -348,6 +380,25 @@ public class Selector implements Selectable {
unmute(key);
}
+ private void maybeCloseOldestConnection() { // closes at most one idle connection per call; invoked at the end of poll()
+ if (currentTimeNanos > nextIdleCloseCheckTime) { // skip the LRU lookup until the scheduled check time has passed
+ if (lruConnections.isEmpty()) {
+ nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; // nothing to expire; check again one idle period from now
+ } else {
+ Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next(); // map is access-ordered, so the first entry is the least recently used connection
+ Long connectionLastActiveTime = oldestConnectionEntry.getValue();
+ nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos; // earliest moment at which the oldest connection counts as idle
+ if (currentTimeNanos > nextIdleCloseCheckTime) {
+ String connectionId = oldestConnectionEntry.getKey();
+ if (log.isTraceEnabled())
+ log.trace("About to close the idle connection from " + connectionId
+ + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");
+ close(connectionId); // NOTE(review): unlike the invalid-key and I/O-error paths in poll(), the id is not added to `disconnected` -- confirm callers do not need to be told
+ }
+ }
+ }
+ }
+
/**
* Clear the results from the prior poll
*/
@@ -369,17 +420,19 @@ public class Selector implements Selectable {
*/
private int select(long ms) throws IOException {
if (ms == 0L)
- return this.selector.selectNow();
+ return this.nioSelector.selectNow();
else if (ms < 0L)
- return this.selector.select();
+ return this.nioSelector.select();
else
- return this.selector.select(ms);
+ return this.nioSelector.select(ms);
}
/**
* Begin closing this connection
*/
- private void close(SelectionKey key) {
+ public void close(String id) {
+ SelectionKey key = keyForId(id);
+ lruConnections.remove(id);
SocketChannel channel = channel(key);
Transmissions trans = transmissions(key);
if (trans != null) {
@@ -401,10 +454,10 @@ public class Selector implements Selectable {
/**
* Get the selection key associated with this numeric id
*/
- private SelectionKey keyForId(int id) {
+ private SelectionKey keyForId(String id) {
SelectionKey key = this.keys.get(id);
if (key == null)
- throw new IllegalStateException("Attempt to write to socket for which there is no open connection.");
+ throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + keys.keySet().toString());
return key;
}
@@ -426,11 +479,11 @@ public class Selector implements Selectable {
* The id and in-progress send and receive associated with a connection
*/
private static class Transmissions {
- public int id;
- public NetworkSend send;
+ public String id;
+ public Send send;
public NetworkReceive receive;
- public Transmissions(int id) {
+ public Transmissions(String id) {
this.id = id;
}
@@ -464,20 +517,27 @@ public class Selector implements Selectable {
public SelectorMetrics(Metrics metrics) {
this.metrics = metrics;
String metricGrpName = metricGrpPrefix + "-metrics";
+ StringBuilder tagsSuffix = new StringBuilder();
+
+ for (Map.Entry<String, String> tag: metricTags.entrySet()) {
+ tagsSuffix.append(tag.getKey());
+ tagsSuffix.append("-");
+ tagsSuffix.append(tag.getValue());
+ }
- this.connectionClosed = this.metrics.sensor("connections-closed");
+ this.connectionClosed = this.metrics.sensor("connections-closed:" + tagsSuffix.toString());
MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
this.connectionClosed.add(metricName, new Rate());
- this.connectionCreated = this.metrics.sensor("connections-created");
+ this.connectionCreated = this.metrics.sensor("connections-created:" + tagsSuffix.toString());
metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
this.connectionCreated.add(metricName, new Rate());
- this.bytesTransferred = this.metrics.sensor("bytes-sent-received");
+ this.bytesTransferred = this.metrics.sensor("bytes-sent-received:" + tagsSuffix.toString());
metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
bytesTransferred.add(metricName, new Rate(new Count()));
- this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred);
+ this.bytesSent = this.metrics.sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
this.bytesSent.add(metricName, new Rate());
metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
@@ -487,13 +547,13 @@ public class Selector implements Selectable {
metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
this.bytesSent.add(metricName, new Max());
- this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred);
+ this.bytesReceived = this.metrics.sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
this.bytesReceived.add(metricName, new Rate());
metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
this.bytesReceived.add(metricName, new Rate(new Count()));
- this.selectTime = this.metrics.sensor("select-time");
+ this.selectTime = this.metrics.sensor("select-time:" + tagsSuffix.toString());
metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
this.selectTime.add(metricName, new Rate(new Count()));
metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
@@ -501,7 +561,7 @@ public class Selector implements Selectable {
metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
- this.ioTime = this.metrics.sensor("io-time");
+ this.ioTime = this.metrics.sensor("io-time:" + tagsSuffix.toString());
metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
this.ioTime.add(metricName, new Avg());
metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
@@ -515,17 +575,17 @@ public class Selector implements Selectable {
});
}
- public void maybeRegisterNodeMetrics(int node) {
- if (node >= 0) {
- // if one sensor of the metrics has been registered for the node,
+ public void maybeRegisterConnectionMetrics(String connectionId) {
+ if (!connectionId.isEmpty() && metricsPerConnection) {
+ // if one sensor of the metrics has been registered for the connection,
// then all other sensors should have been registered; and vice versa
- String nodeRequestName = "node-" + node + ".bytes-sent";
+ String nodeRequestName = "node-" + connectionId + ".bytes-sent";
Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
if (nodeRequest == null) {
String metricGrpName = metricGrpPrefix + "-node-metrics";
Map<String, String> tags = new LinkedHashMap<String, String>(metricTags);
- tags.put("node-id", "node-" + node);
+ tags.put("node-id", "node-" + connectionId);
nodeRequest = this.metrics.sensor(nodeRequestName);
MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);
@@ -537,14 +597,14 @@ public class Selector implements Selectable {
metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
nodeRequest.add(metricName, new Max());
- String nodeResponseName = "node-" + node + ".bytes-received";
+ String nodeResponseName = "node-" + connectionId + ".bytes-received";
Sensor nodeResponse = this.metrics.sensor(nodeResponseName);
metricName = new MetricName("incoming-byte-rate", metricGrpName, tags);
nodeResponse.add(metricName, new Rate());
metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
nodeResponse.add(metricName, new Rate(new Count()));
- String nodeTimeName = "node-" + node + ".latency";
+ String nodeTimeName = "node-" + connectionId + ".latency";
Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName);
metricName = new MetricName("request-latency-avg", metricGrpName, tags);
nodeRequestTime.add(metricName, new Avg());
@@ -554,22 +614,22 @@ public class Selector implements Selectable {
}
}
- public void recordBytesSent(int node, int bytes) {
+ public void recordBytesSent(String connectionId, long bytes) {
long now = time.milliseconds();
this.bytesSent.record(bytes, now);
- if (node >= 0) {
- String nodeRequestName = "node-" + node + ".bytes-sent";
+ if (!connectionId.isEmpty()) {
+ String nodeRequestName = "node-" + connectionId + ".bytes-sent";
Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
if (nodeRequest != null)
nodeRequest.record(bytes, now);
}
}
- public void recordBytesReceived(int node, int bytes) {
+ public void recordBytesReceived(String connection, int bytes) {
long now = time.milliseconds();
this.bytesReceived.record(bytes, now);
- if (node >= 0) {
- String nodeRequestName = "node-" + node + ".bytes-received";
+ if (!connection.isEmpty()) {
+ String nodeRequestName = "node-" + connection + ".bytes-received";
Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
if (nodeRequest != null)
nodeRequest.record(bytes, now);
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/Send.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java b/clients/src/main/java/org/apache/kafka/common/network/Send.java
index 5d321a0..8f6daad 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Send.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java
@@ -13,7 +13,6 @@
package org.apache.kafka.common.network;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
/**
@@ -24,12 +23,7 @@ public interface Send {
/**
* The numeric id for the destination of this send
*/
- public int destination();
-
- /**
- * The number of bytes remaining to send
- */
- public int remaining();
+ public String destination();
/**
* Is this send complete?
@@ -37,11 +31,6 @@ public interface Send {
public boolean completed();
/**
- * An optional method to turn this send into an array of ByteBuffers if possible (otherwise returns null)
- */
- public ByteBuffer[] reify();
-
- /**
* Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
* to be completely written
* @param channel The channel to write to
@@ -50,4 +39,9 @@ public interface Send {
*/
public long writeTo(GatheringByteChannel channel) throws IOException;
+ /**
+ * Size of the send
+ */
+ public long size();
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
index 27cbf39..3fec60b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
@@ -25,7 +25,7 @@ public class RequestSend extends NetworkSend {
private final RequestHeader header;
private final Struct body;
- public RequestSend(int destination, RequestHeader header, Struct body) {
+ public RequestSend(String destination, RequestHeader header, Struct body) {
super(destination, serialize(header, body));
this.header = header;
this.body = body;
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
new file mode 100644
index 0000000..12b06d1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class ResponseSend extends NetworkSend { // a NetworkSend whose payload is a serialized response header followed by the response body
+
+ public ResponseSend(String destination, ResponseHeader header, Struct body) { // serializes header + body and hands the buffer to NetworkSend
+ super(destination, serialize(header, body));
+ }
+
+ public ResponseSend(String destination, ResponseHeader header, AbstractRequestResponse response) { // convenience overload: converts the response to its Struct form first
+ this(destination, header, response.toStruct());
+ }
+
+ private static ByteBuffer serialize(ResponseHeader header, Struct body) { // packs the header, then the body, into a single buffer
+ ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf()); // sized to hold exactly header + body
+ header.writeTo(buffer);
+ body.writeTo(buffer);
+ buffer.rewind(); // reset position to 0 so the buffer is consumed from the start
+ return buffer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 5e3fab1..d9c97e9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -78,7 +78,7 @@ public class MockClient implements KafkaClient {
return false;
}
- public void disconnect(Integer node) {
+ public void disconnect(String node) {
Iterator<ClientRequest> iter = requests.iterator();
while (iter.hasNext()) {
ClientRequest request = iter.next();
@@ -115,7 +115,7 @@ public class MockClient implements KafkaClient {
}
@Override
- public List<ClientResponse> completeAll(int node, long now) {
+ public List<ClientResponse> completeAll(String node, long now) {
return completeAll(now);
}
@@ -158,7 +158,7 @@ public class MockClient implements KafkaClient {
}
@Override
- public int inFlightRequestCount(int nodeId) {
+ public int inFlightRequestCount(String nodeId) {
return requests.size();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 8b27889..43238ce 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -65,7 +65,7 @@ public class NetworkClientTest {
client.poll(1, time.milliseconds());
selector.clear();
assertTrue("Now the client is ready", client.ready(node, time.milliseconds()));
- selector.disconnect(node.id());
+ selector.disconnect(node.idString());
client.poll(1, time.milliseconds());
selector.clear();
assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds()));
@@ -74,7 +74,7 @@ public class NetworkClientTest {
@Test(expected = IllegalStateException.class)
public void testSendToUnreadyNode() {
- RequestSend send = new RequestSend(5,
+ RequestSend send = new RequestSend("5",
client.nextRequestHeader(ApiKeys.METADATA),
new MetadataRequest(Arrays.asList("test")).toStruct());
ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null);
@@ -86,7 +86,7 @@ public class NetworkClientTest {
public void testSimpleRequestResponse() {
ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
- RequestSend send = new RequestSend(node.id(), reqHeader, produceRequest.toStruct());
+ RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct());
TestCallbackHandler handler = new TestCallbackHandler();
ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler);
awaitReady(client, node);
@@ -101,7 +101,7 @@ public class NetworkClientTest {
respHeader.writeTo(buffer);
resp.writeTo(buffer);
buffer.flip();
- selector.completeReceive(new NetworkReceive(node.id(), buffer));
+ selector.completeReceive(new NetworkReceive(node.idString(), buffer));
List<ClientResponse> responses = client.poll(1, time.milliseconds());
assertEquals(1, responses.size());
assertTrue("The handler should have executed.", handler.executed);
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index d5b306b..d23b4b6 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -22,10 +22,7 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
+import java.util.*;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
@@ -49,7 +46,7 @@ public class SelectorTest {
public void setup() throws Exception {
this.server = new EchoServer();
this.server.start();
- this.selector = new Selector(new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap<String, String>());
+ this.selector = new Selector(5000, new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap<String, String>());
}
@After
@@ -63,7 +60,7 @@ public class SelectorTest {
*/
@Test
public void testServerDisconnect() throws Exception {
- int node = 0;
+ String node = "0";
// connect and do a simple request
blockingConnect(node);
@@ -84,7 +81,7 @@ public class SelectorTest {
*/
@Test
public void testClientDisconnect() throws Exception {
- int node = 0;
+ String node = "0";
blockingConnect(node);
selector.disconnect(node);
selector.send(createSend(node, "hello1"));
@@ -101,7 +98,7 @@ public class SelectorTest {
*/
@Test(expected = IllegalStateException.class)
public void testCantSendWithInProgress() throws Exception {
- int node = 0;
+ String node = "0";
blockingConnect(node);
selector.send(createSend(node, "test1"));
selector.send(createSend(node, "test2"));
@@ -113,7 +110,7 @@ public class SelectorTest {
*/
@Test(expected = IllegalStateException.class)
public void testCantSendWithoutConnecting() throws Exception {
- selector.send(createSend(0, "test"));
+ selector.send(createSend("0", "test"));
selector.poll(1000L);
}
@@ -122,7 +119,7 @@ public class SelectorTest {
*/
@Test(expected = IOException.class)
public void testNoRouteToHost() throws Exception {
- selector.connect(0, new InetSocketAddress("asdf.asdf.dsc", server.port), BUFFER_SIZE, BUFFER_SIZE);
+ selector.connect("0", new InetSocketAddress("asdf.asdf.dsc", server.port), BUFFER_SIZE, BUFFER_SIZE);
}
/**
@@ -130,7 +127,7 @@ public class SelectorTest {
*/
@Test
public void testConnectionRefused() throws Exception {
- int node = 0;
+ String node = "0";
ServerSocket nonListeningSocket = new ServerSocket(0);
int nonListeningPort = nonListeningSocket.getLocalPort();
selector.connect(node, new InetSocketAddress("localhost", nonListeningPort), BUFFER_SIZE, BUFFER_SIZE);
@@ -151,14 +148,15 @@ public class SelectorTest {
// create connections
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
for (int i = 0; i < conns; i++)
- selector.connect(i, addr, BUFFER_SIZE, BUFFER_SIZE);
-
+ selector.connect(Integer.toString(i), addr, BUFFER_SIZE, BUFFER_SIZE);
// send echo requests and receive responses
- int[] requests = new int[conns];
- int[] responses = new int[conns];
+ Map<String, Integer> requests = new HashMap<String, Integer>();
+ Map<String, Integer> responses = new HashMap<String, Integer>();
int responseCount = 0;
- for (int i = 0; i < conns; i++)
- selector.send(createSend(i, i + "-" + 0));
+ for (int i = 0; i < conns; i++) {
+ String node = Integer.toString(i);
+ selector.send(createSend(node, node + "-0"));
+ }
// loop until we complete all requests
while (responseCount < conns * reqs) {
@@ -171,19 +169,27 @@ public class SelectorTest {
for (NetworkReceive receive : selector.completedReceives()) {
String[] pieces = asString(receive).split("-");
assertEquals("Should be in the form 'conn-counter'", 2, pieces.length);
- assertEquals("Check the source", receive.source(), Integer.parseInt(pieces[0]));
+ assertEquals("Check the source", receive.source(), pieces[0]);
assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position());
- assertEquals("Check the request counter", responses[receive.source()], Integer.parseInt(pieces[1]));
- responses[receive.source()]++; // increment the expected counter
+ if (responses.containsKey(receive.source())) {
+ assertEquals("Check the request counter", (int) responses.get(receive.source()), Integer.parseInt(pieces[1]));
+ responses.put(receive.source(), responses.get(receive.source()) + 1);
+ } else {
+ assertEquals("Check the request counter", 0, Integer.parseInt(pieces[1]));
+ responses.put(receive.source(), 1);
+ }
responseCount++;
}
// prepare new sends for the next round
- for (NetworkSend send : selector.completedSends()) {
- int dest = send.destination();
- requests[dest]++;
- if (requests[dest] < reqs)
- selector.send(createSend(dest, dest + "-" + requests[dest]));
+ for (Send send : selector.completedSends()) {
+ String dest = send.destination();
+ if (requests.containsKey(dest))
+ requests.put(dest, requests.get(dest) + 1);
+ else
+ requests.put(dest, 1);
+ if (requests.get(dest) < reqs)
+ selector.send(createSend(dest, dest + "-" + requests.get(dest)));
}
}
}
@@ -193,7 +199,7 @@ public class SelectorTest {
*/
@Test
public void testSendLargeRequest() throws Exception {
- int node = 0;
+ String node = "0";
blockingConnect(node);
String big = TestUtils.randomString(10 * BUFFER_SIZE);
assertEquals(big, blockingRequest(node, big));
@@ -204,41 +210,41 @@ public class SelectorTest {
*/
@Test
public void testEmptyRequest() throws Exception {
- int node = 0;
+ String node = "0";
blockingConnect(node);
assertEquals("", blockingRequest(node, ""));
}
@Test(expected = IllegalStateException.class)
public void testExistingConnectionId() throws IOException {
- blockingConnect(0);
- blockingConnect(0);
+ blockingConnect("0");
+ blockingConnect("0");
}
@Test
public void testMute() throws Exception {
- blockingConnect(0);
- blockingConnect(1);
+ blockingConnect("0");
+ blockingConnect("1");
- selector.send(createSend(0, "hello"));
- selector.send(createSend(1, "hi"));
+ selector.send(createSend("0", "hello"));
+ selector.send(createSend("1", "hi"));
- selector.mute(1);
+ selector.mute("1");
while (selector.completedReceives().isEmpty())
selector.poll(5);
assertEquals("We should have only one response", 1, selector.completedReceives().size());
- assertEquals("The response should not be from the muted node", 0, selector.completedReceives().get(0).source());
+ assertEquals("The response should not be from the muted node", "0", selector.completedReceives().get(0).source());
- selector.unmute(1);
+ selector.unmute("1");
do {
selector.poll(5);
} while (selector.completedReceives().isEmpty());
assertEquals("We should have only one response", 1, selector.completedReceives().size());
- assertEquals("The response should be from the previously muted node", 1, selector.completedReceives().get(0).source());
+ assertEquals("The response should be from the previously muted node", "1", selector.completedReceives().get(0).source());
}
- private String blockingRequest(int node, String s) throws IOException {
+ private String blockingRequest(String node, String s) throws IOException {
selector.send(createSend(node, s));
selector.poll(1000L);
while (true) {
@@ -250,13 +256,13 @@ public class SelectorTest {
}
/* connect and wait for the connection to complete */
- private void blockingConnect(int node) throws IOException {
+ private void blockingConnect(String node) throws IOException {
selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
while (!selector.connected().contains(node))
selector.poll(10000L);
}
- private NetworkSend createSend(int node, String s) {
+ private NetworkSend createSend(String node, String s) {
return new NetworkSend(node, ByteBuffer.wrap(s.getBytes()));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index ea89b06..51eb9d1 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -20,6 +20,7 @@ import java.util.List;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.utils.Time;
/**
@@ -28,23 +29,23 @@ import org.apache.kafka.common.utils.Time;
public class MockSelector implements Selectable {
private final Time time;
- private final List<NetworkSend> initiatedSends = new ArrayList<NetworkSend>();
- private final List<NetworkSend> completedSends = new ArrayList<NetworkSend>();
+ private final List<Send> initiatedSends = new ArrayList<Send>();
+ private final List<Send> completedSends = new ArrayList<Send>();
private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
- private final List<Integer> disconnected = new ArrayList<Integer>();
- private final List<Integer> connected = new ArrayList<Integer>();
+ private final List<String> disconnected = new ArrayList<String>();
+ private final List<String> connected = new ArrayList<String>();
public MockSelector(Time time) {
this.time = time;
}
@Override
- public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
+ public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
this.connected.add(id);
}
@Override
- public void disconnect(int id) {
+ public void disconnect(String id) {
this.disconnected.add(id);
}
@@ -64,7 +65,7 @@ public class MockSelector implements Selectable {
}
@Override
- public void send(NetworkSend send) {
+ public void send(Send send) {
this.initiatedSends.add(send);
}
@@ -76,7 +77,7 @@ public class MockSelector implements Selectable {
}
@Override
- public List<NetworkSend> completedSends() {
+ public List<Send> completedSends() {
return completedSends;
}
@@ -94,21 +95,21 @@ public class MockSelector implements Selectable {
}
@Override
- public List<Integer> disconnected() {
+ public List<String> disconnected() {
return disconnected;
}
@Override
- public List<Integer> connected() {
+ public List<String> connected() {
return connected;
}
@Override
- public void mute(int id) {
+ public void mute(String id) {
}
@Override
- public void unmute(int id) {
+ public void unmute(String id) {
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 9efabaa..6af7b80 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -17,6 +17,8 @@
package kafka
+import java.util.Properties
+
import scala.collection.JavaConversions._
import joptsimple.OptionParser
import metrics.KafkaMetricsReporter
@@ -26,7 +28,7 @@ import org.apache.kafka.common.utils.Utils
object Kafka extends Logging {
- def getKafkaConfigFromArgs(args: Array[String]): KafkaConfig = {
+ def getPropsFromArgs(args: Array[String]): Properties = {
val optionParser = new OptionParser
val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
.withRequiredArg()
@@ -47,14 +49,14 @@ object Kafka extends Logging {
props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt)))
}
-
- KafkaConfig.fromProps(props)
+ props
}
def main(args: Array[String]): Unit = {
try {
- val serverConfig = getKafkaConfigFromArgs(args)
- KafkaMetricsReporter.startReporters(new VerifiableProperties(serverConfig.toProps))
+ val serverProps = getPropsFromArgs(args)
+ val serverConfig = KafkaConfig.fromProps(serverProps)
+ KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
val kafkaServerStartable = new KafkaServerStartable(serverConfig)
// attach shutdown handler to catch control-c
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 6d1c6ab..f23120e 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -174,7 +174,7 @@ object ConsumerGroupCommand {
val offsetMap = mutable.Map[TopicAndPartition, Long]()
val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)
channel.send(OffsetFetchRequest(group, topicPartitions))
- val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
+ val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())
offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
index a3b1b78..258d5fe 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
@@ -18,9 +18,10 @@
package kafka.api
import java.nio.ByteBuffer
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
-import kafka.network.RequestChannel.Response
+
import kafka.common.ErrorMapping
+import kafka.network.{RequestOrResponseSend, RequestChannel}
+import kafka.network.RequestChannel.Response
object ConsumerMetadataRequest {
val CurrentVersion = 0.shortValue
@@ -64,7 +65,7 @@ case class ConsumerMetadataRequest(group: String,
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
// return ConsumerCoordinatorNotAvailable for all uncaught errors
val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
def describe(details: Boolean) = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index fe81635..8092007 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -18,10 +18,9 @@
package kafka.api
import java.nio.ByteBuffer
-import kafka.api.ApiUtils._
-import collection.mutable.ListBuffer
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
-import kafka.common.{TopicAndPartition, ErrorMapping}
+
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
import kafka.utils.Logging
@@ -63,7 +62,7 @@ case class ControlledShutdownRequest(versionId: Short,
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition])
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean = false): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index b038c15..5b38f85 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -149,7 +149,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
(topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty))
}
val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
- requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
+ requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 75aaf57..0b6b33a 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -22,8 +22,10 @@ import java.nio.channels.GatheringByteChannel
import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.message.{MessageSet, ByteBufferMessageSet}
-import kafka.network.{MultiSend, Send}
import kafka.api.ApiUtils._
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.network.MultiSend
import scala.collection._
@@ -62,10 +64,12 @@ class PartitionDataSend(val partitionId: Int,
buffer.putInt(partitionData.messages.sizeInBytes)
buffer.rewind()
- override def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
+ override def completed = !buffer.hasRemaining && messagesSentSize >= messageSize
- override def writeTo(channel: GatheringByteChannel): Int = {
- var written = 0
+ override def destination: String = ""
+
+ override def writeTo(channel: GatheringByteChannel): Long = {
+ var written = 0L
if(buffer.hasRemaining)
written += channel.write(buffer)
if(!buffer.hasRemaining && messagesSentSize < messageSize) {
@@ -75,6 +79,8 @@ class PartitionDataSend(val partitionId: Int,
}
written
}
+
+ override def size = buffer.capacity() + messageSize
}
object TopicData {
@@ -101,29 +107,32 @@ case class TopicData(topic: String, partitionData: Map[Int, FetchResponsePartiti
val headerSize = TopicData.headerSize(topic)
}
-class TopicDataSend(val topicData: TopicData) extends Send {
- private val size = topicData.sizeInBytes
+class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
+
+ private var sent = 0L
- private var sent = 0
+ override def completed: Boolean = sent >= size
- override def complete = sent >= size
+ override def destination: String = dest
+
+ override def size = topicData.headerSize + sends.size()
private val buffer = ByteBuffer.allocate(topicData.headerSize)
writeShortString(buffer, topicData.topic)
buffer.putInt(topicData.partitionData.size)
buffer.rewind()
- val sends = new MultiSend(topicData.partitionData.toList
- .map(d => new PartitionDataSend(d._1, d._2))) {
- val expectedBytesToWrite = topicData.sizeInBytes - topicData.headerSize
- }
+ private val sends = new MultiSend(dest,
+ JavaConversions.seqAsJavaList(topicData.partitionData.toList.map(d => new PartitionDataSend(d._1, d._2))))
- def writeTo(channel: GatheringByteChannel): Int = {
- expectIncomplete()
- var written = 0
+ override def writeTo(channel: GatheringByteChannel): Long = {
+ if (completed)
+ throw new KafkaException("This operation cannot be completed on a complete request.")
+
+ var written = 0L
if(buffer.hasRemaining)
written += channel.write(buffer)
- if(!buffer.hasRemaining && !sends.complete) {
+ if(!buffer.hasRemaining && !sends.completed) {
written += sends.writeTo(channel)
}
sent += written
@@ -200,34 +209,36 @@ case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchR
}
-class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
- private val size = fetchResponse.sizeInBytes
+class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) extends Send {
+ private val payloadSize = fetchResponse.sizeInBytes
+
+ private var sent = 0L
- private var sent = 0
+ override def size = 4 /* for size byte */ + payloadSize
- private val sendSize = 4 /* for size */ + size
+ override def completed = sent >= size
- override def complete = sent >= sendSize
+ override def destination = dest
private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize)
- buffer.putInt(size)
+ buffer.putInt(payloadSize)
buffer.putInt(fetchResponse.correlationId)
buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count
buffer.rewind()
- val sends = new MultiSend(fetchResponse.dataGroupedByTopic.toList.map {
- case(topic, data) => new TopicDataSend(TopicData(topic,
+ private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map {
+ case(topic, data) => new TopicDataSend(dest, TopicData(topic,
data.map{case(topicAndPartition, message) => (topicAndPartition.partition, message)}))
- }) {
- val expectedBytesToWrite = fetchResponse.sizeInBytes - FetchResponse.headerSize
- }
+ }))
+
+ override def writeTo(channel: GatheringByteChannel): Long = {
+ if (completed)
+ throw new KafkaException("This operation cannot be completed on a complete request.")
- def writeTo(channel: GatheringByteChannel):Int = {
- expectIncomplete()
- var written = 0
+ var written = 0L
if(buffer.hasRemaining)
written += channel.write(buffer)
- if(!buffer.hasRemaining && !sends.complete) {
+ if(!buffer.hasRemaining && !sends.completed) {
written += sends.writeTo(channel)
}
sent += written
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 431190a..c2584e0 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -19,14 +19,16 @@
package kafka.api
import java.nio._
-import kafka.utils._
+
import kafka.api.ApiUtils._
import kafka.cluster.BrokerEndPoint
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.common.ErrorMapping
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
-import collection.Set
+import kafka.utils._
+
+import scala.collection.Set
object LeaderAndIsr {
@@ -184,7 +186,7 @@ case class LeaderAndIsrRequest (versionId: Short,
case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
val errorResponse = LeaderAndIsrResponse(correlationId, responseMap)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 317daed..5b362ef 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -18,11 +18,13 @@
package kafka.api
import java.nio.ByteBuffer
+
import kafka.api.ApiUtils._
-import kafka.utils.{SystemTime, Logging}
-import kafka.network.{RequestChannel, BoundedByteBufferSend}
-import kafka.common.{OffsetMetadata, OffsetAndMetadata, ErrorMapping, TopicAndPartition}
+import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
+import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
+import kafka.utils.Logging
+
import scala.collection._
object OffsetCommitRequest extends Logging {
@@ -162,7 +164,7 @@ case class OffsetCommitRequest(groupId: String,
val commitStatus = requestInfo.mapValues(_ => errorCode)
val commitResponse = OffsetCommitResponse(commitStatus, correlationId)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(commitResponse)))
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, commitResponse)))
}
override def describe(details: Boolean): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index fa8bd6a..a83e147 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -17,16 +17,13 @@
package kafka.api
+import java.nio.ByteBuffer
+
import kafka.api.ApiUtils._
-import kafka.utils.Logging
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
-import kafka.common._
-import kafka.common.TopicAndPartition
+import kafka.common.{TopicAndPartition, _}
+import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
-
-import scala.Some
-
-import java.nio.ByteBuffer
+import kafka.utils.Logging
object OffsetFetchRequest extends Logging {
val CurrentVersion: Short = 1
@@ -99,7 +96,7 @@ case class OffsetFetchRequest(groupId: String,
))
}.toMap
val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 3d483bc..f418868 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -18,9 +18,10 @@
package kafka.api
import java.nio.ByteBuffer
-import kafka.common.{ErrorMapping, TopicAndPartition}
+
import kafka.api.ApiUtils._
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
@@ -117,7 +118,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null))
}
val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 570b2da..c866180 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -18,11 +18,12 @@
package kafka.api
import java.nio._
-import kafka.message._
+
import kafka.api.ApiUtils._
import kafka.common._
+import kafka.message._
+import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
-import kafka.network.{RequestChannel, BoundedByteBufferSend}
object ProducerRequest {
val CurrentVersion = 0.shortValue
@@ -136,7 +137,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
(topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
}
val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index ef7a86e..155cb65 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -20,6 +20,8 @@ package kafka.api
import kafka.common.KafkaException
import java.nio.ByteBuffer
+import kafka.network.InvalidRequestException
+
object RequestKeys {
val ProduceKey: Short = 0
val FetchKey: Short = 1
@@ -59,7 +61,7 @@ object RequestKeys {
def deserializerForKey(key: Short): (ByteBuffer) => RequestOrResponse = {
keyToNameAndDeserializerMap.get(key) match {
case Some(nameAndSerializer) => nameAndSerializer._2
- case None => throw new KafkaException("Wrong request type %d".format(key))
+ case None => throw new InvalidRequestException("Wrong request type %d".format(key))
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 5e14987..4441fc6 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -19,7 +19,7 @@ package kafka.api
import java.nio._
import kafka.api.ApiUtils._
-import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException}
+import kafka.network.{RequestOrResponseSend, RequestChannel, InvalidRequestException}
import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.network.RequestChannel.Response
import kafka.utils.Logging
@@ -106,7 +106,7 @@ case class StopReplicaRequest(versionId: Short,
case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}.toMap
val errorResponse = StopReplicaResponse(correlationId, responseMap)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 363bae0..401c583 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -18,13 +18,15 @@
package kafka.api
import java.nio.ByteBuffer
+
import kafka.api.ApiUtils._
-import collection.mutable.ListBuffer
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.common.ErrorMapping
+import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
import kafka.utils.Logging
+import scala.collection.mutable.ListBuffer
+
object TopicMetadataRequest extends Logging {
val CurrentVersion = 0.shortValue
val DefaultClientId = ""
@@ -80,7 +82,7 @@ case class TopicMetadataRequest(versionId: Short,
topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index 69f0397..d59de82 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -21,8 +21,8 @@ import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import kafka.cluster.{Broker, BrokerEndPoint}
import kafka.common.{ErrorMapping, KafkaException, TopicAndPartition}
+import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
import org.apache.kafka.common.protocol.SecurityProtocol
import scala.collection.Set
@@ -128,7 +128,7 @@ case class UpdateMetadataRequest (versionId: Short,
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]))
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 62394c0..68c7e7f 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -153,7 +153,7 @@ object ClientUtils extends Logging{
debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
queryChannel.send(ConsumerMetadataRequest(group))
val response = queryChannel.receive()
- val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer)
+ val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.payload())
debug("Consumer metadata response: " + consumerMetadataResponse.toString)
if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
coordinatorOpt = consumerMetadataResponse.coordinatorOpt
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 31a2639..c16f7ed 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -24,6 +24,7 @@ import kafka.api._
import kafka.network._
import kafka.utils._
import kafka.common.{ErrorMapping, TopicAndPartition}
+import org.apache.kafka.common.network.{NetworkReceive, Receive}
import org.apache.kafka.common.utils.Utils._
/**
@@ -65,9 +66,9 @@ class SimpleConsumer(val host: String,
}
}
- private def sendRequest(request: RequestOrResponse): Receive = {
+ private def sendRequest(request: RequestOrResponse): NetworkReceive = {
lock synchronized {
- var response: Receive = null
+ var response: NetworkReceive = null
try {
getOrMakeConnection()
blockingChannel.send(request)
@@ -94,12 +95,12 @@ class SimpleConsumer(val host: String,
def send(request: TopicMetadataRequest): TopicMetadataResponse = {
val response = sendRequest(request)
- TopicMetadataResponse.readFrom(response.buffer)
+ TopicMetadataResponse.readFrom(response.payload())
}
def send(request: ConsumerMetadataRequest): ConsumerMetadataResponse = {
val response = sendRequest(request)
- ConsumerMetadataResponse.readFrom(response.buffer)
+ ConsumerMetadataResponse.readFrom(response.payload())
}
/**
@@ -109,7 +110,7 @@ class SimpleConsumer(val host: String,
* @return a set of fetched messages
*/
def fetch(request: FetchRequest): FetchResponse = {
- var response: Receive = null
+ var response: NetworkReceive = null
val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestTimer
val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer
aggregateTimer.time {
@@ -117,7 +118,7 @@ class SimpleConsumer(val host: String,
response = sendRequest(request)
}
}
- val fetchResponse = FetchResponse.readFrom(response.buffer)
+ val fetchResponse = FetchResponse.readFrom(response.payload())
val fetchedSize = fetchResponse.sizeInBytes
fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize)
fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)
@@ -129,7 +130,7 @@ class SimpleConsumer(val host: String,
* @param request a [[kafka.api.OffsetRequest]] object.
* @return a [[kafka.api.OffsetResponse]] object.
*/
- def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)
+ def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).payload())
/**
* Commit offsets for a topic
@@ -140,7 +141,7 @@ class SimpleConsumer(val host: String,
def commitOffsets(request: OffsetCommitRequest) = {
// TODO: With KAFKA-1012, we have to first issue a ConsumerMetadataRequest and connect to the coordinator before
// we can commit offsets.
- OffsetCommitResponse.readFrom(sendRequest(request).buffer)
+ OffsetCommitResponse.readFrom(sendRequest(request).payload())
}
/**
@@ -149,7 +150,7 @@ class SimpleConsumer(val host: String,
* @param request a [[kafka.api.OffsetFetchRequest]] object.
* @return a [[kafka.api.OffsetFetchResponse]] object.
*/
- def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).buffer)
+ def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).payload())
private def getOrMakeConnection() {
if(!isClosed && !blockingChannel.isConnected) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index a7f2acc..e42d104 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -334,7 +334,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
try {
kafkaCommitMeter.mark(offsetsToCommit.size)
offsetsChannel.send(offsetCommitRequest)
- val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer)
+ val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().payload())
trace("Offset commit response: %s.".format(offsetCommitResponse))
val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
@@ -421,7 +421,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
ensureOffsetManagerConnected()
try {
offsetsChannel.send(offsetFetchRequest)
- val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().buffer)
+ val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().payload())
trace("Offset fetch response: %s.".format(offsetFetchResponse))
val (leaderChanged, loadInProgress) =
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 6cf13f0..9f521fa 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -16,8 +16,9 @@
*/
package kafka.controller
-import kafka.network.{Receive, BlockingChannel}
+import kafka.network.BlockingChannel
import kafka.utils.{CoreUtils, Logging, ShutdownableThread}
+import org.apache.kafka.common.network.NetworkReceive
import collection.mutable.HashMap
import kafka.cluster.Broker
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
@@ -120,7 +121,7 @@ class RequestSendThread(val controllerId: Int,
val queueItem = queue.take()
val request = queueItem._1
val callback = queueItem._2
- var receive: Receive = null
+ var receive: NetworkReceive = null
try {
lock synchronized {
var isSendSuccessful = false
@@ -147,11 +148,11 @@ class RequestSendThread(val controllerId: Int,
var response: RequestOrResponse = null
request.requestId.get match {
case RequestKeys.LeaderAndIsrKey =>
- response = LeaderAndIsrResponse.readFrom(receive.buffer)
+ response = LeaderAndIsrResponse.readFrom(receive.payload())
case RequestKeys.StopReplicaKey =>
- response = StopReplicaResponse.readFrom(receive.buffer)
+ response = StopReplicaResponse.readFrom(receive.payload())
case RequestKeys.UpdateMetadataKey =>
- response = UpdateMetadataResponse.readFrom(receive.buffer)
+ response = UpdateMetadataResponse.readFrom(receive.payload())
}
stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
.format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index b0b7be1..568d0ac 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -16,12 +16,11 @@
*/
package kafka.javaapi
-import kafka.api._
import java.nio.ByteBuffer
+
+import kafka.api._
+
import scala.collection.mutable
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
-import kafka.common.ErrorMapping
-import kafka.network.RequestChannel.Response
class TopicMetadataRequest(val versionId: Short,
val correlationId: Int,
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index 6e2a38e..1197259 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -19,8 +19,10 @@ package kafka.network
import java.net.InetSocketAddress
import java.nio.channels._
-import kafka.utils.{nonthreadsafe, Logging}
+
import kafka.api.RequestOrResponse
+import kafka.utils.{Logging, nonthreadsafe}
+import org.apache.kafka.common.network.NetworkReceive
object BlockingChannel{
@@ -43,6 +45,7 @@ class BlockingChannel( val host: String,
private var writeChannel: GatheringByteChannel = null
private val lock = new Object()
private val connectTimeoutMs = readTimeoutMs
+ private var connectionId: String = ""
def connect() = lock synchronized {
if(!connected) {
@@ -59,8 +62,15 @@ class BlockingChannel( val host: String,
channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
writeChannel = channel
+ // Need to create a new ReadableByteChannel from input stream because SocketChannel doesn't implement read with timeout
+ // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
readChannel = Channels.newChannel(channel.socket().getInputStream)
connected = true
+ val localHost = channel.socket.getLocalAddress.getHostAddress
+ val localPort = channel.socket.getLocalPort
+ val remoteHost = channel.socket.getInetAddress.getHostAddress
+ val remotePort = channel.socket.getPort
+ connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
// settings may not match what we requested above
val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d."
debug(msg.format(channel.socket.getSoTimeout,
@@ -95,20 +105,21 @@ class BlockingChannel( val host: String,
def isConnected = connected
- def send(request: RequestOrResponse):Int = {
+ def send(request: RequestOrResponse): Long = {
if(!connected)
throw new ClosedChannelException()
- val send = new BoundedByteBufferSend(request)
+ val send = new RequestOrResponseSend(connectionId, request)
send.writeCompletely(writeChannel)
}
- def receive(): Receive = {
+ def receive(): NetworkReceive = {
if(!connected)
throw new ClosedChannelException()
- val response = new BoundedByteBufferReceive()
+ val response = new NetworkReceive()
response.readCompletely(readChannel)
+ response.payload().rewind()
response
}