You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/25 22:14:33 UTC

svn commit: r1402317 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java test/java/org/apache/activemq/transport/nio/NIOSSLBasicTest.java

Author: tabish
Date: Thu Oct 25 20:14:33 2012
New Revision: 1402317

URL: http://svn.apache.org/viewvc?rev=1402317&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4132

Allow the transport to reassemble OpenWire commands from multiple chunks of varying size which can occur depending on the cipher suite.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOSSLBasicTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java?rev=1402317&r1=1402316&r2=1402317&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java Thu Oct 25 20:14:33 2012
@@ -118,7 +118,6 @@ public class NIOSSLTransport extends NIO
 
             inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize());
             inputBuffer.clear();
-            currentBuffer = ByteBuffer.allocate(sslSession.getApplicationBufferSize());
 
             NIOOutputStream outputStream = new NIOOutputStream(channel);
             outputStream.setEngine(sslEngine);
@@ -171,11 +170,6 @@ public class NIOSSLTransport extends NIO
             while (true) {
                 if (!plain.hasRemaining()) {
 
-                    if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
-                        plain.clear();
-                    } else {
-                        plain.compact();
-                    }
                     int readCount = secureRead(plain);
 
                     if (readCount == 0) {
@@ -204,17 +198,66 @@ public class NIOSSLTransport extends NIO
     }
 
     protected void processCommand(ByteBuffer plain) throws Exception {
-        nextFrameSize = plain.getInt();
-        if (wireFormat instanceof OpenWireFormat) {
-            long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize();
-            if (nextFrameSize > maxFrameSize) {
-                throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) +
-                                       " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
+
+        // Are we waiting for the next Command or are we building on the current one
+        if (nextFrameSize == -1) {
+
+            // We can get small packets that don't give us enough for the frame size,
+            // so allocate enough for the initial size value and fill it as bytes arrive.
+            if (plain.remaining() < Integer.SIZE) {
+                if (currentBuffer == null) {
+                    currentBuffer = ByteBuffer.allocate(4);
+                }
+
+                // Go until we fill the integer sized current buffer.
+                while (currentBuffer.hasRemaining() && plain.hasRemaining()) {
+                    currentBuffer.put(plain.get());
+                }
+
+                // We didn't get enough yet to figure out the next frame size.
+                if (currentBuffer.hasRemaining()) {
+                    return;
+                } else {
+                    currentBuffer.flip();
+                    nextFrameSize = currentBuffer.getInt();
+                }
+
+            } else {
+
+                // Either we are completing a previous read of the next frame size or it's
+                // fully contained in plain already.
+                if (currentBuffer != null) {
+
+                    // Finish the frame size integer read and get from the current buffer.
+                    while (currentBuffer.hasRemaining()) {
+                        currentBuffer.put(plain.get());
+                    }
+
+                    currentBuffer.flip();
+                    nextFrameSize = currentBuffer.getInt();
+
+                } else {
+                    nextFrameSize = plain.getInt();
+                }
             }
-        }
-        currentBuffer = ByteBuffer.allocate(nextFrameSize + 4);
-        currentBuffer.putInt(nextFrameSize);
-        if (currentBuffer.hasRemaining()) {
+
+            if (wireFormat instanceof OpenWireFormat) {
+                long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize();
+                if (nextFrameSize > maxFrameSize) {
+                    throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) +
+                                          " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
+                }
+            }
+
+            // Now that we have the frame size, reallocate and store the size for the marshaler.
+            // If there's more data in plain, then the next call will start processing it.
+            currentBuffer = ByteBuffer.allocate(nextFrameSize + 4);
+            currentBuffer.putInt(nextFrameSize);
+
+        } else {
+
+            // If it's all in one read then we can just take it all, otherwise take only
+            // the current frame size and the next iteration starts a new command.
             if (currentBuffer.remaining() >= plain.remaining()) {
                 currentBuffer.put(plain);
             } else {
@@ -222,15 +265,17 @@ public class NIOSSLTransport extends NIO
                 plain.get(fill);
                 currentBuffer.put(fill);
             }
-        }
 
-        if (currentBuffer.hasRemaining()) {
-            return;
-        } else {
-            currentBuffer.flip();
-            Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
-            doConsume((Command) command);
-            nextFrameSize = -1;
+            // Either we have enough data for a new command or we have to wait for some more.
+            if (currentBuffer.hasRemaining()) {
+                return;
+            } else {
+                currentBuffer.flip();
+                Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
+                doConsume((Command) command);
+                nextFrameSize = -1;
+                currentBuffer = null;
+            }
         }
     }
 
@@ -239,6 +284,10 @@ public class NIOSSLTransport extends NIO
         if (!(inputBuffer.position() != 0 && inputBuffer.hasRemaining()) || status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
             int bytesRead = channel.read(inputBuffer);
 
+            if (bytesRead == 0) {
+                return 0;
+            }
+
             if (bytesRead == -1) {
                 sslEngine.closeInbound();
                 if (inputBuffer.position() == 0 || status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOSSLBasicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOSSLBasicTest.java?rev=1402317&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOSSLBasicTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOSSLBasicTest.java Thu Oct 25 20:14:33 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NIOSSLBasicTest {
+
+    public static final String KEYSTORE_TYPE = "jks";
+    public static final String PASSWORD = "password";
+    public static final String SERVER_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks";
+    public static final String TRUST_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks";
+
+    public static final int MESSAGE_COUNT = 1000;
+
+    @Before
+    public void before() throws Exception {
+        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
+        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
+        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+        // Choose a value that's informative: ssl,handshake,data,trustmanager or all
+        //System.setProperty("javax.net.debug", "handshake");
+    }
+
+    @After
+    public void after() throws Exception {
+    }
+
+    public BrokerService createBroker(String connectorName, String connectorString) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        TransportConnector connector = broker.addConnector(connectorString);
+        connector.setName(connectorName);
+        broker.start();
+        broker.waitUntilStarted();
+        return broker;
+    }
+
+    public void stopBroker(BrokerService broker) throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test
+    public void basicConnector() throws Exception {
+        BrokerService broker = createBroker("nio+ssl", "nio+ssl://localhost:0?transport.needClientAuth=true");
+        basicSendReceive("ssl://localhost:" + broker.getConnectorByName("nio+ssl").getConnectUri().getPort());
+        stopBroker(broker);
+    }
+
+    @Test
+    public void enabledCipherSuites() throws Exception {
+        BrokerService broker = createBroker("nio+ssl", "nio+ssl://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=SSL_RSA_WITH_RC4_128_SHA,SSL_DH_anon_WITH_3DES_EDE_CBC_SHA");
+        basicSendReceive("ssl://localhost:" + broker.getConnectorByName("nio+ssl").getConnectUri().getPort());
+        stopBroker(broker);
+    }
+
+    public void basicSendReceive(String uri) throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+
+        String body = "hello world!";
+        Queue destination = session.createQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+        producer.send(session.createTextMessage(body));
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        Message received = consumer.receive(2000);
+        TestCase.assertEquals(body, ((TextMessage)received).getText());
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOSSLBasicTest.java
------------------------------------------------------------------------------
    svn:eol-style = native