You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/25 22:14:33 UTC
svn commit: r1402317 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
test/java/org/apache/activemq/transport/nio/NIOSSLBasicTest.java
Author: tabish
Date: Thu Oct 25 20:14:33 2012
New Revision: 1402317
URL: http://svn.apache.org/viewvc?rev=1402317&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4132
Allow the transport to reassemble OpenWire commands from multiple chunks of varying size which can occur depending on the cipher suite.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOSSLBasicTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java?rev=1402317&r1=1402316&r2=1402317&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java Thu Oct 25 20:14:33 2012
@@ -118,7 +118,6 @@ public class NIOSSLTransport extends NIO
inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize());
inputBuffer.clear();
- currentBuffer = ByteBuffer.allocate(sslSession.getApplicationBufferSize());
NIOOutputStream outputStream = new NIOOutputStream(channel);
outputStream.setEngine(sslEngine);
@@ -171,11 +170,6 @@ public class NIOSSLTransport extends NIO
while (true) {
if (!plain.hasRemaining()) {
- if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
- plain.clear();
- } else {
- plain.compact();
- }
int readCount = secureRead(plain);
if (readCount == 0) {
@@ -204,17 +198,66 @@ public class NIOSSLTransport extends NIO
}
protected void processCommand(ByteBuffer plain) throws Exception {
- nextFrameSize = plain.getInt();
- if (wireFormat instanceof OpenWireFormat) {
- long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize();
- if (nextFrameSize > maxFrameSize) {
- throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) +
- " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
+
+ // Are we waiting for the next Command or are we building on the current one
+ if (nextFrameSize == -1) {
+
+ // We can get small packets that don't give us enough for the frame size,
+ // so allocate just enough for the initial size value and fill it across reads.
+ if (plain.remaining() < Integer.SIZE) {
+ if (currentBuffer == null) {
+ currentBuffer = ByteBuffer.allocate(4);
+ }
+
+ // Go until we fill the integer sized current buffer.
+ while (currentBuffer.hasRemaining() && plain.hasRemaining()) {
+ currentBuffer.put(plain.get());
+ }
+
+ // If we still don't have enough to figure out the next frame size, wait for more data.
+ if (currentBuffer.hasRemaining()) {
+ return;
+ } else {
+ currentBuffer.flip();
+ nextFrameSize = currentBuffer.getInt();
+ }
+
+ } else {
+
+ // Either we are completing a previous read of the next frame size or it's
+ // fully contained in plain already.
+ if (currentBuffer != null) {
+
+ // Finish the frame size integer read and get from the current buffer.
+ while (currentBuffer.hasRemaining()) {
+ currentBuffer.put(plain.get());
+ }
+
+ currentBuffer.flip();
+ nextFrameSize = currentBuffer.getInt();
+
+ } else {
+ nextFrameSize = plain.getInt();
+ }
}
- }
- currentBuffer = ByteBuffer.allocate(nextFrameSize + 4);
- currentBuffer.putInt(nextFrameSize);
- if (currentBuffer.hasRemaining()) {
+
+ if (wireFormat instanceof OpenWireFormat) {
+ long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize();
+ if (nextFrameSize > maxFrameSize) {
+ throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) +
+ " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
+ }
+ }
+
+ // Now that we have the frame size, let's reallocate and store the size for the marshaler.
+ // If there's more data in plain, then the next call will start processing it.
+ currentBuffer = ByteBuffer.allocate(nextFrameSize + 4);
+ currentBuffer.putInt(nextFrameSize);
+
+ } else {
+
+ // If it's all in one read then we can just take it all, otherwise take only
+ // the current frame size and the next iteration starts a new command.
if (currentBuffer.remaining() >= plain.remaining()) {
currentBuffer.put(plain);
} else {
@@ -222,15 +265,17 @@ public class NIOSSLTransport extends NIO
plain.get(fill);
currentBuffer.put(fill);
}
- }
- if (currentBuffer.hasRemaining()) {
- return;
- } else {
- currentBuffer.flip();
- Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
- doConsume((Command) command);
- nextFrameSize = -1;
+ // Either we have enough data for a new command or we have to wait for some more.
+ if (currentBuffer.hasRemaining()) {
+ return;
+ } else {
+ currentBuffer.flip();
+ Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
+ doConsume((Command) command);
+ nextFrameSize = -1;
+ currentBuffer = null;
+ }
}
}
@@ -239,6 +284,10 @@ public class NIOSSLTransport extends NIO
if (!(inputBuffer.position() != 0 && inputBuffer.hasRemaining()) || status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
int bytesRead = channel.read(inputBuffer);
+ if (bytesRead == 0) {
+ return 0;
+ }
+
if (bytesRead == -1) {
sslEngine.closeInbound();
if (inputBuffer.position() == 0 || status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOSSLBasicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOSSLBasicTest.java?rev=1402317&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOSSLBasicTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOSSLBasicTest.java Thu Oct 25 20:14:33 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Smoke test that starts an embedded broker with a {@code nio+ssl} connector and
+ * checks that a client connecting over {@code ssl} can send a text message to a
+ * queue and receive the same body back.
+ */
+public class NIOSSLBasicTest {
+
+    public static final String KEYSTORE_TYPE = "jks";
+    public static final String PASSWORD = "password";
+    public static final String SERVER_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks";
+    public static final String TRUST_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks";
+
+    public static final int MESSAGE_COUNT = 1000;
+
+    /**
+     * Sets the {@code javax.net.ssl.*} system properties so that both the key store
+     * and the trust store refer to the test key store file.
+     */
+    @Before
+    public void before() throws Exception {
+        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
+        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
+        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+        // Choose a value that's informative: ssl,handshake,data,trustmanager or all
+        //System.setProperty("javax.net.debug", "handshake");
+    }
+
+    @After
+    public void after() throws Exception {
+    }
+
+    /**
+     * Creates and starts a non-persistent, JMX-less broker with a single connector.
+     *
+     * @param connectorName name assigned to the connector so tests can look it up later
+     * @param connectorString bind URI handed to {@code BrokerService.addConnector}
+     * @return the broker, after {@code waitUntilStarted()} has returned
+     */
+    public BrokerService createBroker(String connectorName, String connectorString) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        TransportConnector connector = broker.addConnector(connectorString);
+        connector.setName(connectorName);
+        broker.start();
+        broker.waitUntilStarted();
+        return broker;
+    }
+
+    /**
+     * Stops the given broker and waits for it to finish stopping.
+     *
+     * @param broker the broker to stop; {@code null} is ignored
+     */
+    public void stopBroker(BrokerService broker) throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test
+    public void basicConnector() throws Exception {
+        BrokerService broker = createBroker("nio+ssl", "nio+ssl://localhost:0?transport.needClientAuth=true");
+        try {
+            basicSendReceive("ssl://localhost:" + broker.getConnectorByName("nio+ssl").getConnectUri().getPort());
+        } finally {
+            // Always stop the broker so a failed assertion does not leak it into later tests.
+            stopBroker(broker);
+        }
+    }
+
+    @Test
+    public void enabledCipherSuites() throws Exception {
+        BrokerService broker = createBroker("nio+ssl", "nio+ssl://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=SSL_RSA_WITH_RC4_128_SHA,SSL_DH_anon_WITH_3DES_EDE_CBC_SHA");
+        try {
+            basicSendReceive("ssl://localhost:" + broker.getConnectorByName("nio+ssl").getConnectUri().getPort());
+        } finally {
+            // Always stop the broker so a failed assertion does not leak it into later tests.
+            stopBroker(broker);
+        }
+    }
+
+    /**
+     * Connects to {@code uri}, sends one text message to the {@code TEST} queue and
+     * asserts that the same body is received back within two seconds.
+     *
+     * @param uri client connection URI
+     */
+    public void basicSendReceive(String uri) throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
+        Connection connection = factory.createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            connection.start();
+
+            String body = "hello world!";
+            Queue destination = session.createQueue("TEST");
+            MessageProducer producer = session.createProducer(destination);
+            producer.send(session.createTextMessage(body));
+
+            MessageConsumer consumer = session.createConsumer(destination);
+            Message received = consumer.receive(2000);
+            // receive() returns null on timeout; fail with a clear message instead of an NPE.
+            TestCase.assertNotNull("No message received within 2000 ms", received);
+            TestCase.assertEquals(body, ((TextMessage)received).getText());
+        } finally {
+            // Close the connection even when an assertion fails.
+            connection.close();
+        }
+    }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOSSLBasicTest.java
------------------------------------------------------------------------------
svn:eol-style = native