You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2015/07/09 12:54:10 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5368 - improve nio ssl handshake performance

Repository: activemq
Updated Branches:
  refs/heads/master 3985e7225 -> 52e452712


https://issues.apache.org/jira/browse/AMQ-5368 - improve nio ssl handshake performance


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/52e45271
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/52e45271
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/52e45271

Branch: refs/heads/master
Commit: 52e45271254af25da21c033265e45b8c55c3ddc5
Parents: 3985e72
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Thu Jul 9 12:53:38 2015 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Thu Jul 9 12:53:58 2015 +0200

----------------------------------------------------------------------
 .../activemq/transport/nio/NIOSSLTransport.java | 66 ++++++++++++-----
 .../activemq/transport/nio/NIOSSLLoadTest.java  | 75 ++++++++++++++++++--
 2 files changed, 117 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/52e45271/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
index 33f529e..5b8f869 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
@@ -22,15 +22,19 @@ import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
 import java.security.cert.X509Certificate;
 
 import javax.net.SocketFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLSession;
 
@@ -298,7 +302,7 @@ public class NIOSSLTransport extends NIOTransport {
         if (!(inputBuffer.position() != 0 && inputBuffer.hasRemaining()) || status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
             int bytesRead = channel.read(inputBuffer);
 
-            if (bytesRead == 0) {
+            if (bytesRead == 0 && !(sslEngine.getHandshakeStatus().equals(SSLEngineResult.HandshakeStatus.NEED_UNWRAP))) {
                 return 0;
             }
 
@@ -341,25 +345,51 @@ public class NIOSSLTransport extends NIOTransport {
 
     protected void doHandshake() throws Exception {
         handshakeInProgress = true;
-        while (true) {
-            switch (sslEngine.getHandshakeStatus()) {
-            case NEED_UNWRAP:
-                secureRead(ByteBuffer.allocate(sslSession.getApplicationBufferSize()));
-                break;
-            case NEED_TASK:
-                Runnable task;
-                while ((task = sslEngine.getDelegatedTask()) != null) {
-                    taskRunnerFactory.execute(task);
+        Selector selector = null;
+        SelectionKey key = null;
+        boolean readable = true;
+        int timeout = 100;
+        try {
+            while (true) {
+                HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
+                switch (handshakeStatus) {
+                    case NEED_UNWRAP:
+                        if (readable) {
+                            secureRead(ByteBuffer.allocate(sslSession.getApplicationBufferSize()));
+                        }
+                        if (this.status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
+                            long now = System.currentTimeMillis();
+                            if (selector == null) {
+                                selector = Selector.open();
+                                key = channel.register(selector, SelectionKey.OP_READ);
+                            } else {
+                                key.interestOps(SelectionKey.OP_READ);
+                            }
+                            int keyCount = selector.select(timeout);
+                            if (keyCount == 0 && ((System.currentTimeMillis() - now) >= timeout)) {
+                                throw new SocketTimeoutException("Timeout during handshake");
+                            }
+                            readable = key.isReadable();
+                        }
+                        break;
+                    case NEED_TASK:
+                        Runnable task;
+                        while ((task = sslEngine.getDelegatedTask()) != null) {
+                            task.run();
+                        }
+                        break;
+                    case NEED_WRAP:
+                        ((NIOOutputStream) buffOut).write(ByteBuffer.allocate(0));
+                        break;
+                    case FINISHED:
+                    case NOT_HANDSHAKING:
+                        finishHandshake();
+                        return;
                 }
-                break;
-            case NEED_WRAP:
-                ((NIOOutputStream) buffOut).write(ByteBuffer.allocate(0));
-                break;
-            case FINISHED:
-            case NOT_HANDSHAKING:
-                finishHandshake();
-                return;
             }
+        } finally {
+            if (key!=null) try {key.cancel();} catch (Exception ignore) {}
+            if (selector!=null) try {selector.close();} catch (Exception ignore) {}
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/52e45271/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOSSLLoadTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOSSLLoadTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOSSLLoadTest.java
index 698945c..4751c9f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOSSLLoadTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOSSLLoadTest.java
@@ -23,12 +23,28 @@ import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.util.ConsumerThread;
 import org.apache.activemq.util.ProducerThread;
 import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jms.Connection;
 import javax.jms.Queue;
 import javax.jms.Session;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
-public class NIOSSLLoadTest extends TestCase {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class NIOSSLLoadTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NIOSSLLoadTest.class);
 
     BrokerService broker;
     Connection connection;
@@ -44,9 +60,10 @@ public class NIOSSLLoadTest extends TestCase {
     public static final int MESSAGE_COUNT = 1000;
 
     final ConsumerThread[] consumers = new ConsumerThread[CONSUMER_COUNT];
+    TransportConnector connector;
 
-    @Override
-    protected void setUp() throws Exception {
+    @Before
+    public void setUp() throws Exception {
         System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
         System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
         System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
@@ -57,7 +74,7 @@ public class NIOSSLLoadTest extends TestCase {
         broker = new BrokerService();
         broker.setPersistent(false);
         broker.setUseJmx(false);
-        TransportConnector connector = broker.addConnector("nio+ssl://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=SSL_RSA_WITH_RC4_128_SHA,SSL_DH_anon_WITH_3DES_EDE_CBC_SHA");
+        connector = broker.addConnector("nio+ssl://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=SSL_RSA_WITH_RC4_128_SHA,SSL_DH_anon_WITH_3DES_EDE_CBC_SHA");
         broker.start();
         broker.waitUntilStarted();
 
@@ -67,8 +84,8 @@ public class NIOSSLLoadTest extends TestCase {
         connection.start();
     }
 
-    @Override
-    protected void tearDown() throws Exception {
+    @After
+    public void tearDown() throws Exception {
         if (connection != null) {
             connection.close();
         }
@@ -79,6 +96,7 @@ public class NIOSSLLoadTest extends TestCase {
         }
     }
 
+    @Test
     public void testLoad() throws Exception {
         Queue dest = session.createQueue("TEST");
         for (int i = 0; i < PRODUCER_COUNT; i++) {
@@ -104,6 +122,51 @@ public class NIOSSLLoadTest extends TestCase {
 
     }
 
+    @Test(timeout=360000)
+    @Ignore
+    public void testConnectionHandshakeLoad() throws Exception {
+        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("nio+ssl://localhost:" + connector.getConnectUri().getPort());
+        int threadNumber = 500;
+        final CountDownLatch latch = new CountDownLatch(threadNumber);
+        final AtomicInteger errors = new AtomicInteger(0);
+        final Random rand = new Random();
+        for (int i = 0; i < threadNumber; i++) {
+            Thread thread = new Thread("thread " + i) {
+                @Override
+                public void run() {
+                    for (int i = 0; i < 100; i++) {
+                        Connection conn = null;
+                        try {
+                            conn = factory.createConnection();
+                            conn.start();
+                            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                            int sleepTime = rand.nextInt((3000 - 1000) + 1) + 1000;
+                            LOG.info(getName() + " sleeping " + sleepTime);
+                            Thread.sleep(sleepTime);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                            errors.incrementAndGet();
+                        }  finally {
+                            try {
+                                conn.close();
+                            } catch (Exception e) {}
+                            LOG.info(getName() + " iteration " + i);
+                        }
+                    }
+
+                    LOG.info(getName() + " finished");
+                    latch.countDown();
+                }
+            };
+            thread.start();
+        }
+
+        latch.await(5, TimeUnit.MINUTES);
+
+        LOG.info("errors " + errors.get());
+
+    }
+
     protected int getReceived() {
         int received = 0;
         for (ConsumerThread consumer : consumers) {