You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2015/07/09 12:54:10 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5368 -
improve nio ssl handshake performance
Repository: activemq
Updated Branches:
refs/heads/master 3985e7225 -> 52e452712
https://issues.apache.org/jira/browse/AMQ-5368 - improve nio ssl handshake performance
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/52e45271
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/52e45271
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/52e45271
Branch: refs/heads/master
Commit: 52e45271254af25da21c033265e45b8c55c3ddc5
Parents: 3985e72
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Thu Jul 9 12:53:38 2015 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Thu Jul 9 12:53:58 2015 +0200
----------------------------------------------------------------------
.../activemq/transport/nio/NIOSSLTransport.java | 66 ++++++++++++-----
.../activemq/transport/nio/NIOSSLLoadTest.java | 75 ++++++++++++++++++--
2 files changed, 117 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/52e45271/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
index 33f529e..5b8f869 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
@@ -22,15 +22,19 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
+import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
import java.security.cert.X509Certificate;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
@@ -298,7 +302,7 @@ public class NIOSSLTransport extends NIOTransport {
if (!(inputBuffer.position() != 0 && inputBuffer.hasRemaining()) || status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
int bytesRead = channel.read(inputBuffer);
- if (bytesRead == 0) {
+ if (bytesRead == 0 && !(sslEngine.getHandshakeStatus().equals(SSLEngineResult.HandshakeStatus.NEED_UNWRAP))) {
return 0;
}
@@ -341,25 +345,51 @@ public class NIOSSLTransport extends NIOTransport {
protected void doHandshake() throws Exception {
handshakeInProgress = true;
- while (true) {
- switch (sslEngine.getHandshakeStatus()) {
- case NEED_UNWRAP:
- secureRead(ByteBuffer.allocate(sslSession.getApplicationBufferSize()));
- break;
- case NEED_TASK:
- Runnable task;
- while ((task = sslEngine.getDelegatedTask()) != null) {
- taskRunnerFactory.execute(task);
+ Selector selector = null;
+ SelectionKey key = null;
+ boolean readable = true;
+ int timeout = 100;
+ try {
+ while (true) {
+ HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
+ switch (handshakeStatus) {
+ case NEED_UNWRAP:
+ if (readable) {
+ secureRead(ByteBuffer.allocate(sslSession.getApplicationBufferSize()));
+ }
+ if (this.status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
+ long now = System.currentTimeMillis();
+ if (selector == null) {
+ selector = Selector.open();
+ key = channel.register(selector, SelectionKey.OP_READ);
+ } else {
+ key.interestOps(SelectionKey.OP_READ);
+ }
+ int keyCount = selector.select(timeout);
+ if (keyCount == 0 && ((System.currentTimeMillis() - now) >= timeout)) {
+ throw new SocketTimeoutException("Timeout during handshake");
+ }
+ readable = key.isReadable();
+ }
+ break;
+ case NEED_TASK:
+ Runnable task;
+ while ((task = sslEngine.getDelegatedTask()) != null) {
+ task.run();
+ }
+ break;
+ case NEED_WRAP:
+ ((NIOOutputStream) buffOut).write(ByteBuffer.allocate(0));
+ break;
+ case FINISHED:
+ case NOT_HANDSHAKING:
+ finishHandshake();
+ return;
}
- break;
- case NEED_WRAP:
- ((NIOOutputStream) buffOut).write(ByteBuffer.allocate(0));
- break;
- case FINISHED:
- case NOT_HANDSHAKING:
- finishHandshake();
- return;
}
+ } finally {
+ if (key!=null) try {key.cancel();} catch (Exception ignore) {}
+ if (selector!=null) try {selector.close();} catch (Exception ignore) {}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/52e45271/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOSSLLoadTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOSSLLoadTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOSSLLoadTest.java
index 698945c..4751c9f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOSSLLoadTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOSSLLoadTest.java
@@ -23,12 +23,28 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.util.ConsumerThread;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.Queue;
import javax.jms.Session;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
-public class NIOSSLLoadTest extends TestCase {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class NIOSSLLoadTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NIOSSLLoadTest.class);
BrokerService broker;
Connection connection;
@@ -44,9 +60,10 @@ public class NIOSSLLoadTest extends TestCase {
public static final int MESSAGE_COUNT = 1000;
final ConsumerThread[] consumers = new ConsumerThread[CONSUMER_COUNT];
+ TransportConnector connector;
- @Override
- protected void setUp() throws Exception {
+ @Before
+ public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
@@ -57,7 +74,7 @@ public class NIOSSLLoadTest extends TestCase {
broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
- TransportConnector connector = broker.addConnector("nio+ssl://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=SSL_RSA_WITH_RC4_128_SHA,SSL_DH_anon_WITH_3DES_EDE_CBC_SHA");
+ connector = broker.addConnector("nio+ssl://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=SSL_RSA_WITH_RC4_128_SHA,SSL_DH_anon_WITH_3DES_EDE_CBC_SHA");
broker.start();
broker.waitUntilStarted();
@@ -67,8 +84,8 @@ public class NIOSSLLoadTest extends TestCase {
connection.start();
}
- @Override
- protected void tearDown() throws Exception {
+ @After
+ public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
@@ -79,6 +96,7 @@ public class NIOSSLLoadTest extends TestCase {
}
}
+ @Test
public void testLoad() throws Exception {
Queue dest = session.createQueue("TEST");
for (int i = 0; i < PRODUCER_COUNT; i++) {
@@ -104,6 +122,51 @@ public class NIOSSLLoadTest extends TestCase {
}
+ @Test(timeout=360000)
+ @Ignore
+ public void testConnectionHandshakeLoad() throws Exception {
+ final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("nio+ssl://localhost:" + connector.getConnectUri().getPort());
+ int threadNumber = 500;
+ final CountDownLatch latch = new CountDownLatch(threadNumber);
+ final AtomicInteger errors = new AtomicInteger(0);
+ final Random rand = new Random();
+ for (int i = 0; i < threadNumber; i++) {
+ Thread thread = new Thread("thread " + i) {
+ @Override
+ public void run() {
+ for (int i = 0; i < 100; i++) {
+ Connection conn = null;
+ try {
+ conn = factory.createConnection();
+ conn.start();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ int sleepTime = rand.nextInt((3000 - 1000) + 1) + 1000;
+ LOG.info(getName() + " sleeping " + sleepTime);
+ Thread.sleep(sleepTime);
+ } catch (Exception e) {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ } finally {
+ try {
+ conn.close();
+ } catch (Exception e) {}
+ LOG.info(getName() + " iteration " + i);
+ }
+ }
+
+ LOG.info(getName() + " finished");
+ latch.countDown();
+ }
+ };
+ thread.start();
+ }
+
+ latch.await(5, TimeUnit.MINUTES);
+
+ LOG.info("errors " + errors.get());
+
+ }
+
protected int getReceived() {
int received = 0;
for (ConsumerThread consumer : consumers) {