You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/07/25 13:08:54 UTC

svn commit: r1150630 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/nio/ main/java/org/apache/activemq/transport/stomp/ main/java/org/apache/activemq/transport/tcp/ main/resources/META-INF/services/org/apache/activemq/transport/ test/java/org/apache/activemq/transport/stomp/ test/resources/org/apache/activemq/transport/stomp/

Author: dejanb
Date: Mon Jul 25 11:08:52 2011
New Revision: 1150630

URL: http://svn.apache.org/viewvc?rev=1150630&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-2583 - stomp+nio+ssl initial implementation

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java?rev=1150630&r1=1150629&r2=1150630&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java Mon Jul 25 11:08:52 2011
@@ -128,6 +128,9 @@ public class NIOOutputStream extends Out
      */
     public void close() throws IOException {
         super.close();
+        if (engine != null) {
+            engine.closeOutbound();
+        }
         closed = true;
     }
 
@@ -159,6 +162,7 @@ public class NIOOutputStream extends Out
         }  else {
             plain = data;
         }
+
         int remaining = plain.remaining();
         int lastRemaining = remaining - 1;
         long delay = 1;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java?rev=1150630&r1=1150629&r2=1150630&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java Mon Jul 25 11:08:52 2011
@@ -28,6 +28,7 @@ import javax.net.SocketFactory;
 import javax.net.ssl.*;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.URI;
@@ -36,18 +37,18 @@ import java.nio.ByteBuffer;
 
 public class NIOSSLTransport extends NIOTransport  {
 
-    private boolean needClientAuth;
-    private boolean wantClientAuth;
-    private String[] enabledCipherSuites;
+    protected boolean needClientAuth;
+    protected boolean wantClientAuth;
+    protected String[] enabledCipherSuites;
 
     protected SSLContext sslContext;
     protected SSLEngine sslEngine;
     protected SSLSession sslSession;
 
 
-    boolean handshakeInProgress = false;
-    SSLEngineResult.Status status = null;
-    SSLEngineResult.HandshakeStatus handshakeStatus = null;
+    protected boolean handshakeInProgress = false;
+    protected SSLEngineResult.Status status = null;
+    protected SSLEngineResult.HandshakeStatus handshakeStatus = null;
 
     public NIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
         super(wireFormat, socketFactory, remoteLocation, localLocation);
@@ -90,11 +91,8 @@ public class NIOSSLTransport extends NIO
             outputStream.setEngine(sslEngine);
             this.dataOut = new DataOutputStream(outputStream);
             this.buffOut = outputStream;
-
             sslEngine.beginHandshake();
             handshakeStatus = sslEngine.getHandshakeStatus();
-
-
             doHandshake();
 
         } catch (Exception e) {
@@ -125,8 +123,6 @@ public class NIOSSLTransport extends NIO
           }
     }
 
-
-
     protected void serviceRead() {
         try {
             if (handshakeInProgress) {
@@ -136,62 +132,75 @@ public class NIOSSLTransport extends NIO
             ByteBuffer plain = ByteBuffer.allocate(sslSession.getApplicationBufferSize());
             plain.position(plain.limit());
 
-            while (true) {
-                if (nextFrameSize == -1) {
-                    if (!plain.hasRemaining()) {
-                        plain.clear();
-                        int readCount = secureRead(plain);
-                        if (readCount == 0)
-                            break;
-                    }
-                    nextFrameSize = plain.getInt();
-                    if (wireFormat instanceof OpenWireFormat) {
-                        long maxFrameSize = ((OpenWireFormat)wireFormat).getMaxFrameSize();
-                        if (nextFrameSize > maxFrameSize) {
-                            throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
-                        }
-                    }
-                    currentBuffer = ByteBuffer.allocate(nextFrameSize + 4);
-                    currentBuffer.putInt(nextFrameSize);
-                    if (currentBuffer.hasRemaining()) {
-                        if (currentBuffer.remaining() >= plain.remaining()) {
-                            currentBuffer.put(plain);
-                        } else {
-                            byte[] fill = new byte[currentBuffer.remaining()];
-                            plain.get(fill);
-                            currentBuffer.put(fill);
-                        }
-                    }
+            while(true) {
+                if (!plain.hasRemaining()) {
+
+                    plain.clear();
+                    int readCount = secureRead(plain);
 
-                    if (currentBuffer.hasRemaining()) {
-                        continue;
-                    } else {
-                        currentBuffer.flip();
-                        Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
-                        doConsume((Command) command);
 
-                        nextFrameSize = -1;
+                    if (readCount == 0)
+                        break;
+
+                    // channel is closed, cleanup
+                    if (readCount== -1) {
+                        onException(new EOFException());
+                        selection.close();
+                        break;
                     }
                 }
-            }
 
+                processCommand(plain);
+
+            }
         } catch (IOException e) {
             onException(e);
         } catch (Throwable e) {
             onException(IOExceptionSupport.create(e));
         }
+    }
 
+    protected void processCommand(ByteBuffer plain) throws Exception {
+        nextFrameSize = plain.getInt();
+        if (wireFormat instanceof OpenWireFormat) {
+            long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize();
+            if (nextFrameSize > maxFrameSize) {
+                throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
+            }
+        }
+        currentBuffer = ByteBuffer.allocate(nextFrameSize + 4);
+        currentBuffer.putInt(nextFrameSize);
+        if (currentBuffer.hasRemaining()) {
+            if (currentBuffer.remaining() >= plain.remaining()) {
+                currentBuffer.put(plain);
+            } else {
+                byte[] fill = new byte[currentBuffer.remaining()];
+                plain.get(fill);
+                currentBuffer.put(fill);
+            }
+        }
+
+        if (currentBuffer.hasRemaining()) {
+            return;
+        } else {
+            currentBuffer.flip();
+            Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
+            doConsume((Command) command);
+            nextFrameSize = -1;
+        }
     }
 
+    protected int secureRead(ByteBuffer plain) throws Exception {
 
+        if (!(inputBuffer.position() != 0 && inputBuffer.hasRemaining())) {
+            int bytesRead = channel.read(inputBuffer);
 
-    private int secureRead(ByteBuffer plain) throws Exception {
-        int bytesRead = channel.read(inputBuffer);
-        if (bytesRead == -1) {
-            sslEngine.closeInbound();
-            if (inputBuffer.position() == 0 ||
-                    status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
-                return -1;
+            if (bytesRead == -1) {
+                sslEngine.closeInbound();
+                if (inputBuffer.position() == 0 ||
+                        status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
+                    return -1;
+                }
             }
         }
 
@@ -206,12 +215,13 @@ public class NIOSSLTransport extends NIO
                 res.bytesProduced() == 0);
 
         if (res.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED) {
-            finishHandshake();
+           finishHandshake();
         }
 
         status = res.getStatus();
         handshakeStatus = res.getHandshakeStatus();
 
+
         //TODO deal with BUFFER_OVERFLOW
 
         if (status == SSLEngineResult.Status.CLOSED) {
@@ -253,6 +263,7 @@ public class NIOSSLTransport extends NIO
     protected void doStop(ServiceStopper stopper) throws Exception {
         if (channel != null) {
             channel.close();
+            channel = null;
         }
         super.doStop(stopper);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?rev=1150630&r1=1150629&r2=1150630&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Mon Jul 25 11:08:52 2011
@@ -161,6 +161,7 @@ public class NIOTransport extends TcpTra
     protected void doStop(ServiceStopper stopper) throws Exception {
         if (selection != null) {
             selection.close();
+            selection = null;
         }
         super.doStop(stopper);
     }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java?rev=1150630&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java Mon Jul 25 11:08:52 2011
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.wireformat.WireFormat;
+
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+
+public class StompCodec {
+
+    TcpTransport transport;
+
+    ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
+    boolean processedHeaders = false;
+    String action;
+    HashMap<String, String> headers;
+    int contentLength = -1;
+    int readLength = 0;
+    int previousByte = -1;
+
+    public StompCodec(TcpTransport transport) {
+        this.transport = transport;
+    }
+
+    public void parse(ByteArrayInputStream input, int readSize) throws Exception {
+               int i = 0;
+               int b;
+               while(i++ < readSize) {
+                   b = input.read();
+                   // skip repeating nulls
+                   if (!processedHeaders && previousByte == 0 && b == 0) {
+                       continue;
+                   }
+
+                   if (!processedHeaders) {
+                       currentCommand.write(b);
+                       // end of headers section, parse action and header
+                       if (previousByte == '\n' && b == '\n') {
+                           if (transport.getWireFormat() instanceof StompWireFormat) {
+                               DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
+                               action = ((StompWireFormat)transport.getWireFormat()).parseAction(data);
+                               headers = ((StompWireFormat)transport.getWireFormat()).parseHeaders(data);
+                               String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
+                               if (contentLengthHeader != null) {
+                                   contentLength = ((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader);
+                               } else {
+                                   contentLength = -1;
+                               }
+                           }
+                           processedHeaders = true;
+                           currentCommand.reset();
+                       }
+                   } else {
+
+                       if (contentLength == -1) {
+                           // end of command reached, unmarshal
+                           if (b == 0) {
+                               processCommand();
+                           } else {
+                               currentCommand.write(b);
+                           }
+                       } else {
+                           // read desired content length
+                           if (readLength++ == contentLength) {
+                               processCommand();
+                               readLength = 0;
+                           } else {
+                               currentCommand.write(b);
+                           }
+                       }
+                   }
+
+                   previousByte = b;
+               }
+    }
+
+    protected void processCommand() throws Exception {
+        StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
+        transport.doConsume(frame);
+        processedHeaders = false;
+        currentCommand.reset();
+        contentLength = -1;
+    }
+}

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java?rev=1150630&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java Mon Jul 25 11:08:52 2011
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import org.apache.activemq.transport.nio.NIOSSLTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+import javax.net.SocketFactory;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+
+public class StompNIOSSLTransport extends NIOSSLTransport {
+
+    StompCodec codec;
+
+    public StompNIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+        super(wireFormat, socketFactory, remoteLocation, localLocation);
+    }
+
+    public StompNIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
+        super(wireFormat, socket);
+    }
+
+    @Override
+    protected void initializeStreams() throws IOException {
+        codec = new StompCodec(this);
+        super.initializeStreams();
+        if (inputBuffer.position() != 0 && inputBuffer.hasRemaining()) {
+            serviceRead();
+        }
+    }
+
+    @Override
+    protected void processCommand(ByteBuffer plain) throws Exception {
+        byte[] fill = new byte[plain.remaining()];
+        plain.get(fill);
+        ByteArrayInputStream input = new ByteArrayInputStream(fill);
+        codec.parse(input, fill.length);
+    }
+
+}

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java?rev=1150630&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java Mon Jul 25 11:08:52 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import org.apache.activemq.broker.SslContext;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.wireformat.WireFormat;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+public class StompNIOSSLTransportFactory extends StompNIOTransportFactory {
+
+    SSLContext context;
+
+    @Override
+    protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        return new TcpTransportServer(this, location, serverSocketFactory) {
+            protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+                StompNIOSSLTransport transport = new StompNIOSSLTransport(format, socket);
+                if (context != null) {
+                    transport.setSslContext(context);
+                }
+                return transport;
+            }
+        };
+    }
+
+    @Override
+    protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+        return new StompNIOSSLTransport(wf, socketFactory, location, localLocation);
+    }
+
+        @Override
+    public TransportServer doBind(URI location) throws IOException {
+         if (SslContext.getCurrentSslContext() != null) {
+             try {
+                 context = SslContext.getCurrentSslContext().getSSLContext();
+             } catch (Exception e) {
+                 throw new IOException(e);
+             }
+         }
+        return super.doBind(location);
+    }
+
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=1150630&r1=1150629&r2=1150630&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Mon Jul 25 11:08:52 2011
@@ -54,13 +54,7 @@ public class StompNIOTransport extends T
     private SelectorSelection selection;
 
     private ByteBuffer inputBuffer;
-    ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
-    boolean processedHeaders = false;
-    String action;
-    HashMap<String, String> headers;
-    int contentLength = -1;
-    int readLength = 0;
-    int previousByte = -1;
+    StompCodec codec;
 
     public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
         super(wireFormat, socketFactory, remoteLocation, localLocation);
@@ -93,6 +87,7 @@ public class StompNIOTransport extends T
         NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
         this.dataOut = new DataOutputStream(outPutStream);
         this.buffOut = outPutStream;
+        codec = new StompCodec(this);
     }
 
     private void serviceRead() {
@@ -114,57 +109,9 @@ public class StompNIOTransport extends T
 
                inputBuffer.flip();
 
-               int b;
                ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
+               codec.parse(input, readSize);
 
-               int i = 0;
-               while(i++ < readSize) {
-                   b = input.read();
-                   // skip repeating nulls
-                   if (!processedHeaders && previousByte == 0 && b == 0) {
-                       continue;
-                   }
-
-                   if (!processedHeaders) {
-                       currentCommand.write(b);
-                       // end of headers section, parse action and header
-                       if (previousByte == '\n' && b == '\n') {
-                           if (wireFormat instanceof StompWireFormat) {
-                               DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
-                               action = ((StompWireFormat)wireFormat).parseAction(data);
-                               headers = ((StompWireFormat)wireFormat).parseHeaders(data);
-                               String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
-                               if (contentLengthHeader != null) {
-                                   contentLength = ((StompWireFormat)wireFormat).parseContentLength(contentLengthHeader);
-                               } else {
-                                   contentLength = -1;
-                               }
-                           }
-                           processedHeaders = true;
-                           currentCommand.reset();
-                       }
-                   } else {
-
-                       if (contentLength == -1) {
-                           // end of command reached, unmarshal
-                           if (b == 0) {
-                               processCommand();
-                           } else {
-                               currentCommand.write(b);
-                           }
-                       } else {
-                           // read desired content length
-                           if (readLength++ == contentLength) {
-                               processCommand();
-                               readLength = 0;
-                           } else {
-                               currentCommand.write(b);
-                           }
-                       }
-                   }
-
-                   previousByte = b;
-               }
                // clear the buffer
                inputBuffer.clear();
 
@@ -176,14 +123,6 @@ public class StompNIOTransport extends T
         }
     }
 
-    private void processCommand() throws Exception {
-        StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
-        doConsume(frame);
-        processedHeaders = false;
-        currentCommand.reset();
-        contentLength = -1;
-    }
-
     protected void doStart() throws Exception {
         connect();
         selection.setInterestOps(SelectionKey.OP_READ);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=1150630&r1=1150629&r2=1150630&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Mon Jul 25 11:08:52 2011
@@ -687,4 +687,8 @@ public class TcpTransport extends Transp
         this.typeOfServiceChosen = false;
         return true;
     }
+
+    public WireFormat getWireFormat() {
+        return wireFormat;
+    }
 }

Added: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp%2Bnio%2Bssl?rev=1150630&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl (added)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl Mon Jul 25 11:08:52 2011
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.stomp.StompNIOSSLTransportFactory
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java?rev=1150630&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java Mon Jul 25 11:08:52 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+
+public class StompNIOSSLTest extends StompTest {
+
+    protected void setUp() throws Exception {
+        bindAddress = "stomp+nio+ssl://localhost:61613";
+        confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml";
+        System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+        System.setProperty("javax.net.ssl.trustStorePassword", "password");
+        System.setProperty("javax.net.ssl.trustStoreType", "jks");
+        System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+        System.setProperty("javax.net.ssl.keyStorePassword", "password");
+        System.setProperty("javax.net.ssl.keyStoreType", "jks");
+        super.setUp();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    protected Socket createSocket(URI connectUri) throws IOException {
+        SocketFactory factory = SSLSocketFactory.getDefault();
+        return factory.createSocket("127.0.0.1", connectUri.getPort());
+    }
+
+}

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1150630&r1=1150629&r2=1150630&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Mon Jul 25 11:08:52 2011
@@ -111,6 +111,7 @@ public class StompTest extends Combinati
         }
         broker = BrokerFactory.createBroker(new URI(confUri));
         broker.start();
+        broker.waitUntilStarted();
 
         stompConnect();
 
@@ -143,6 +144,7 @@ public class StompTest extends Combinati
             // Some tests explicitly disconnect from stomp so can ignore
         } finally {
             broker.stop();
+            broker.waitUntilStopped();
         }
     }
 

Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml?rev=1150630&r1=1150629&r2=1150630&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml Mon Jul 25 11:08:52 2011
@@ -69,6 +69,7 @@
     
     <transportConnectors>
       <transportConnector name="stomp+ssl"   uri="stomp+ssl://localhost:61612"/>
+      <transportConnector name="stomp+nio+ssl"   uri="stomp+nio+ssl://localhost:61613"/>
     </transportConnectors>