You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/07/25 13:08:54 UTC
svn commit: r1150630 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/nio/
main/java/org/apache/activemq/transport/stomp/
main/java/org/apache/activemq/transport/tcp/
main/resources/META-INF/services/org/apache/activemq/t...
Author: dejanb
Date: Mon Jul 25 11:08:52 2011
New Revision: 1150630
URL: http://svn.apache.org/viewvc?rev=1150630&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-2583 - stomp+nio+ssl initial implementation
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java?rev=1150630&r1=1150629&r2=1150630&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java Mon Jul 25 11:08:52 2011
@@ -128,6 +128,9 @@ public class NIOOutputStream extends Out
*/
public void close() throws IOException {
super.close();
+ if (engine != null) {
+ engine.closeOutbound();
+ }
closed = true;
}
@@ -159,6 +162,7 @@ public class NIOOutputStream extends Out
} else {
plain = data;
}
+
int remaining = plain.remaining();
int lastRemaining = remaining - 1;
long delay = 1;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java?rev=1150630&r1=1150629&r2=1150630&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java Mon Jul 25 11:08:52 2011
@@ -28,6 +28,7 @@ import javax.net.SocketFactory;
import javax.net.ssl.*;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
@@ -36,18 +37,18 @@ import java.nio.ByteBuffer;
public class NIOSSLTransport extends NIOTransport {
- private boolean needClientAuth;
- private boolean wantClientAuth;
- private String[] enabledCipherSuites;
+ protected boolean needClientAuth;
+ protected boolean wantClientAuth;
+ protected String[] enabledCipherSuites;
protected SSLContext sslContext;
protected SSLEngine sslEngine;
protected SSLSession sslSession;
- boolean handshakeInProgress = false;
- SSLEngineResult.Status status = null;
- SSLEngineResult.HandshakeStatus handshakeStatus = null;
+ protected boolean handshakeInProgress = false;
+ protected SSLEngineResult.Status status = null;
+ protected SSLEngineResult.HandshakeStatus handshakeStatus = null;
public NIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
@@ -90,11 +91,8 @@ public class NIOSSLTransport extends NIO
outputStream.setEngine(sslEngine);
this.dataOut = new DataOutputStream(outputStream);
this.buffOut = outputStream;
-
sslEngine.beginHandshake();
handshakeStatus = sslEngine.getHandshakeStatus();
-
-
doHandshake();
} catch (Exception e) {
@@ -125,8 +123,6 @@ public class NIOSSLTransport extends NIO
}
}
-
-
protected void serviceRead() {
try {
if (handshakeInProgress) {
@@ -136,62 +132,75 @@ public class NIOSSLTransport extends NIO
ByteBuffer plain = ByteBuffer.allocate(sslSession.getApplicationBufferSize());
plain.position(plain.limit());
- while (true) {
- if (nextFrameSize == -1) {
- if (!plain.hasRemaining()) {
- plain.clear();
- int readCount = secureRead(plain);
- if (readCount == 0)
- break;
- }
- nextFrameSize = plain.getInt();
- if (wireFormat instanceof OpenWireFormat) {
- long maxFrameSize = ((OpenWireFormat)wireFormat).getMaxFrameSize();
- if (nextFrameSize > maxFrameSize) {
- throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
- }
- }
- currentBuffer = ByteBuffer.allocate(nextFrameSize + 4);
- currentBuffer.putInt(nextFrameSize);
- if (currentBuffer.hasRemaining()) {
- if (currentBuffer.remaining() >= plain.remaining()) {
- currentBuffer.put(plain);
- } else {
- byte[] fill = new byte[currentBuffer.remaining()];
- plain.get(fill);
- currentBuffer.put(fill);
- }
- }
+ while(true) {
+ if (!plain.hasRemaining()) {
+
+ plain.clear();
+ int readCount = secureRead(plain);
- if (currentBuffer.hasRemaining()) {
- continue;
- } else {
- currentBuffer.flip();
- Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
- doConsume((Command) command);
- nextFrameSize = -1;
+ if (readCount == 0)
+ break;
+
+ // channel is closed, cleanup
+ if (readCount== -1) {
+ onException(new EOFException());
+ selection.close();
+ break;
}
}
- }
+ processCommand(plain);
+
+ }
} catch (IOException e) {
onException(e);
} catch (Throwable e) {
onException(IOExceptionSupport.create(e));
}
+ }
+ protected void processCommand(ByteBuffer plain) throws Exception {
+ nextFrameSize = plain.getInt();
+ if (wireFormat instanceof OpenWireFormat) {
+ long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize();
+ if (nextFrameSize > maxFrameSize) {
+ throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
+ }
+ }
+ currentBuffer = ByteBuffer.allocate(nextFrameSize + 4);
+ currentBuffer.putInt(nextFrameSize);
+ if (currentBuffer.hasRemaining()) {
+ if (currentBuffer.remaining() >= plain.remaining()) {
+ currentBuffer.put(plain);
+ } else {
+ byte[] fill = new byte[currentBuffer.remaining()];
+ plain.get(fill);
+ currentBuffer.put(fill);
+ }
+ }
+
+ if (currentBuffer.hasRemaining()) {
+ return;
+ } else {
+ currentBuffer.flip();
+ Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
+ doConsume((Command) command);
+ nextFrameSize = -1;
+ }
}
+ protected int secureRead(ByteBuffer plain) throws Exception {
+ if (!(inputBuffer.position() != 0 && inputBuffer.hasRemaining())) {
+ int bytesRead = channel.read(inputBuffer);
- private int secureRead(ByteBuffer plain) throws Exception {
- int bytesRead = channel.read(inputBuffer);
- if (bytesRead == -1) {
- sslEngine.closeInbound();
- if (inputBuffer.position() == 0 ||
- status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
- return -1;
+ if (bytesRead == -1) {
+ sslEngine.closeInbound();
+ if (inputBuffer.position() == 0 ||
+ status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
+ return -1;
+ }
}
}
@@ -206,12 +215,13 @@ public class NIOSSLTransport extends NIO
res.bytesProduced() == 0);
if (res.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED) {
- finishHandshake();
+ finishHandshake();
}
status = res.getStatus();
handshakeStatus = res.getHandshakeStatus();
+
//TODO deal with BUFFER_OVERFLOW
if (status == SSLEngineResult.Status.CLOSED) {
@@ -253,6 +263,7 @@ public class NIOSSLTransport extends NIO
protected void doStop(ServiceStopper stopper) throws Exception {
if (channel != null) {
channel.close();
+ channel = null;
}
super.doStop(stopper);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?rev=1150630&r1=1150629&r2=1150630&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Mon Jul 25 11:08:52 2011
@@ -161,6 +161,7 @@ public class NIOTransport extends TcpTra
protected void doStop(ServiceStopper stopper) throws Exception {
if (selection != null) {
selection.close();
+ selection = null;
}
super.doStop(stopper);
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java?rev=1150630&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java Mon Jul 25 11:08:52 2011
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.wireformat.WireFormat;
+
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+
+public class StompCodec {
+
+ TcpTransport transport;
+
+ ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
+ boolean processedHeaders = false;
+ String action;
+ HashMap<String, String> headers;
+ int contentLength = -1;
+ int readLength = 0;
+ int previousByte = -1;
+
+ public StompCodec(TcpTransport transport) {
+ this.transport = transport;
+ }
+
+ public void parse(ByteArrayInputStream input, int readSize) throws Exception {
+ int i = 0;
+ int b;
+ while(i++ < readSize) {
+ b = input.read();
+ // skip repeating nulls
+ if (!processedHeaders && previousByte == 0 && b == 0) {
+ continue;
+ }
+
+ if (!processedHeaders) {
+ currentCommand.write(b);
+ // end of headers section, parse action and header
+ if (previousByte == '\n' && b == '\n') {
+ if (transport.getWireFormat() instanceof StompWireFormat) {
+ DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
+ action = ((StompWireFormat)transport.getWireFormat()).parseAction(data);
+ headers = ((StompWireFormat)transport.getWireFormat()).parseHeaders(data);
+ String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
+ if (contentLengthHeader != null) {
+ contentLength = ((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader);
+ } else {
+ contentLength = -1;
+ }
+ }
+ processedHeaders = true;
+ currentCommand.reset();
+ }
+ } else {
+
+ if (contentLength == -1) {
+ // end of command reached, unmarshal
+ if (b == 0) {
+ processCommand();
+ } else {
+ currentCommand.write(b);
+ }
+ } else {
+ // read desired content length
+ if (readLength++ == contentLength) {
+ processCommand();
+ readLength = 0;
+ } else {
+ currentCommand.write(b);
+ }
+ }
+ }
+
+ previousByte = b;
+ }
+ }
+
+ protected void processCommand() throws Exception {
+ StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
+ transport.doConsume(frame);
+ processedHeaders = false;
+ currentCommand.reset();
+ contentLength = -1;
+ }
+}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java?rev=1150630&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java Mon Jul 25 11:08:52 2011
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import org.apache.activemq.transport.nio.NIOSSLTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+import javax.net.SocketFactory;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+
+public class StompNIOSSLTransport extends NIOSSLTransport {
+
+ StompCodec codec;
+
+ public StompNIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+ super(wireFormat, socketFactory, remoteLocation, localLocation);
+ }
+
+ public StompNIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
+ super(wireFormat, socket);
+ }
+
+ @Override
+ protected void initializeStreams() throws IOException {
+ codec = new StompCodec(this);
+ super.initializeStreams();
+ if (inputBuffer.position() != 0 && inputBuffer.hasRemaining()) {
+ serviceRead();
+ }
+ }
+
+ @Override
+ protected void processCommand(ByteBuffer plain) throws Exception {
+ byte[] fill = new byte[plain.remaining()];
+ plain.get(fill);
+ ByteArrayInputStream input = new ByteArrayInputStream(fill);
+ codec.parse(input, fill.length);
+ }
+
+}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java?rev=1150630&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java Mon Jul 25 11:08:52 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import org.apache.activemq.broker.SslContext;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.wireformat.WireFormat;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+public class StompNIOSSLTransportFactory extends StompNIOTransportFactory {
+
+ SSLContext context;
+
+ @Override
+ protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+ return new TcpTransportServer(this, location, serverSocketFactory) {
+ protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+ StompNIOSSLTransport transport = new StompNIOSSLTransport(format, socket);
+ if (context != null) {
+ transport.setSslContext(context);
+ }
+ return transport;
+ }
+ };
+ }
+
+ @Override
+ protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+ return new StompNIOSSLTransport(wf, socketFactory, location, localLocation);
+ }
+
+ @Override
+ public TransportServer doBind(URI location) throws IOException {
+ if (SslContext.getCurrentSslContext() != null) {
+ try {
+ context = SslContext.getCurrentSslContext().getSSLContext();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ return super.doBind(location);
+ }
+
+}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=1150630&r1=1150629&r2=1150630&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Mon Jul 25 11:08:52 2011
@@ -54,13 +54,7 @@ public class StompNIOTransport extends T
private SelectorSelection selection;
private ByteBuffer inputBuffer;
- ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
- boolean processedHeaders = false;
- String action;
- HashMap<String, String> headers;
- int contentLength = -1;
- int readLength = 0;
- int previousByte = -1;
+ StompCodec codec;
public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
@@ -93,6 +87,7 @@ public class StompNIOTransport extends T
NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
this.dataOut = new DataOutputStream(outPutStream);
this.buffOut = outPutStream;
+ codec = new StompCodec(this);
}
private void serviceRead() {
@@ -114,57 +109,9 @@ public class StompNIOTransport extends T
inputBuffer.flip();
- int b;
ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
+ codec.parse(input, readSize);
- int i = 0;
- while(i++ < readSize) {
- b = input.read();
- // skip repeating nulls
- if (!processedHeaders && previousByte == 0 && b == 0) {
- continue;
- }
-
- if (!processedHeaders) {
- currentCommand.write(b);
- // end of headers section, parse action and header
- if (previousByte == '\n' && b == '\n') {
- if (wireFormat instanceof StompWireFormat) {
- DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
- action = ((StompWireFormat)wireFormat).parseAction(data);
- headers = ((StompWireFormat)wireFormat).parseHeaders(data);
- String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
- if (contentLengthHeader != null) {
- contentLength = ((StompWireFormat)wireFormat).parseContentLength(contentLengthHeader);
- } else {
- contentLength = -1;
- }
- }
- processedHeaders = true;
- currentCommand.reset();
- }
- } else {
-
- if (contentLength == -1) {
- // end of command reached, unmarshal
- if (b == 0) {
- processCommand();
- } else {
- currentCommand.write(b);
- }
- } else {
- // read desired content length
- if (readLength++ == contentLength) {
- processCommand();
- readLength = 0;
- } else {
- currentCommand.write(b);
- }
- }
- }
-
- previousByte = b;
- }
// clear the buffer
inputBuffer.clear();
@@ -176,14 +123,6 @@ public class StompNIOTransport extends T
}
}
- private void processCommand() throws Exception {
- StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
- doConsume(frame);
- processedHeaders = false;
- currentCommand.reset();
- contentLength = -1;
- }
-
protected void doStart() throws Exception {
connect();
selection.setInterestOps(SelectionKey.OP_READ);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=1150630&r1=1150629&r2=1150630&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Mon Jul 25 11:08:52 2011
@@ -687,4 +687,8 @@ public class TcpTransport extends Transp
this.typeOfServiceChosen = false;
return true;
}
+
+ public WireFormat getWireFormat() {
+ return wireFormat;
+ }
}
Added: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp%2Bnio%2Bssl?rev=1150630&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl (added)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl Mon Jul 25 11:08:52 2011
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.stomp.StompNIOSSLTransportFactory
\ No newline at end of file
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java?rev=1150630&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java Mon Jul 25 11:08:52 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+
+public class StompNIOSSLTest extends StompTest {
+
+ protected void setUp() throws Exception {
+ bindAddress = "stomp+nio+ssl://localhost:61613";
+ confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml";
+ System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+ System.setProperty("javax.net.ssl.trustStorePassword", "password");
+ System.setProperty("javax.net.ssl.trustStoreType", "jks");
+ System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+ System.setProperty("javax.net.ssl.keyStorePassword", "password");
+ System.setProperty("javax.net.ssl.keyStoreType", "jks");
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ protected Socket createSocket(URI connectUri) throws IOException {
+ SocketFactory factory = SSLSocketFactory.getDefault();
+ return factory.createSocket("127.0.0.1", connectUri.getPort());
+ }
+
+}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1150630&r1=1150629&r2=1150630&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Mon Jul 25 11:08:52 2011
@@ -111,6 +111,7 @@ public class StompTest extends Combinati
}
broker = BrokerFactory.createBroker(new URI(confUri));
broker.start();
+ broker.waitUntilStarted();
stompConnect();
@@ -143,6 +144,7 @@ public class StompTest extends Combinati
// Some tests explicitly disconnect from stomp so can ignore
} finally {
broker.stop();
+ broker.waitUntilStopped();
}
}
Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml?rev=1150630&r1=1150629&r2=1150630&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml Mon Jul 25 11:08:52 2011
@@ -69,6 +69,7 @@
<transportConnectors>
<transportConnector name="stomp+ssl" uri="stomp+ssl://localhost:61612"/>
+ <transportConnector name="stomp+nio+ssl" uri="stomp+nio+ssl://localhost:61613"/>
</transportConnectors>