You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/11 20:11:38 UTC

kafka git commit: MINOR: Improve handling of channel close exception

Repository: kafka
Updated Branches:
  refs/heads/trunk bd8681cdd -> b28bc57a1


MINOR: Improve handling of channel close exception

Propagate the IOException thrown by SslTransportLayer.close() to be consistent with PlaintextTransportLayer, and close the authenticator on channel close even if closing the transport layer fails.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #1370 from rajinisivaram/minor-channelclose2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b28bc57a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b28bc57a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b28bc57a

Branch: refs/heads/trunk
Commit: b28bc57a1fdb9b56c89c3cb9c3df60afbeda521c
Parents: bd8681c
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed May 11 21:11:17 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 11 21:11:17 2016 +0100

----------------------------------------------------------------------
 .../kafka/common/network/Authenticator.java     | 10 +--
 .../kafka/common/network/KafkaChannel.java      |  5 +-
 .../kafka/common/network/SslTransportLayer.java |  9 +--
 .../org/apache/kafka/common/utils/Utils.java    | 23 ++++++
 .../apache/kafka/common/utils/UtilsTest.java    | 80 ++++++++++++++++++++
 5 files changed, 112 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
index 6f01fe5..0012f15 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.common.network;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
 import java.security.Principal;
@@ -27,7 +28,7 @@ import org.apache.kafka.common.KafkaException;
 /**
  * Authentication for Channel
  */
-public interface Authenticator {
+public interface Authenticator extends Closeable {
 
     /**
      * Configures Authenticator using the provided parameters.
@@ -54,11 +55,4 @@ public interface Authenticator {
      */
     boolean complete();
 
-    /**
-     * Closes this Authenticator
-     *
-     * @throws IOException if any I/O error occurs
-     */
-    void close() throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index f72f91b..16002eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -26,6 +26,8 @@ import java.nio.channels.SelectionKey;
 
 import java.security.Principal;
 
+import org.apache.kafka.common.utils.Utils;
+
 public class KafkaChannel {
     private final String id;
     private final TransportLayer transportLayer;
@@ -42,8 +44,7 @@ public class KafkaChannel {
     }
 
     public void close() throws IOException {
-        transportLayer.close();
-        authenticator.close();
+        Utils.closeAll(transportLayer, authenticator);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index d18d6b7..cfd618d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -141,7 +141,7 @@ public class SslTransportLayer implements TransportLayer {
     * Sends a SSL close message and closes socketChannel.
     */
     @Override
-    public void close() {
+    public void close() throws IOException {
         if (closing) return;
         closing = true;
         sslEngine.closeOutbound();
@@ -168,12 +168,11 @@ public class SslTransportLayer implements TransportLayer {
             try {
                 socketChannel.socket().close();
                 socketChannel.close();
-            } catch (IOException e) {
-                log.warn("Failed to close SSL socket channel: " + e);
+            } finally {
+                key.attach(null);
+                key.cancel();
             }
         }
-        key.attach(null);
-        key.cancel();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index bd173ed..e740618 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -14,6 +14,7 @@ package org.apache.kafka.common.utils;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.OutputStream;
@@ -676,4 +677,26 @@ public class Utils {
         }
     }
 
+    /**
+     * Closes every provided closeable in order; an IOException from one does not stop the rest.
+     * @param closeables the resources to close
+     * @throws IOException the first IOException thrown by a close call, with any later
+     *         IOExceptions added to it as suppressed exceptions.
+     */
+    public static void closeAll(Closeable... closeables) throws IOException {
+        IOException exception = null; // first failure seen so far; null while every close has succeeded
+        for (Closeable closeable : closeables) {
+            try {
+                closeable.close(); // NOTE(review): a null element or an unchecked exception escapes here and skips the rest
+            } catch (IOException e) {
+                if (exception != null)
+                    exception.addSuppressed(e); // keep later failures attached to the first one
+                else
+                    exception = e;
+            }
+        }
+        if (exception != null)
+            throw exception; // rethrown only after every closeable has been attempted
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 1078578..1af7e43 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -18,6 +18,8 @@ package org.apache.kafka.common.utils;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.io.Closeable;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.junit.Test;
@@ -26,6 +28,8 @@ import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 import static org.apache.kafka.common.utils.Utils.formatAddress;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class UtilsTest {
 
@@ -114,4 +118,80 @@ public class UtilsTest {
         assertEquals(1, Utils.min(2, 1, 3));
         assertEquals(1, Utils.min(2, 3, 1));
     }
+
+    @Test
+    public void testCloseAll() { // exercises Utils.closeAll with no failures, all failing, one failing, several failing
+        TestCloseable[] closeablesWithoutException = TestCloseable.createCloseables(false, false, false);
+        try {
+            Utils.closeAll(closeablesWithoutException);
+            TestCloseable.checkClosed(closeablesWithoutException);
+        } catch (IOException e) {
+            fail("Unexpected exception: " + e); // no closeable throws, so closeAll must not throw either
+        }
+
+        TestCloseable[] closeablesWithException = TestCloseable.createCloseables(true, true, true);
+        try {
+            Utils.closeAll(closeablesWithException);
+            fail("Expected exception not thrown");
+        } catch (IOException e) {
+            TestCloseable.checkClosed(closeablesWithException); // close() must still be invoked on every element
+            TestCloseable.checkException(e, closeablesWithException); // first is thrown, the other two suppressed
+        }
+
+        TestCloseable[] singleExceptionCloseables = TestCloseable.createCloseables(false, true, false);
+        try {
+            Utils.closeAll(singleExceptionCloseables);
+            fail("Expected exception not thrown");
+        } catch (IOException e) {
+            TestCloseable.checkClosed(singleExceptionCloseables); // the element after the failing one is closed too
+            TestCloseable.checkException(e, singleExceptionCloseables[1]); // thrown as-is, nothing suppressed
+        }
+
+        TestCloseable[] mixedCloseables = TestCloseable.createCloseables(false, true, false, true, true);
+        try {
+            Utils.closeAll(mixedCloseables);
+            fail("Expected exception not thrown");
+        } catch (IOException e) {
+            TestCloseable.checkClosed(mixedCloseables);
+            TestCloseable.checkException(e, mixedCloseables[1], mixedCloseables[3], mixedCloseables[4]); // only the failing ones, in order
+        }
+    }
+
+    private static class TestCloseable implements Closeable { // test double that records close() and optionally throws
+        private final int id; // index used in the exception text and assertion messages
+        private final IOException closeException; // exception thrown from close(), or null if close() succeeds
+        private boolean closed; // set to true once close() has been invoked
+
+        TestCloseable(int id, boolean exceptionOnClose) {
+            this.id = id;
+            this.closeException = exceptionOnClose ? new IOException("Test close exception " + id) : null;
+        }
+
+        @Override
+        public void close() throws IOException {
+            closed = true; // recorded before throwing so checkClosed passes for failing closeables too
+            if (closeException != null)
+                throw closeException;
+        }
+
+        static TestCloseable[] createCloseables(boolean... exceptionOnClose) { // one closeable per flag, id = array index
+            TestCloseable[] closeables = new TestCloseable[exceptionOnClose.length];
+            for (int i = 0; i < closeables.length; i++)
+                closeables[i] = new TestCloseable(i, exceptionOnClose[i]);
+            return closeables;
+        }
+
+        static void checkClosed(TestCloseable... closeables) { // asserts close() was invoked on every given closeable
+            for (TestCloseable closeable : closeables)
+                assertTrue("Close not invoked for " + closeable.id, closeable.closed);
+        }
+
+        static void checkException(IOException e, TestCloseable... closeablesWithException) { // expects the failing closeables in close order
+            assertEquals(closeablesWithException[0].closeException, e); // the first failure is the one thrown
+            Throwable[] suppressed = e.getSuppressed();
+            assertEquals(closeablesWithException.length - 1, suppressed.length);
+            for (int i = 1; i < closeablesWithException.length; i++)
+                assertEquals(closeablesWithException[i].closeException, suppressed[i - 1]); // later failures suppressed in order
+        }
+    }
 }