You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/11/19 09:34:45 UTC
kafka git commit: KAFKA-3703;
Graceful close for consumers and producer with acks=0
Repository: kafka
Updated Branches:
refs/heads/trunk edfa28067 -> e53babab9
KAFKA-3703; Graceful close for consumers and producer with acks=0
Process requests received from channels before they were closed. For consumers, wait for coordinator requests to complete before returning from close.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>, Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #1836 from rajinisivaram/KAFKA-3703
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e53babab
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e53babab
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e53babab
Branch: refs/heads/trunk
Commit: e53babab9cada20cc54a18c0fd63aa5ab84fd012
Parents: edfa280
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Sat Nov 19 09:33:48 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Nov 19 09:33:48 2016 +0000
----------------------------------------------------------------------
.../consumer/internals/ConsumerCoordinator.java | 14 ++
.../kafka/common/network/KafkaChannel.java | 21 ++-
.../apache/kafka/common/network/Selector.java | 133 ++++++++++++++-----
.../kafka/common/network/SslTransportLayer.java | 8 +-
.../clients/consumer/KafkaConsumerTest.java | 81 +++++++++++
.../kafka/common/network/NioEchoServer.java | 46 +++++--
.../common/network/SslTransportLayerTest.java | 48 ++++++-
.../main/scala/kafka/network/SocketServer.scala | 13 +-
.../unit/kafka/network/SocketServerTest.scala | 15 +++
9 files changed, 321 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 4889872..56f6951 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -63,6 +63,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class);
+ private static final long CLOSE_TIMEOUT_MS = 5000;
+
private final List<PartitionAssignor> assignors;
private final Metadata metadata;
private final ConsumerCoordinatorMetrics sensors;
@@ -405,6 +407,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
client.disableWakeups();
try {
maybeAutoCommitOffsetsSync();
+
+ Node coordinator;
+ long endTimeMs = time.milliseconds() + CLOSE_TIMEOUT_MS;
+ while ((coordinator = coordinator()) != null && client.pendingRequestCount(coordinator) > 0) {
+ long remainingTimeMs = endTimeMs - time.milliseconds();
+ if (remainingTimeMs > 0)
+ client.poll(remainingTimeMs);
+ else {
+ log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.", client.pendingRequestCount(coordinator), groupId);
+ break;
+ }
+ }
} finally {
super.close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 16002eb..9b05aeb 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -35,15 +35,22 @@ public class KafkaChannel {
private final int maxReceiveSize;
private NetworkReceive receive;
private Send send;
+ // Track connection and mute state of channels to enable outstanding requests on channels to be
+ // processed after the channel is disconnected.
+ private boolean disconnected;
+ private boolean muted;
public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize) throws IOException {
this.id = id;
this.transportLayer = transportLayer;
this.authenticator = authenticator;
this.maxReceiveSize = maxReceiveSize;
+ this.disconnected = false;
+ this.muted = false;
}
public void close() throws IOException {
+ this.disconnected = true;
Utils.closeAll(transportLayer, authenticator);
}
@@ -65,6 +72,7 @@ public class KafkaChannel {
}
public void disconnect() {
+ disconnected = true;
transportLayer.disconnect();
}
@@ -82,15 +90,22 @@ public class KafkaChannel {
}
public void mute() {
- transportLayer.removeInterestOps(SelectionKey.OP_READ);
+ if (!disconnected)
+ transportLayer.removeInterestOps(SelectionKey.OP_READ);
+ muted = true;
}
public void unmute() {
- transportLayer.addInterestOps(SelectionKey.OP_READ);
+ if (!disconnected)
+ transportLayer.addInterestOps(SelectionKey.OP_READ);
+ muted = false;
}
+ /**
+ * Returns true if this channel has been explicitly muted using {@link KafkaChannel#mute()}
+ */
public boolean isMute() {
- return transportLayer.isMute();
+ return muted;
}
public boolean ready() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 5244710..df35266 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -86,6 +86,7 @@ public class Selector implements Selectable {
private final List<NetworkReceive> completedReceives;
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
private final Set<SelectionKey> immediatelyConnectedKeys;
+ private final Map<String, KafkaChannel> closingChannels;
private final List<String> disconnected;
private final List<String> connected;
private final List<String> failedSends;
@@ -132,6 +133,7 @@ public class Selector implements Selectable {
this.completedReceives = new ArrayList<>();
this.stagedReceives = new HashMap<>();
this.immediatelyConnectedKeys = new HashSet<>();
+ this.closingChannels = new HashMap<>();
this.connected = new ArrayList<>();
this.disconnected = new ArrayList<>();
this.failedSends = new ArrayList<>();
@@ -237,12 +239,17 @@ public class Selector implements Selectable {
* @param send The request to send
*/
public void send(Send send) {
- KafkaChannel channel = channelOrFail(send.destination());
- try {
- channel.setSend(send);
- } catch (CancelledKeyException e) {
- this.failedSends.add(send.destination());
- close(channel);
+ String connectionId = send.destination();
+ if (closingChannels.containsKey(connectionId))
+ this.failedSends.add(connectionId);
+ else {
+ KafkaChannel channel = channelOrFail(connectionId, false);
+ try {
+ channel.setSend(send);
+ } catch (CancelledKeyException e) {
+ this.failedSends.add(connectionId);
+ close(channel, false);
+ }
}
}
@@ -266,6 +273,11 @@ public class Selector implements Selectable {
* the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
* and pop response and add to the completedReceives.
*
+ * At most one entry is added to "completedReceives" for a channel in each poll. This is necessary to guarantee that
+ * requests from a channel are processed on the broker in the order they are sent. Since outstanding requests added
+ * by SocketServer to the request queue may be processed by different request handler threads, requests on each
+ * channel must be processed one-at-a-time to guarantee ordering.
+ *
* @param timeout The amount of time to wait, in milliseconds, which must be non-negative
* @throws IllegalArgumentException If `timeout` is negative
* @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
@@ -354,10 +366,8 @@ public class Selector implements Selectable {
}
/* cancel any defunct sockets */
- if (!key.isValid()) {
- close(channel);
- this.disconnected.add(channel.id());
- }
+ if (!key.isValid())
+ close(channel, true);
} catch (Exception e) {
String desc = channel.socketDescription();
@@ -365,8 +375,7 @@ public class Selector implements Selectable {
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
- close(channel);
- this.disconnected.add(channel.id());
+ close(channel, true);
}
}
}
@@ -393,7 +402,7 @@ public class Selector implements Selectable {
@Override
public void mute(String id) {
- KafkaChannel channel = channelOrFail(id);
+ KafkaChannel channel = channelOrFail(id, true);
mute(channel);
}
@@ -403,7 +412,7 @@ public class Selector implements Selectable {
@Override
public void unmute(String id) {
- KafkaChannel channel = channelOrFail(id);
+ KafkaChannel channel = channelOrFail(id, true);
unmute(channel);
}
@@ -430,13 +439,13 @@ public class Selector implements Selectable {
Map.Entry<String, Long> expiredConnection = idleExpiryManager.pollExpiredConnection(currentTimeNanos);
if (expiredConnection != null) {
String connectionId = expiredConnection.getKey();
-
- if (log.isTraceEnabled())
- log.trace("About to close the idle connection from {} due to being idle for {} millis",
- connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000);
-
- disconnected.add(connectionId);
- close(connectionId);
+ KafkaChannel channel = this.channels.get(connectionId);
+ if (channel != null) {
+ if (log.isTraceEnabled())
+ log.trace("About to close the idle connection from {} due to being idle for {} millis",
+ connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000);
+ close(channel, true);
+ }
}
}
@@ -448,6 +457,16 @@ public class Selector implements Selectable {
this.completedReceives.clear();
this.connected.clear();
this.disconnected.clear();
+ // Remove closed channels after all their staged receives have been processed or if a send was requested
+ for (Iterator<Map.Entry<String, KafkaChannel>> it = closingChannels.entrySet().iterator(); it.hasNext(); ) {
+ KafkaChannel channel = it.next().getValue();
+ Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
+ boolean sendFailed = failedSends.remove(channel.id());
+ if (deque == null || deque.isEmpty() || sendFailed) {
+ doClose(channel, true);
+ it.remove();
+ }
+ }
this.disconnected.addAll(this.failedSends);
this.failedSends.clear();
}
@@ -476,27 +495,58 @@ public class Selector implements Selectable {
public void close(String id) {
KafkaChannel channel = this.channels.get(id);
if (channel != null)
- close(channel);
+ close(channel, false);
}
/**
- * Begin closing this connection
+ * Begin closing this connection.
+ *
+ * If 'processOutstanding' is true, the channel is disconnected here, but staged receives are
+ * processed. The channel is closed when there are no outstanding receives or if a send
+ * is requested. The channel will be added to the disconnected list when it is actually closed.
+ *
+ * If 'processOutstanding' is false, outstanding receives are discarded and the channel is
+ * closed immediately. The channel will not be added to the disconnected list and it is the
+ * responsibility of the caller to handle disconnect notifications.
*/
- private void close(KafkaChannel channel) {
+ private void close(KafkaChannel channel, boolean processOutstanding) {
+
+ channel.disconnect();
+
+ // Keep track of closed channels with pending receives so that all received records
+ // may be processed. For example, when producer with acks=0 sends some records and
+ // closes its connections, a single poll() in the broker may receive records and
+ // handle close(). When the remote end closes its connection, the channel is retained until
+ // a send fails or all outstanding receives are processed. Mute state of disconnected channels
+ // is tracked to ensure that requests are processed one-by-one by the broker to preserve ordering.
+ Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
+ if (processOutstanding && deque != null && !deque.isEmpty()) {
+ if (!channel.isMute()) {
+ addToCompletedReceives(channel, deque);
+ if (deque.isEmpty())
+ this.stagedReceives.remove(channel);
+ }
+ closingChannels.put(channel.id(), channel);
+ } else
+ doClose(channel, processOutstanding);
+ this.channels.remove(channel.id());
+
+ if (idleExpiryManager != null)
+ idleExpiryManager.remove(channel.id());
+ }
+
+ private void doClose(KafkaChannel channel, boolean notifyDisconnect) {
try {
channel.close();
} catch (IOException e) {
log.error("Exception closing connection to node {}:", channel.id(), e);
}
- this.stagedReceives.remove(channel);
- this.channels.remove(channel.id());
this.sensors.connectionClosed.record();
-
- if (idleExpiryManager != null)
- idleExpiryManager.remove(channel.id());
+ this.stagedReceives.remove(channel);
+ if (notifyDisconnect)
+ this.disconnected.add(channel.id());
}
-
/**
* check if channel is ready
*/
@@ -506,10 +556,12 @@ public class Selector implements Selectable {
return channel != null && channel.ready();
}
- private KafkaChannel channelOrFail(String id) {
+ private KafkaChannel channelOrFail(String id, boolean maybeClosing) {
KafkaChannel channel = this.channels.get(id);
+ if (channel == null && maybeClosing)
+ channel = this.closingChannels.get(id);
if (channel == null)
- throw new IllegalStateException("Attempt to retrieve channel for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet());
+ throw new IllegalStateException("Attempt to retrieve channel for which there is no connection. Connection id " + id + " existing connections " + channels.keySet());
return channel;
}
@@ -529,6 +581,14 @@ public class Selector implements Selectable {
}
/**
+ * Return the channel with the specified id if it was disconnected, but not yet closed
+ * since there are outstanding messages to be processed.
+ */
+ public KafkaChannel closingChannel(String id) {
+ return closingChannels.get(id);
+ }
+
+ /**
* Get the channel associated with selectionKey
*/
private KafkaChannel channel(SelectionKey key) {
@@ -576,9 +636,7 @@ public class Selector implements Selectable {
KafkaChannel channel = entry.getKey();
if (!channel.isMute()) {
Deque<NetworkReceive> deque = entry.getValue();
- NetworkReceive networkReceive = deque.poll();
- this.completedReceives.add(networkReceive);
- this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
+ addToCompletedReceives(channel, deque);
if (deque.isEmpty())
iter.remove();
}
@@ -586,6 +644,11 @@ public class Selector implements Selectable {
}
}
+ private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
+ NetworkReceive networkReceive = stagedDeque.poll();
+ this.completedReceives.add(networkReceive);
+ this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
+ }
private class SelectorMetrics {
private final Metrics metrics;
http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index ffa980b..9ccf33d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -447,7 +447,7 @@ public class SslTransportLayer implements TransportLayer {
netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize());
if (netReadBuffer.remaining() > 0) {
int netread = socketChannel.read(netReadBuffer);
- if (netread == 0 && netReadBuffer.position() == 0) return netread;
+ if (netread == 0 && netReadBuffer.position() == 0) return read;
else if (netread < 0) throw new EOFException("EOF during read");
}
do {
@@ -488,7 +488,11 @@ public class SslTransportLayer implements TransportLayer {
}
break;
} else if (unwrapResult.getStatus() == Status.CLOSED) {
- throw new EOFException();
+ // If data has been read and unwrapped, return the data. Close will be handled on the next poll.
+ if (appReadBuffer.position() == 0 && read == 0)
+ throw new EOFException();
+ else
+ break;
}
} while (netReadBuffer.position() != 0);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index fa30a4b..e34f438 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -72,6 +72,11 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
@@ -831,6 +836,7 @@ public class KafkaConsumerTest {
assertTrue(consumer.subscription().isEmpty());
assertTrue(consumer.assignment().isEmpty());
+ client.requests().clear();
consumer.close();
}
@@ -905,6 +911,7 @@ public class KafkaConsumerTest {
for (ClientRequest req: client.requests())
assertTrue(req.header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+ client.requests().clear();
consumer.close();
}
@@ -969,6 +976,7 @@ public class KafkaConsumerTest {
// verify that the offset commits occurred as expected
assertTrue(commitReceived.get());
+ client.requests().clear();
consumer.close();
}
@@ -1032,6 +1040,7 @@ public class KafkaConsumerTest {
for (ClientRequest req : client.requests())
assertTrue(req.header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+ client.requests().clear();
consumer.close();
}
@@ -1067,6 +1076,78 @@ public class KafkaConsumerTest {
}
}
+ @Test
+ public void testGracefulClose() throws Exception {
+ consumerCloseTest(true);
+ }
+
+ @Test
+ public void testCloseTimeout() throws Exception {
+ consumerCloseTest(false);
+ }
+
+ private void consumerCloseTest(boolean graceful) throws Exception {
+ int rebalanceTimeoutMs = 60000;
+ int sessionTimeoutMs = 30000;
+ int heartbeatIntervalMs = 5000;
+
+ Time time = new MockTime();
+ Cluster cluster = TestUtils.singletonCluster(topic, 1);
+ Node node = cluster.nodes().get(0);
+
+ Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+ metadata.update(cluster, time.milliseconds());
+
+ MockClient client = new MockClient(time);
+ client.setNode(node);
+ PartitionAssignor assignor = new RoundRobinAssignor();
+
+ final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, 1000);
+
+ consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
+ Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+
+ // Poll with responses
+ client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node);
+ client.prepareResponseFrom(fetchResponse(tp0, 1, 0), node);
+ consumer.poll(0);
+
+ // Initiate close() after a commit request on another thread.
+ // Kafka consumer is single-threaded, but the implementation allows calls on a
+ // different thread as long as the calls are not executed concurrently. So this is safe.
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ Future<?> future = executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ consumer.commitAsync();
+ consumer.close();
+ }
+ });
+
+ // Close task should not complete until commit succeeds or close times out
+ try {
+ future.get(100, TimeUnit.MILLISECONDS);
+ fail("Close completed without waiting for commit response");
+ } catch (TimeoutException e) {
+ // Expected exception
+ }
+
+ // In graceful mode, commit response results in close() completing immediately without a timeout
+ // In non-graceful mode, close() times out without an exception even though commit response is pending
+ if (graceful) {
+ Map<TopicPartition, Short> response = new HashMap<>();
+ response.put(tp0, Errors.NONE.code());
+ client.respondFrom(offsetCommitResponse(response), coordinator);
+ } else
+ time.sleep(5000);
+ future.get(500, TimeUnit.MILLISECONDS); // Should succeed without TimeoutException or ExecutionException
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<String, String> consumer) {
return new ConsumerRebalanceListener() {
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index e99a399..cc4befa 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -14,15 +14,16 @@ package org.apache.kafka.common.network;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.SecurityProtocol;
@@ -40,7 +41,7 @@ public class NioEchoServer extends Thread {
private final List<SocketChannel> socketChannels;
private final AcceptorThread acceptorThread;
private final Selector selector;
- private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>();
+ private volatile WritableByteChannel outputChannel;
public NioEchoServer(SecurityProtocol securityProtocol, Map<String, ?> configs, String serverHost) throws Exception {
serverSocketChannel = ServerSocketChannel.open();
@@ -72,22 +73,23 @@ public class NioEchoServer extends Thread {
socketChannels.add(socketChannel);
}
newChannels.clear();
- while (true) {
- NetworkSend send = inflightSends.peek();
- if (send != null && !selector.channel(send.destination()).hasSend()) {
- send = inflightSends.poll();
- selector.send(send);
- } else
- break;
- }
+
List<NetworkReceive> completedReceives = selector.completedReceives();
for (NetworkReceive rcv : completedReceives) {
+ KafkaChannel channel = channel(rcv.source());
+ channel.mute();
NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
- if (!selector.channel(send.destination()).hasSend())
+ if (outputChannel == null)
selector.send(send);
- else
- inflightSends.add(send);
+ else {
+ for (ByteBuffer buffer : send.buffers)
+ outputChannel.write(buffer);
+ channel.unmute();
+ }
}
+ for (Send send : selector.completedSends())
+ selector.unmute(send.destination());
+
}
} catch (IOException e) {
// ignore
@@ -99,6 +101,24 @@ public class NioEchoServer extends Thread {
channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort();
}
+ private KafkaChannel channel(String id) {
+ KafkaChannel channel = selector.channel(id);
+ return channel == null ? selector.closingChannel(id) : channel;
+ }
+
+ /**
+ * Sets the output channel to which messages received on this server are echoed.
+ * This is useful in tests where the clients sending the messages don't receive
+ * the responses (eg. testing graceful close).
+ */
+ public void outputChannel(WritableByteChannel channel) {
+ this.outputChannel = channel;
+ }
+
+ public Selector selector() {
+ return selector;
+ }
+
public void closeConnections() throws IOException {
for (SocketChannel channel : socketChannels)
channel.close();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index a044dc9..01d8a25 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -16,10 +16,12 @@ import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.Map;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
@@ -32,6 +34,8 @@ import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
import org.apache.kafka.common.config.types.Password;
import org.junit.After;
import org.junit.Before;
@@ -390,7 +394,49 @@ public class SslTransportLayerTest {
NetworkTestUtils.checkClientConnection(selector, node, 64000, 10);
}
-
+
+ @Test
+ public void testCloseSsl() throws Exception {
+ testClose(SecurityProtocol.SSL, new SslChannelBuilder(Mode.CLIENT));
+ }
+
+ @Test
+ public void testClosePlaintext() throws Exception {
+ testClose(SecurityProtocol.PLAINTEXT, new PlaintextChannelBuilder());
+ }
+
+ private void testClose(SecurityProtocol securityProtocol, ChannelBuilder clientChannelBuilder) throws Exception {
+ String node = "0";
+ server = NetworkTestUtils.createEchoServer(securityProtocol, sslServerConfigs);
+ clientChannelBuilder.configure(sslClientConfigs);
+ this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", clientChannelBuilder);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ NetworkTestUtils.waitForChannelReady(selector, node);
+
+ final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ server.outputChannel(Channels.newChannel(bytesOut));
+ server.selector().muteAll();
+ byte[] message = TestUtils.randomString(100).getBytes();
+ int count = 20;
+ final int totalSendSize = count * (message.length + 4);
+ for (int i = 0; i < count; i++) {
+ selector.send(new NetworkSend(node, ByteBuffer.wrap(message)));
+ do {
+ selector.poll(0L);
+ } while (selector.completedSends().isEmpty());
+ }
+ server.selector().unmuteAll();
+ selector.close(node);
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return bytesOut.toByteArray().length == totalSendSize;
+ }
+ }, 5000, "All requests sent were not processed");
+ }
+
private void createSelector(Map<String, Object> sslClientConfigs) {
createSelector(sslClientConfigs, null, null, null);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 90a0fee..e98445f 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -443,7 +443,9 @@ private[kafka] class Processor(val id: Int,
// that are sitting in the server's socket buffer
curr.request.updateRequestMetrics
trace("Socket server received empty response to send, registering for read: " + curr)
- selector.unmute(curr.request.connectionId)
+ val channelId = curr.request.connectionId
+ if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
+ selector.unmute(channelId)
case RequestChannel.SendAction =>
sendResponse(curr)
case RequestChannel.CloseConnectionAction =>
@@ -486,9 +488,12 @@ private[kafka] class Processor(val id: Int,
private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>
try {
- val channel = selector.channel(receive.source)
- val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
- channel.socketAddress)
+ val openChannel = selector.channel(receive.source)
+ val session = {
+ // Only methods that are safe to call on a disconnected channel should be invoked on 'channel'.
+ val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
+ RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
+ }
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
requestChannel.sendRequest(req)
selector.mute(receive.source)
http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index f7b5da5..317f3d6 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -163,6 +163,21 @@ class SocketServerTest extends JUnitSuite {
}
@Test
+ def testGracefulClose() {
+ val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+ val serializedBytes = producerRequestBytes
+
+ for (i <- 0 until 10)
+ sendRequest(plainSocket, serializedBytes)
+ plainSocket.close()
+ for (i <- 0 until 10) {
+ val request = server.requestChannel.receiveRequest(2000)
+ assertNotNull("receiveRequest timed out", request)
+ server.requestChannel.noOperation(request.processor, request)
+ }
+ }
+
+ @Test
def testSocketsCloseOnShutdown() {
// open a connection
val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)