You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/11 20:11:38 UTC
kafka git commit: MINOR: Improve handling of channel close exception
Repository: kafka
Updated Branches:
refs/heads/trunk bd8681cdd -> b28bc57a1
MINOR: Improve handling of channel close exception
Propagate the IOException thrown when SslTransportLayer closes its channel, for consistency with PlaintextTransportLayer, and close the authenticator on channel close even if closing the transport layer fails.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #1370 from rajinisivaram/minor-channelclose2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b28bc57a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b28bc57a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b28bc57a
Branch: refs/heads/trunk
Commit: b28bc57a1fdb9b56c89c3cb9c3df60afbeda521c
Parents: bd8681c
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed May 11 21:11:17 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 11 21:11:17 2016 +0100
----------------------------------------------------------------------
.../kafka/common/network/Authenticator.java | 10 +--
.../kafka/common/network/KafkaChannel.java | 5 +-
.../kafka/common/network/SslTransportLayer.java | 9 +--
.../org/apache/kafka/common/utils/Utils.java | 23 ++++++
.../apache/kafka/common/utils/UtilsTest.java | 80 ++++++++++++++++++++
5 files changed, 112 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
index 6f01fe5..0012f15 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.network;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.security.Principal;
@@ -27,7 +28,7 @@ import org.apache.kafka.common.KafkaException;
/**
* Authentication for Channel
*/
-public interface Authenticator {
+public interface Authenticator extends Closeable {
/**
* Configures Authenticator using the provided parameters.
@@ -54,11 +55,4 @@ public interface Authenticator {
*/
boolean complete();
- /**
- * Closes this Authenticator
- *
- * @throws IOException if any I/O error occurs
- */
- void close() throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index f72f91b..16002eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -26,6 +26,8 @@ import java.nio.channels.SelectionKey;
import java.security.Principal;
+import org.apache.kafka.common.utils.Utils;
+
public class KafkaChannel {
private final String id;
private final TransportLayer transportLayer;
@@ -42,8 +44,7 @@ public class KafkaChannel {
}
public void close() throws IOException {
- transportLayer.close();
- authenticator.close();
+ Utils.closeAll(transportLayer, authenticator);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index d18d6b7..cfd618d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -141,7 +141,7 @@ public class SslTransportLayer implements TransportLayer {
* Sends a SSL close message and closes socketChannel.
*/
@Override
- public void close() {
+ public void close() throws IOException {
if (closing) return;
closing = true;
sslEngine.closeOutbound();
@@ -168,12 +168,11 @@ public class SslTransportLayer implements TransportLayer {
try {
socketChannel.socket().close();
socketChannel.close();
- } catch (IOException e) {
- log.warn("Failed to close SSL socket channel: " + e);
+ } finally {
+ key.attach(null);
+ key.cancel();
}
}
- key.attach(null);
- key.cancel();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index bd173ed..e740618 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -14,6 +14,7 @@ package org.apache.kafka.common.utils;
import java.io.IOException;
import java.io.InputStream;
+import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.OutputStream;
@@ -676,4 +677,26 @@ public class Utils {
}
}
+ /**
+ * Closes every closeable in argument order; an IOException from one does not stop the rest.
+ * Only IOException is caught: a null element or an unchecked exception ends the loop early.
+ * @throws IOException the first IOException thrown by a close call; each later IOException
+ * is attached to it as a suppressed exception.
+ */
+ public static void closeAll(Closeable... closeables) throws IOException {
+ IOException exception = null;
+ for (Closeable closeable : closeables) {
+ try {
+ closeable.close();
+ } catch (IOException e) {
+ if (exception != null)
+ exception.addSuppressed(e); // later failures ride along on the first one
+ else
+ exception = e; // first failure is the one rethrown after the loop
+ }
+ }
+ if (exception != null)
+ throw exception;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 1078578..1af7e43 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -18,6 +18,8 @@ package org.apache.kafka.common.utils;
import java.util.Arrays;
import java.util.Collections;
+import java.io.Closeable;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.junit.Test;
@@ -26,6 +28,8 @@ import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
import static org.apache.kafka.common.utils.Utils.formatAddress;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class UtilsTest {
@@ -114,4 +118,80 @@ public class UtilsTest {
assertEquals(1, Utils.min(2, 1, 3));
assertEquals(1, Utils.min(2, 3, 1));
}
+
+ @Test
+ public void testCloseAll() {
+ TestCloseable[] closeablesWithoutException = TestCloseable.createCloseables(false, false, false); // scenario 1: no close() throws
+ try {
+ Utils.closeAll(closeablesWithoutException);
+ TestCloseable.checkClosed(closeablesWithoutException);
+ } catch (IOException e) {
+ fail("Unexpected exception: " + e);
+ }
+
+ TestCloseable[] closeablesWithException = TestCloseable.createCloseables(true, true, true); // scenario 2: every close() throws
+ try {
+ Utils.closeAll(closeablesWithException);
+ fail("Expected exception not thrown");
+ } catch (IOException e) {
+ TestCloseable.checkClosed(closeablesWithException); // all must have been closed despite the failures
+ TestCloseable.checkException(e, closeablesWithException); // first exception thrown, the others suppressed
+ }
+
+ TestCloseable[] singleExceptionCloseables = TestCloseable.createCloseables(false, true, false); // scenario 3: only the middle close() throws
+ try {
+ Utils.closeAll(singleExceptionCloseables);
+ fail("Expected exception not thrown");
+ } catch (IOException e) {
+ TestCloseable.checkClosed(singleExceptionCloseables);
+ TestCloseable.checkException(e, singleExceptionCloseables[1]); // lone failure: nothing suppressed
+ }
+
+ TestCloseable[] mixedCloseables = TestCloseable.createCloseables(false, true, false, true, true); // scenario 4: throwing and non-throwing interleaved
+ try {
+ Utils.closeAll(mixedCloseables);
+ fail("Expected exception not thrown");
+ } catch (IOException e) {
+ TestCloseable.checkClosed(mixedCloseables);
+ TestCloseable.checkException(e, mixedCloseables[1], mixedCloseables[3], mixedCloseables[4]); // only the throwing ones, in order
+ }
+ }
+
+ private static class TestCloseable implements Closeable { // Closeable stub: records that close() ran and optionally throws
+ private final int id;
+ private final IOException closeException; // thrown by close(); null when close() should succeed
+ private boolean closed;
+
+ TestCloseable(int id, boolean exceptionOnClose) {
+ this.id = id;
+ this.closeException = exceptionOnClose ? new IOException("Test close exception " + id) : null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ closed = true; // set before throwing so a failing close still counts as invoked
+ if (closeException != null)
+ throw closeException;
+ }
+
+ static TestCloseable[] createCloseables(boolean... exceptionOnClose) { // one stub per flag; the array index becomes the id
+ TestCloseable[] closeables = new TestCloseable[exceptionOnClose.length];
+ for (int i = 0; i < closeables.length; i++)
+ closeables[i] = new TestCloseable(i, exceptionOnClose[i]);
+ return closeables;
+ }
+
+ static void checkClosed(TestCloseable... closeables) { // asserts close() was invoked on every stub
+ for (TestCloseable closeable : closeables)
+ assertTrue("Close not invoked for " + closeable.id, closeable.closed);
+ }
+
+ static void checkException(IOException e, TestCloseable... closeablesWithException) {
+ assertEquals(closeablesWithException[0].closeException, e); // the first stub's exception must be the one thrown
+ Throwable[] suppressed = e.getSuppressed();
+ assertEquals(closeablesWithException.length - 1, suppressed.length);
+ for (int i = 1; i < closeablesWithException.length; i++)
+ assertEquals(closeablesWithException[i].closeException, suppressed[i - 1]); // remaining exceptions suppressed in the same order
+ }
+ }
}