You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/03 21:08:27 UTC

svn commit: r781510 - in /activemq/sandbox/activemq-flow: activemq-bio/ activemq-bio/src/main/java/org/apache/activemq/ activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-bro...

Author: chirino
Date: Wed Jun  3 19:08:26 2009
New Revision: 781510

URL: http://svn.apache.org/viewvc?rev=781510&view=rev
Log:
applied patch at: http://issues.apache.org/activemq/browse/AMQ-2279
Thx colin!

Modified:
    activemq/sandbox/activemq-flow/activemq-bio/pom.xml
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
    activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-flow/activemq-openwire/pom.xml
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/TreeMap.java
    activemq/sandbox/activemq-flow/webgen/README
    activemq/sandbox/activemq-flow/webgen/src/architecture.page
    activemq/sandbox/activemq-flow/webgen/src/metainfo
    activemq/sandbox/activemq-flow/webgen/src/todo.page

Modified: activemq/sandbox/activemq-flow/activemq-bio/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/pom.xml?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/pom.xml Wed Jun  3 19:08:26 2009
@@ -38,8 +38,14 @@
       <artifactId>activemq-transport</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-client</artifactId>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
     </dependency>
 
   <!--   In case we want to look at mina..
@@ -58,17 +64,7 @@
     </dependency>
 -->
     
-    <!-- Testing Dependencies -->    
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-      <scope>test</scope>
-    </dependency>
+    <!-- Testing Dependencies -->
 
   </dependencies>
 

Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java Wed Jun  3 19:08:26 2009
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq;
-
-import java.security.SecureRandom;
-import javax.jms.JMSException;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.TrustManager;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.tcp.SslTransportFactory;
-import org.apache.activemq.util.JMSExceptionSupport;
-
-/**
- * An ActiveMQConnectionFactory that allows access to the key and trust managers
- * used for SslConnections. There is no reason to use this class unless SSL is
- * being used AND the key and trust managers need to be specified from within
- * code. In fact, if the URI passed to this class does not have an "ssl" scheme,
- * this class will pass all work on to its superclass.
- * 
- * @author sepandm@gmail.com
- */
-public class ActiveMQSslConnectionFactory extends ActiveMQConnectionFactory {
-    // The key and trust managers used to initialize the used SSLContext.
-    protected KeyManager[] keyManager;
-    protected TrustManager[] trustManager;
-    protected SecureRandom secureRandom;
-
-    /**
-     * Sets the key and trust managers used when creating SSL connections.
-     * 
-     * @param km The KeyManagers used.
-     * @param tm The TrustManagers used.
-     * @param random The SecureRandom number used.
-     */
-    public void setKeyAndTrustManagers(final KeyManager[] km, final TrustManager[] tm, final SecureRandom random) {
-        keyManager = km;
-        trustManager = tm;
-        secureRandom = random;
-    }
-
-    /**
-     * Overriding to make special considerations for SSL connections. If we are
-     * not using SSL, the superclass's method is called. If we are using SSL, an
-     * SslConnectionFactory is used and it is given the needed key and trust
-     * managers.
-     * 
-     * @author sepandm@gmail.com
-     */
-    protected Transport createTransport() throws JMSException {
-        // If the given URI is non-ssl, let superclass handle it.
-        if (!brokerURL.getScheme().equals("ssl")) {
-            return super.createTransport();
-        }
-
-        try {
-            SslTransportFactory sslFactory = new SslTransportFactory();
-            sslFactory.setKeyAndTrustManagers(keyManager, trustManager, secureRandom);
-            return sslFactory.doConnect(brokerURL);
-        } catch (Exception e) {
-            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
-        }
-    }
-}

Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java Wed Jun  3 19:08:26 2009
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.tcp;
-
-import org.apache.activemq.command.Response;
-
-/**
- * ResponseHolder utility
- * 
- * @version $Revision: 1.1.1.1 $
- */
-public class ResponseHolder {
-    protected Response response;
-    protected Object lock = new Object();
-    protected boolean notified;
-
-    /**
-     * Construct a receipt holder
-     */
-    public ResponseHolder() {
-    }
-
-    /**
-     * Set the Response for this holder
-     * 
-     * @param r
-     */
-    public void setResponse(Response r) {
-        synchronized (lock) {
-            this.response = r;
-            notified = true;
-            lock.notify();
-        }
-    }
-
-    /**
-     * Get the Response
-     * 
-     * @return the Response or null if it is closed
-     */
-    public Response getResponse() {
-        return getResponse(0);
-    }
-
-    /**
-     * wait upto <Code>timeout</Code> timeout ms to get a receipt
-     * 
-     * @param timeout
-     * @return
-     */
-    public Response getResponse(int timeout) {
-        synchronized (lock) {
-            if (!notified) {
-                try {
-                    lock.wait(timeout);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
-        return this.response;
-    }
-
-    /**
-     * close this holder
-     */
-    public void close() {
-        synchronized (lock) {
-            notified = true;
-            lock.notifyAll();
-        }
-    }
-}

Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java Wed Jun  3 19:08:26 2009
@@ -27,8 +27,8 @@
 import javax.net.ssl.SSLSocket;
 import javax.net.ssl.SSLSocketFactory;
 
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionInfo;
+//import org.apache.activemq.command.Command;
+//import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.wireformat.WireFormat;
 
 /**
@@ -45,20 +45,27 @@
     /**
      * Connect to a remote node such as a Broker.
      * 
-     * @param wireFormat The WireFormat to be used.
-     * @param socketFactory The socket factory to be used. Forcing SSLSockets
-     *                for obvious reasons.
-     * @param remoteLocation The remote location.
-     * @param localLocation The local location.
-     * @param needClientAuth If set to true, the underlying socket will need
-     *                client certificate authentication.
-     * @throws UnknownHostException If TcpTransport throws.
-     * @throws IOException If TcpTransport throws.
+     * @param wireFormat
+     *            The WireFormat to be used.
+     * @param socketFactory
+     *            The socket factory to be used. Forcing SSLSockets for obvious
+     *            reasons.
+     * @param remoteLocation
+     *            The remote location.
+     * @param localLocation
+     *            The local location.
+     * @param needClientAuth
+     *            If set to true, the underlying socket will need client
+     *            certificate authentication.
+     * @throws UnknownHostException
+     *             If TcpTransport throws.
+     * @throws IOException
+     *             If TcpTransport throws.
      */
     public SslTransport(WireFormat wireFormat, SSLSocketFactory socketFactory, URI remoteLocation, URI localLocation, boolean needClientAuth) throws IOException {
         super(wireFormat, socketFactory, remoteLocation, localLocation);
         if (this.socket != null) {
-            ((SSLSocket)this.socket).setNeedClientAuth(needClientAuth);
+            ((SSLSocket) this.socket).setNeedClientAuth(needClientAuth);
         }
     }
 
@@ -66,9 +73,12 @@
      * Initialize from a ServerSocket. No access to needClientAuth is given
      * since it is already set within the provided socket.
      * 
-     * @param wireFormat The WireFormat to be used.
-     * @param socket The Socket to be used. Forcing SSL.
-     * @throws IOException If TcpTransport throws.
+     * @param wireFormat
+     *            The WireFormat to be used.
+     * @param socket
+     *            The Socket to be used. Forcing SSL.
+     * @throws IOException
+     *             If TcpTransport throws.
      */
     public SslTransport(WireFormat wireFormat, SSLSocket socket) throws IOException {
         super(wireFormat, socket);
@@ -78,30 +88,35 @@
      * Overriding in order to add the client's certificates to ConnectionInfo
      * Commmands.
      * 
-     * @param command The Command coming in.
-     */
+     * @param command
+     *            The Command coming in.
+     
     public void doConsume(Object command) {
         // The instanceof can be avoided, but that would require modifying the
         // Command clas tree and that would require too much effort right
         // now.
-        if (command instanceof ConnectionInfo) {
-            ConnectionInfo connectionInfo = (ConnectionInfo)command;
-
-            SSLSocket sslSocket = (SSLSocket)this.socket;
-
-            SSLSession sslSession = sslSocket.getSession();
-
-            X509Certificate[] clientCertChain;
-            try {
-                clientCertChain = (X509Certificate[])sslSession.getPeerCertificates();
-            } catch (SSLPeerUnverifiedException e) {
-                clientCertChain = null;
-            }
-
-            connectionInfo.setTransportContext(clientCertChain);
-        }
+        //        if (command instanceof ConnectionInfo) {
+        //            ConnectionInfo connectionInfo = (ConnectionInfo)command;
+        //
+        //            SSLSocket sslSocket = (SSLSocket)this.socket;
+        //
+        //            SSLSession sslSession = sslSocket.getSession();
+        //
+        //            X509Certificate[] clientCertChain;
+        //            try {
+        //                clientCertChain = (X509Certificate[])sslSession.getPeerCertificates();
+        //            } catch (SSLPeerUnverifiedException e) {
+        //                clientCertChain = null;
+        //            }
+        //
+        //            connectionInfo.setTransportContext(clientCertChain);
+        //        }
 
         super.doConsume(command);
+    }*/
+
+    public X509Certificate[] getPeerCertificateChain() throws SSLPeerUnverifiedException {
+        return (X509Certificate[]) ((SSLSocket) this.socket).getSession().getPeerCertificates();
     }
 
     /**

Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java Wed Jun  3 19:08:26 2009
@@ -32,11 +32,8 @@
 import javax.net.ssl.TrustManager;
 
 import org.apache.activemq.broker.SslContext;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.transport.InactivityMonitor;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.WireFormatNegotiator;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.URISupport;
@@ -100,12 +97,11 @@
 //            }
 //        }
 
-        transport = new InactivityMonitor(transport, format);
+        boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
+        sslTransport.setUseInactivityMonitor(useInactivityMonitor && isUseInactivityMonitor(transport));
+        
 
-        // Only need the WireFormatNegotiator if using openwire
-        if (format instanceof OpenWireFormat) {
-            transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, sslTransport.getMinmumWireFormatVersion());
-        }
+        transport = format.createTransportFilters(transport, options);
 
         return transport;
     }

Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jun  3 19:08:26 2009
@@ -39,19 +39,20 @@
 import javax.net.SocketFactory;
 
 import org.apache.activemq.Service;
-import org.apache.activemq.transport.Transport;
-//import org.apache.activemq.transport.TransportLoggerFactory;
+import org.apache.activemq.transport.Transport; //import org.apache.activemq.transport.TransportLoggerFactory;
 import org.apache.activemq.transport.TransportThreadSupport;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * An implementation of the {@link Transport} interface using raw tcp/ip
  * 
- * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
+ * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging
+ *         improvement modifications)
  * @version $Revision$
  */
 public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
@@ -65,48 +66,50 @@
     protected int soTimeout;
     protected int socketBufferSize = 64 * 1024;
     protected int ioBufferSize = 8 * 1024;
-    protected boolean closeAsync=true;
+    protected boolean closeAsync = true;
     protected Socket socket;
     protected DataOutputStream dataOut;
     protected DataInputStream dataIn;
     protected TcpBufferedOutputStream buffOut = null;
+
+    private static final boolean ASYNC_WRITE = false;
     /**
-     * trace=true -> the Transport stack where this TcpTransport
-     * object will be, will have a TransportLogger layer
-     * trace=false -> the Transport stack where this TcpTransport
-     * object will be, will NOT have a TransportLogger layer, and therefore
-     * will never be able to print logging messages.
-     * This parameter is most probably set in Connection or TransportConnector URIs.
+     * trace=true -> the Transport stack where this TcpTransport object will be,
+     * will have a TransportLogger layer trace=false -> the Transport stack
+     * where this TcpTransport object will be, will NOT have a TransportLogger
+     * layer, and therefore will never be able to print logging messages. This
+     * parameter is most probably set in Connection or TransportConnector URIs.
      */
     protected boolean trace = false;
-//    /**
-//     * Name of the LogWriter implementation to use.
-//     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
-//     * This parameter is most probably set in Connection or TransportConnector URIs.
-//     */
-//    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
-    /**
-     * Specifies if the TransportLogger will be manageable by JMX or not.
-     * Also, as long as there is at least 1 TransportLogger which is manageable,
-     * a TransportLoggerControl MBean will me created.
+    /**
+     * Name of the LogWriter implementation to use. Names are mapped to classes
+     * in the
+     * resources/META-INF/services/org/apache/activemq/transport/logwriters
+     * directory. This parameter is most probably set in Connection or
+     * TransportConnector URIs.
+     */
+    //    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
+    /**
+     * Specifies if the TransportLogger will be manageable by JMX or not. Also,
+     * as long as there is at least 1 TransportLogger which is manageable, a
+     * TransportLoggerControl MBean will me created.
      */
     protected boolean dynamicManagement = false;
     /**
      * startLogging=true -> the TransportLogger object of the Transport stack
-     * will initially write messages to the log.
-     * startLogging=false -> the TransportLogger object of the Transport stack
-     * will initially NOT write messages to the log.
-     * This parameter only has an effect if trace == true.
-     * This parameter is most probably set in Connection or TransportConnector URIs.
+     * will initially write messages to the log. startLogging=false -> the
+     * TransportLogger object of the Transport stack will initially NOT write
+     * messages to the log. This parameter only has an effect if trace == true.
+     * This parameter is most probably set in Connection or TransportConnector
+     * URIs.
      */
     protected boolean startLogging = true;
     /**
-     * Specifies the port that will be used by the JMX server to manage
-     * the TransportLoggers.
-     * This should only be set in an URI by a client (producer or consumer) since
-     * a broker will already create a JMX server.
-     * It is useful for people who test a broker and clients in the same machine
-     * and want to control both via JMX; a different port will be needed.
+     * Specifies the port that will be used by the JMX server to manage the
+     * TransportLoggers. This should only be set in an URI by a client (producer
+     * or consumer) since a broker will already create a JMX server. It is
+     * useful for people who test a broker and clients in the same machine and
+     * want to control both via JMX; a different port will be needed.
      */
     protected int jmxPort = 1099;
     protected boolean useLocalHost = true;
@@ -119,18 +122,20 @@
     private Boolean tcpNoDelay;
     private Thread runnerThread;
 
+    protected boolean useActivityMonitor;
+
     /**
      * Connect to a remote Node - e.g. a Broker
      * 
      * @param wireFormat
      * @param socketFactory
      * @param remoteLocation
-     * @param localLocation - e.g. local InetAddress and local port
+     * @param localLocation
+     *            - e.g. local InetAddress and local port
      * @throws IOException
      * @throws UnknownHostException
      */
-    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
-                        URI localLocation) throws UnknownHostException, IOException {
+    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
         this.wireFormat = wireFormat;
         this.socketFactory = socketFactory;
         try {
@@ -158,13 +163,48 @@
         setDaemon(true);
     }
 
+    LinkedBlockingQueue<Object> outbound = new LinkedBlockingQueue<Object>();
+    private Thread onewayThread;
+
     /**
      * A one way asynchronous send
      */
     public void oneway(Object command) throws IOException {
         checkStarted();
-        wireFormat.marshal(command, dataOut);
-        dataOut.flush();
+        try {
+            if (ASYNC_WRITE) {
+                outbound.put(command);
+            } else {
+                wireFormat.marshal(command, dataOut);
+                dataOut.flush();
+            }
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException();
+        }
+    }
+
+    protected void sendOneways() {
+        try {
+            LOG.debug("Started oneway thead");
+            while (!isStopped()) {
+                Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
+                if (command != null) {
+                    try {
+                        // int count=0;
+                        while (command != null) {
+                            wireFormat.marshal(command, dataOut);
+                            // count++;
+                            command = outbound.poll();
+                        }
+                        // System.out.println(count);
+                        dataOut.flush();
+                    } catch (IOException e) {
+                        getTransportListener().onException(e);
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+        }
     }
 
     /**
@@ -179,7 +219,7 @@
      */
     public void run() {
         LOG.trace("TCP consumer thread for " + this + " starting");
-        this.runnerThread=Thread.currentThread();
+        this.runnerThread = Thread.currentThread();
         try {
             while (!isStopped()) {
                 doRun();
@@ -187,12 +227,12 @@
         } catch (IOException e) {
             stoppedLatch.get().countDown();
             onException(e);
-        } catch (Throwable e){
+        } catch (Throwable e) {
             stoppedLatch.get().countDown();
-            IOException ioe=new IOException("Unexpected error occured");
+            IOException ioe = new IOException("Unexpected error occured");
             ioe.initCause(e);
             onException(ioe);
-        }finally {
+        } finally {
             stoppedLatch.get().countDown();
         }
     }
@@ -220,14 +260,22 @@
     public void setTrace(boolean trace) {
         this.trace = trace;
     }
-    
-//    public String getLogWriterName() {
-//        return logWriterName;
-//    }
-//
-//    public void setLogWriterName(String logFormat) {
-//        this.logWriterName = logFormat;
-//    }
+
+    void setUseInactivityMonitor(boolean val) {
+        useActivityMonitor = val;
+    }
+
+    public boolean isUseInactivityMonitor() {
+        return useActivityMonitor;
+    }
+
+    //    public String getLogWriterName() {
+    //        return logWriterName;
+    //    }
+    //
+    //    public void setLogWriterName(String logFormat) {
+    //        this.logWriterName = logFormat;
+    //    }
 
     public boolean isDynamicManagement() {
         return dynamicManagement;
@@ -252,7 +300,7 @@
     public void setJmxPort(int jmxPort) {
         this.jmxPort = jmxPort;
     }
-    
+
     public int getMinmumWireFormatVersion() {
         return minmumWireFormatVersion;
     }
@@ -337,12 +385,13 @@
     }
 
     /**
-     * @param ioBufferSize the ioBufferSize to set
+     * @param ioBufferSize
+     *            the ioBufferSize to set
      */
     public void setIoBufferSize(int ioBufferSize) {
         this.ioBufferSize = ioBufferSize;
     }
-    
+
     /**
      * @return the closeAsync
      */
@@ -351,7 +400,8 @@
     }
 
     /**
-     * @param closeAsync the closeAsync to set
+     * @param closeAsync
+     *            the closeAsync to set
      */
     public void setCloseAsync(boolean closeAsync) {
         this.closeAsync = closeAsync;
@@ -399,6 +449,16 @@
 
     protected void doStart() throws Exception {
         connect();
+        if (ASYNC_WRITE) {
+            onewayThread = new Thread() {
+                @Override
+                public void run() {
+                    sendOneways();
+                }
+            };
+            onewayThread.start();
+        }
+
         stoppedLatch.set(new CountDownLatch(1));
         super.doStart();
     }
@@ -413,8 +473,7 @@
         InetSocketAddress remoteAddress = null;
 
         if (localLocation != null) {
-            localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
-                                                 localLocation.getPort());
+            localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
         }
 
         if (remoteLocation != null) {
@@ -442,8 +501,7 @@
             // For SSL sockets.. you can't create an unconnected socket :(
             // This means the timout option are not supported either.
             if (localAddress != null) {
-                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
-                                                    localAddress.getAddress(), localAddress.getPort());
+                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
             } else {
                 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
             }
@@ -463,31 +521,34 @@
         // closeStreams();
         if (socket != null) {
             if (closeAsync) {
-                //closing the socket can hang also 
+                // closing the socket can hang also
                 final CountDownLatch latch = new CountDownLatch(1);
-                
+
                 SOCKET_CLOSE.execute(new Runnable() {
-    
+
                     public void run() {
                         try {
                             socket.close();
                         } catch (IOException e) {
-                            LOG.debug("Caught exception closing socket",e);
-                        }finally {
+                            LOG.debug("Caught exception closing socket", e);
+                        } finally {
                             latch.countDown();
                         }
                     }
-                    
+
                 });
-                latch.await(1,TimeUnit.SECONDS);
-            }else {
+                latch.await(1, TimeUnit.SECONDS);
+            } else {
                 try {
                     socket.close();
                 } catch (IOException e) {
-                    LOG.debug("Caught exception closing socket",e);
+                    LOG.debug("Caught exception closing socket", e);
                 }
+
+            }
+            if (ASYNC_WRITE) {
+                onewayThread.join();
             }
-           
         }
     }
 
@@ -499,7 +560,7 @@
         super.stop();
         CountDownLatch countDownLatch = stoppedLatch.get();
         if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
-            countDownLatch.await(1,TimeUnit.SECONDS);
+            countDownLatch.await(1, TimeUnit.SECONDS);
         }
     }
 
@@ -529,22 +590,21 @@
         }
         return null;
     }
-    
+
     @Override
     public <T> T narrow(Class<T> target) {
         if (target == Socket.class) {
             return target.cast(socket);
-        } else if ( target == TcpBufferedOutputStream.class) {
+        } else if (target == TcpBufferedOutputStream.class) {
             return target.cast(buffOut);
         }
         return super.narrow(target);
     }
-    
 
     static {
-        SOCKET_CLOSE =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+        SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable);
+                Thread thread = new Thread(runnable, "TcpSocketClose: " + runnable);
                 thread.setPriority(Thread.MAX_PRIORITY);
                 thread.setDaemon(true);
                 return thread;

Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Wed Jun  3 19:08:26 2009
@@ -26,13 +26,10 @@
 import javax.net.ServerSocketFactory;
 import javax.net.SocketFactory;
 
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.transport.InactivityMonitor;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 //import org.apache.activemq.transport.TransportLoggerFactory;
 import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.WireFormatNegotiator;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.URISupport;
@@ -87,29 +84,26 @@
         Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
         tcpTransport.setSocketOptions(socketOptions);
         
-//        if (tcpTransport.isTrace()) {
+        if (tcpTransport.isTrace()) {
+            throw new UnsupportedOperationException("Trace not implemented");
 //            try {
 //                transport = TransportLoggerFactory.getInstance().createTransportLogger(transport, tcpTransport.getLogWriterName(),
 //                        tcpTransport.isDynamicManagement(), tcpTransport.isStartLogging(), tcpTransport.getJmxPort());
 //            } catch (Throwable e) {
 //                LOG.error("Could not create TransportLogger object for: " + tcpTransport.getLogWriterName() + ", reason: " + e, e);
 //            }
-//        }
-
-        boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
-        if (useInactivityMonitor && isUseInactivityMonitor(transport)) {
-            transport = new InactivityMonitor(transport, format);
-        }
-
-        // Only need the WireFormatNegotiator if using openwire
-        if (format instanceof OpenWireFormat) {
-            transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
         }
+        
+        boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
+        tcpTransport.setUseInactivityMonitor(useInactivityMonitor && isUseInactivityMonitor(transport));
+        
 
+        transport = format.createTransportFilters(transport, options);
+        
         return transport;
     }
 
-    private String getOption(Map options, String key, String def) {
+    protected String getOption(Map options, String key, String def) {
         String rc = (String) options.remove(key);
         if( rc == null ) {
             rc = def;

Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jun  3 19:08:26 2009
@@ -35,9 +35,7 @@
 import javax.net.ServerSocketFactory;
 
 import org.apache.activemq.Service;
-import org.apache.activemq.ThreadPriorities;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.openwire.OpenWireFormatFactory;
+//import org.apache.activemq.ThreadPriorities;
 import org.apache.activemq.transport.Transport;
 //import org.apache.activemq.transport.TransportLoggerFactory;
 import org.apache.activemq.transport.TransportServer;
@@ -64,7 +62,7 @@
     private static final Log LOG = LogFactory.getLog(TcpTransportServer.class);
     protected ServerSocket serverSocket;
     protected int backlog = 5000;
-    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
+    protected WireFormatFactory wireFormatFactory;
     protected final TcpTransportFactory transportFactory;
     protected long maxInactivityDuration = 30000;
     protected long maxInactivityDurationInitalDelay = 10000;
@@ -85,12 +83,14 @@
     protected int socketBufferSize = 64 * 1024;
     protected int connectionTimeout =  30000;
 
-//    /**
-//     * Name of the LogWriter implementation to use.
-//     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
-//     * This parameter is most probably set in Connection or TransportConnector URIs.
-//     */
-//    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
+    /**
+     * Name of the LogWriter implementation to use.
+     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
+     * This parameter is most probably set in Connection or TransportConnector URIs.
+     
+    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
+    */
+    
     /**
      * Specifies if the TransportLogger will be manageable by JMX or not.
      * Also, as long as there is at least 1 TransportLogger which is manageable,
@@ -175,15 +175,6 @@
         this.wireFormatFactory = wireFormatFactory;
     }
 
-    /**
-     * Associates a broker info with the transport server so that the transport
-     * can do discovery advertisements of the broker.
-     * 
-     * @param brokerInfo
-     */
-    public void setBrokerInfo(BrokerInfo brokerInfo) {
-    }
-
     public long getMaxInactivityDuration() {
         return maxInactivityDuration;
     }
@@ -215,7 +206,7 @@
     public void setTrace(boolean trace) {
         this.trace = trace;
     }
-    
+//    
 //    public String getLogWriterName() {
 //        return logWriterName;
 //    }
@@ -369,7 +360,7 @@
                     "ActiveMQ Transport Server Thread Handler: " + toString(),
                     getStackSize());
             socketHandlerThread.setDaemon(true);
-            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
+            //socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
             socketHandlerThread.start();
         }
         super.doStart();
@@ -436,29 +427,29 @@
         
     }    
 
-	public int getSoTimeout() {
-		return soTimeout;
-	}
-
-	public void setSoTimeout(int soTimeout) {
-		this.soTimeout = soTimeout;
-	}
-
-	public int getSocketBufferSize() {
-		return socketBufferSize;
-	}
-
-	public void setSocketBufferSize(int socketBufferSize) {
-		this.socketBufferSize = socketBufferSize;
-	}
-
-	public int getConnectionTimeout() {
-		return connectionTimeout;
-	}
-
-	public void setConnectionTimeout(int connectionTimeout) {
-		this.connectionTimeout = connectionTimeout;
-	}
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    public void setSoTimeout(int soTimeout) {
+        this.soTimeout = soTimeout;
+    }
+
+    public int getSocketBufferSize() {
+        return socketBufferSize;
+    }
+
+    public void setSocketBufferSize(int socketBufferSize) {
+        this.socketBufferSize = socketBufferSize;
+    }
+
+    public int getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    public void setConnectionTimeout(int connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
 
     /**
      * @return the maximumConnections

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java Wed Jun  3 19:08:26 2009
@@ -117,8 +117,9 @@
     private static final int DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1;
     private static final int DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD = 1;
     // Be default we don't page out elements to disk.
-    private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
-
+    //private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
+    private static final int DEFAULT_DURABLE_QUEUE_SIZE = 1024 * 1024 * 10;
+    
     private static final PersistencePolicy<MessageDelivery> DURABLE_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
 
         private static final boolean PAGING_ENABLED = DEFAULT_DURABLE_QUEUE_SIZE > DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Wed Jun  3 19:08:26 2009
@@ -51,7 +51,7 @@
 
     protected final boolean USE_KAHA_DB = true;
     protected final boolean PURGE_STORE = true;
-    protected final boolean PERSISTENT = true;
+    protected final boolean PERSISTENT = false;
     protected final boolean DURABLE = true;
 
     // Set to put senders and consumers on separate brokers.
@@ -479,7 +479,7 @@
             store = StoreFactory.createStore("memory");
         }
 
-        store.setStoreDirectory(new File("sub/test-data/broker-test/" + broker.getName()));
+        store.setStoreDirectory(new File("test-data/broker-test/" + broker.getName()));
         store.setDeleteAllMessages(PURGE_STORE);
         return store;
     }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/pom.xml?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/pom.xml Wed Jun  3 19:08:26 2009
@@ -45,6 +45,10 @@
       <groupId>org.apache.geronimo.specs</groupId>
       <artifactId>geronimo-jms_1.1_spec</artifactId>
     </dependency>
+    <dependency>
+    	<groupId>org.apache.activemq</groupId>
+    	<artifactId>activemq-bio</artifactId>
+    </dependency>
 
     <!-- Testing Dependencies -->
     <dependency>

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Wed Jun  3 19:08:26 2009
@@ -219,7 +219,11 @@
                 }
 
                 public Response processKeepAlive(KeepAliveInfo info) throws Exception {
-                    return ack(command);
+                    if (info.isResponseRequired()) {
+                        info.setResponseRequired(false);
+                        connection.write(info);
+                    }
+                    return null;
                 }
 
                 public Response processFlush(FlushCommand info) throws Exception {

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Wed Jun  3 19:08:26 2009
@@ -27,6 +27,9 @@
 import org.apache.activemq.command.CommandTypes;
 import org.apache.activemq.command.DataStructure;
 import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.WireFormatNegotiator;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.ByteSequenceData;
 import org.apache.activemq.util.DataByteArrayInputStream;
@@ -62,7 +65,7 @@
     private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
     private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
     private WireFormatInfo preferedWireFormatInfo;
-    
+
     private AtomicBoolean receivingMessage = new AtomicBoolean(false);
 
     public OpenWireFormat() {
@@ -74,10 +77,8 @@
     }
 
     public int hashCode() {
-        return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
-               ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
-               ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
-               ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
+        return version ^ (cacheEnabled ? 0x10000000 : 0x20000000) ^ (stackTraceEnabled ? 0x01000000 : 0x02000000) ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
+                ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
     }
 
     public OpenWireFormat copy() {
@@ -96,16 +97,14 @@
         if (object == null) {
             return false;
         }
-        OpenWireFormat o = (OpenWireFormat)object;
-        return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
-               && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
-               && o.sizePrefixDisabled == sizePrefixDisabled;
+        OpenWireFormat o = (OpenWireFormat) object;
+        return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
+                && o.sizePrefixDisabled == sizePrefixDisabled;
     }
 
-
     public String toString() {
-        return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
-               + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + "}";
+        return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled=" + tightEncodingEnabled
+                + ", sizePrefixDisabled=" + sizePrefixDisabled + "}";
         // return "OpenWireFormat{id="+id+",
         // tightEncodingEnabled="+tightEncodingEnabled+"}";
     }
@@ -120,12 +119,12 @@
             runMarshallCacheEvictionSweep();
         }
 
-//        MarshallAware ma = null;
-//        // If not using value caching, then the marshaled form is always the
-//        // same
-//        if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) {
-//            ma = (MarshallAware)command;
-//        }
+        //        MarshallAware ma = null;
+        //        // If not using value caching, then the marshaled form is always the
+        //        // same
+        //        if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) {
+        //            ma = (MarshallAware)command;
+        //        }
 
         ByteSequence sequence = null;
         // if( ma!=null ) {
@@ -137,9 +136,9 @@
             int size = 1;
             if (command != null) {
 
-                DataStructure c = (DataStructure)command;
+                DataStructure c = (DataStructure) command;
                 byte type = c.getDataStructureType();
-                DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+                DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
                 if (dsm == null) {
                     throw new IOException("Unknown data type: " + type);
                 }
@@ -162,8 +161,8 @@
                     bytesOut.restart();
                     if (!sizePrefixDisabled) {
                         bytesOut.writeInt(0); // we don't know the final size
-                                                // yet but write this here for
-                                                // now.
+                        // yet but write this here for
+                        // now.
                     }
                     bytesOut.writeByte(type);
                     dsm.looseMarshal(this, c, bytesOut);
@@ -220,9 +219,9 @@
         int size = 1;
         if (o != null) {
 
-            DataStructure c = (DataStructure)o;
+            DataStructure c = (DataStructure) o;
             byte type = c.getDataStructureType();
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + type);
             }
@@ -260,7 +259,7 @@
 
         } else {
             if (!sizePrefixDisabled) {
-            	dataOut.writeInt(size);
+                dataOut.writeInt(size);
             }
             dataOut.writeByte(NULL_TYPE);
         }
@@ -285,9 +284,9 @@
     public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
         int size = 1;
         if (o != null) {
-            DataStructure c = (DataStructure)o;
+            DataStructure c = (DataStructure) o;
             byte type = c.getDataStructureType();
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + type);
             }
@@ -308,9 +307,9 @@
         }
 
         if (o != null) {
-            DataStructure c = (DataStructure)o;
+            DataStructure c = (DataStructure) o;
             byte type = c.getDataStructureType();
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + type);
             }
@@ -332,21 +331,13 @@
         try {
             mfClass = Class.forName(mfName, false, getClass().getClassLoader());
         } catch (ClassNotFoundException e) {
-            throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version
-                                                                         + ", could not load " + mfName)
-                .initCause(e);
+            throw (IllegalArgumentException) new IllegalArgumentException("Invalid version: " + version + ", could not load " + mfName).initCause(e);
         }
         try {
-            Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class});
-            dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this});
+            Method method = mfClass.getMethod("createMarshallerMap", new Class[] { OpenWireFormat.class });
+            dataMarshallers = (DataStreamMarshaller[]) method.invoke(null, new Object[] { this });
         } catch (Throwable e) {
-            throw (IllegalArgumentException)new IllegalArgumentException(
-                                                                         "Invalid version: "
-                                                                             + version
-                                                                             + ", "
-                                                                             + mfName
-                                                                             + " does not properly implement the createMarshallerMap method.")
-                .initCause(e);
+            throw (IllegalArgumentException) new IllegalArgumentException("Invalid version: " + version + ", " + mfName + " does not properly implement the createMarshallerMap method.").initCause(e);
         }
         this.version = version;
     }
@@ -355,7 +346,7 @@
         byte dataType = dis.readByte();
         receivingMessage.set(true);
         if (dataType != NULL_TYPE) {
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
+            DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + dataType);
             }
@@ -396,15 +387,14 @@
         }
 
         byte type = o.getDataStructureType();
-        DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+        DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
         if (dsm == null) {
             throw new IOException("Unknown data type: " + type);
         }
         return 1 + dsm.tightMarshal1(this, o, bs);
     }
 
-    public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
-        throws IOException {
+    public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs) throws IOException {
         if (!bs.readBoolean()) {
             return;
         }
@@ -423,7 +413,7 @@
 
         } else {
 
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + type);
             }
@@ -436,7 +426,7 @@
         if (bs.readBoolean()) {
 
             byte dataType = dis.readByte();
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
+            DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + dataType);
             }
@@ -469,7 +459,7 @@
         if (dis.readBoolean()) {
 
             byte dataType = dis.readByte();
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
+            DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + dataType);
             }
@@ -487,7 +477,7 @@
         if (o != null) {
             byte type = o.getDataStructureType();
             dataOut.writeByte(type);
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + type);
             }
@@ -529,7 +519,7 @@
         } else {
             // Use -1 to indicate that the value was not cached due to cache
             // being full.
-            return new Short((short)-1);
+            return new Short((short) -1);
         }
     }
 
@@ -595,9 +585,9 @@
     public WireFormatInfo getPreferedWireFormatInfo() {
         return preferedWireFormatInfo;
     }
-    
+
     public boolean inReceive() {
-    	return receivingMessage.get();
+        return receivingMessage.get();
     }
 
     public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
@@ -618,12 +608,10 @@
         this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
         info.setCacheEnabled(this.cacheEnabled);
 
-        this.tightEncodingEnabled = info.isTightEncodingEnabled()
-                                    && preferedWireFormatInfo.isTightEncodingEnabled();
+        this.tightEncodingEnabled = info.isTightEncodingEnabled() && preferedWireFormatInfo.isTightEncodingEnabled();
         info.setTightEncodingEnabled(this.tightEncodingEnabled);
 
-        this.sizePrefixDisabled = info.isSizePrefixDisabled()
-                                  && preferedWireFormatInfo.isSizePrefixDisabled();
+        this.sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo.isSizePrefixDisabled();
         info.setSizePrefixDisabled(this.sizePrefixDisabled);
 
         if (cacheEnabled) {
@@ -656,4 +644,14 @@
         }
         return version2;
     }
+
+    public Transport createTransportFilters(Transport transport, Map options) {
+
+        if (transport.isUseInactivityMonitor()) {
+            transport = new InactivityMonitor(transport, this);
+        }
+
+        transport = new WireFormatNegotiator(transport, this, 1);
+        return transport;
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Jun  3 19:08:26 2009
@@ -655,6 +655,11 @@
         return true;
     }
     
+    public boolean isUseInactivityMonitor() {
+        //this is up to the underlying transport:
+        return false;
+    }
+    
    final boolean doReconnect() {
         Exception failure = null;
         synchronized (reconnectMutex) {

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java Wed Jun  3 19:08:26 2009
@@ -275,7 +275,7 @@
 
         private boolean paused;
 
-        public Cursor(CursoredQueue<V> queue, String name, boolean skipAcquired, boolean pageInElements, IFlowController<QueueElement<V>> memoryController) {
+        private Cursor(CursoredQueue<V> queue, String name, boolean skipAcquired, boolean pageInElements, IFlowController<QueueElement<V>> memoryController) {
             this.name = name;
             this.queue = queue.queue;
             this.loader = queue.loader;
@@ -718,6 +718,8 @@
         int softRefs = 0;
 
         // Indicates whether this element is loaded or a placeholder:
+        // A place holder indicates that one or more elements is paged
+        // out at and above this sequence number.
         boolean loaded = true;
 
         // Indicates that we have requested a save for the element
@@ -765,6 +767,7 @@
 
         public final void addHardRef() {
             hardRefs++;
+
             // Page in the element (providing it wasn't removed):
             if (elem == null && !deleted) {
                 // If this is the first request for this
@@ -877,6 +880,7 @@
                 // If deleted unlink this element from the queue, and link
                 // together adjacent paged out entries:
                 if (deleted) {
+                    loaded = false;
                     unlink();
                     // If both next and previous entries are unloaded,
                     // then collapse them:
@@ -1363,8 +1367,6 @@
             // in memory:
             if (persistencePolicy.isPagingEnabled()) {
 
-                // Otherwise check with any other open cursor to see if
-                // it can hang on to the element:
                 Collection<Cursor<V>> active = null;
 
                 active = reservedBlocks.get(qe.restoreBlock);
@@ -1423,11 +1425,24 @@
                     QueueElement<V> tail = queue.getTail();
                     // If we're at the end of the queue we don't need to
                     // load if we never paged out the block:
-                    if (tail != null && tail.isLoaded() && tail.restoreBlock == block && tail.isFirstInBlock()) {
-                        load = false;
-                    } else {
-                        load |= pageOutPlaceHolders;
+                    //                    if (tail != null && tail.isLoaded() && tail.restoreBlock == block && tail.isFirstInBlock()) {
+                    //                        load = false;
+                    //                    } else {
+                    //Otherwise add a soft ref for all element in the block that are already
+                    //loaded. 
+                    QueueElement<V> qe = queue.upper(RESTORE_BLOCK_SIZE * block, true);
+                    while (qe != null && qe.restoreBlock == block) {
+                        QueueElement<V> next = qe.getNext();
+                        if (qe.isLoaded()) {
+                            qe.addSoftRef();
+                        } else {
+                            //If we find an unloaded element
+                            //we'll need to load:
+                            load = true;
+                        }
+                        qe = next;
                     }
+                    //                    }
                 }
             }
             cursors.add(cursor);
@@ -1460,11 +1475,7 @@
                             QueueElement<V> qe = queue.upper(RESTORE_BLOCK_SIZE * block, true);
                             while (qe != null && qe.restoreBlock == block) {
                                 QueueElement<V> next = qe.getNext();
-                                // If we page out place holders, release the
-                                // soft ref we added when we loaded the element:
-                                if (pageOutPlaceHolders) {
-                                    qe.releaseSoftRef();
-                                }
+                                qe.releaseSoftRef();
                                 qe = next;
                             }
                         }

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java Wed Jun  3 19:08:26 2009
@@ -5,19 +5,21 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import org.apache.activemq.flow.Commands.Destination.DestinationBean;
 import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
 import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
 import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
 import org.apache.activemq.flow.Commands.Message.MessageBean;
+import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.StatefulWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
-import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.DataByteArrayOutputStream;
 
 public class Proto2WireFormatFactory implements WireFormatFactory {
 
@@ -262,6 +264,10 @@
             DataByteArrayInputStream is = new DataByteArrayInputStream(data);
             return unmarshal(is);
         }
+        
+        public Transport createTransportFilters(Transport transport, Map options) {
+           return transport;
+        }
     }
 
 	public WireFormat createWireFormat() {

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java Wed Jun  3 19:08:26 2009
@@ -5,18 +5,20 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
 import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
 import org.apache.activemq.flow.Commands.Message.MessageBuffer;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.StatefulWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
-import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.DataByteArrayOutputStream;
 
 public class ProtoWireFormatFactory implements WireFormatFactory {
 
@@ -234,6 +236,10 @@
             DataByteArrayInputStream is = new DataByteArrayInputStream(data);
             return unmarshal(is);
         }
+
+        public Transport createTransportFilters(Transport transport, Map options) {
+           return transport;
+        }
     }
 
 	public WireFormat createWireFormat() {

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java Wed Jun  3 19:08:26 2009
@@ -7,7 +7,9 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.util.Map;
 
+import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
@@ -57,6 +59,10 @@
         public Object unmarshal(ByteSequence data) throws IOException {
             return null;
         }
+
+        public Transport createTransportFilters(Transport transport, Map options) {
+           return transport;
+        }
     }
 
 	public WireFormat createWireFormat() {

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java Wed Jun  3 19:08:26 2009
@@ -138,6 +138,15 @@
     boolean isFaultTolerant();
     
     /**
+     * Indicates that the transport needs inactivity monitoring. This 
+     * is true for transports like tcp that may not otherwise detect
+     * a transport failure in a timely fashion. 
+     * 
+     * @return true if the transport requires inactivity monitoring.
+     */
+    boolean isUseInactivityMonitor();
+    
+    /**
      * @return true if the transport is disposed
      */
     boolean isDisposed();

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jun  3 19:08:26 2009
@@ -137,4 +137,8 @@
 	public void reconnect(URI uri) throws IOException {
 		next.reconnect(uri);
 	}
+
+    public boolean isUseInactivityMonitor() {
+        return next.isUseInactivityMonitor();
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java Wed Jun  3 19:08:26 2009
@@ -18,7 +18,7 @@
 
 import java.net.URI;
 
-import org.apache.activemq.ThreadPriorities;
+//import org.apache.activemq.ThreadPriorities;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -72,7 +72,7 @@
         LOG.info("Listening for connections at: " + getConnectURI());
         runner = new Thread(null, this, "ActiveMQ Transport Server: " + toString(), stackSize);
         runner.setDaemon(daemon);
-        runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
+        //runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
         runner.start();
     }
 

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java Wed Jun  3 19:08:26 2009
@@ -106,18 +106,21 @@
     public boolean isFaultTolerant() {
         return false;
     }
-    
-   
-	public void reconnect(URI uri) throws IOException {
-		throw new IOException("Not supported");
-	}
-	
-	public boolean isDisposed() {
-		return isStopped();
-	}
-	
-	public  boolean isConnected() {
-	    return isStarted();
-	}
+
+    public boolean isUseInactivityMonitor() {
+        return false;
+    }
+
+    public void reconnect(URI uri) throws IOException {
+        throw new IOException("Not supported");
+    }
+
+    public boolean isDisposed() {
+        return isStopped();
+    }
+
+    public boolean isConnected() {
+        return isStarted();
+    }
 
 }

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Wed Jun  3 19:08:26 2009
@@ -29,7 +29,6 @@
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
 
-
 public class PipeTransportFactory extends TransportFactory {
 
     private final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
@@ -155,6 +154,10 @@
             return false;
         }
 
+        public boolean isUseInactivityMonitor() {
+            return false;
+        }
+
         public <T> T narrow(Class<T> target) {
             if (target.isAssignableFrom(getClass())) {
                 return target.cast(this);

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Wed Jun  3 19:08:26 2009
@@ -23,7 +23,10 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Map;
 
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
@@ -136,6 +139,9 @@
                 maxHeaderLength = Math.max( maxHeaderLength, wff.maxWireformatHeaderLength());
             }
         }
+        public Transport createTransportFilters(Transport transport, Map options) {
+            return transport;
+        }
     }
         
     public WireFormat createWireFormat() {

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Wed Jun  3 19:08:26 2009
@@ -24,7 +24,9 @@
 import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
+import java.util.Map;
 
+import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
@@ -79,7 +81,8 @@
 		// TODO implement the inactivity monitor
 		return false;
 	}
-    
-    
 
+    public Transport createTransportFilters(Transport transport, Map options) {
+        return transport;
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java Wed Jun  3 19:08:26 2009
@@ -19,7 +19,10 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Map;
 
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.util.ByteSequence;
 
 
@@ -66,4 +69,13 @@
      */
     boolean inReceive();
     
+    /**
+     * Creates any transport filters appropriate for the given wire format:
+     * 
+     * @param transport The transport to filter.
+     * @param options The options with which the transport was created. 
+     * @return Either the given transport or a Transport filter wrapping the onw provided. 
+     */
+    Transport createTransportFilters(Transport transport, Map options);
+    
 }

Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/TreeMap.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/TreeMap.java?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/TreeMap.java (original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/TreeMap.java Wed Jun  3 19:08:26 2009
@@ -701,7 +701,7 @@
 
         while (currentNode != root && isBlack(currentNode)) {
             if (isLeftChild(currentNode)) {
-                TreeMapNode<K, V> siblingNode = getRight(currentNode.parent);
+                TreeMapNode<K, V> siblingNode = getRight(parent(currentNode));
 
                 if (isRed(siblingNode)) {
                     color(siblingNode, BLACK);
@@ -714,7 +714,7 @@
                 if (isBlack(getLeft(siblingNode)) && isBlack(getRight(siblingNode))) {
                     color(siblingNode, RED);
 
-                    currentNode = currentNode.parent;
+                    currentNode = parent(currentNode);
                 } else {
                     if (isBlack(getRight(siblingNode))) {
                         color(getLeft(siblingNode), BLACK);
@@ -732,7 +732,7 @@
                     currentNode = root;
                 }
             } else {
-                TreeMapNode<K, V> siblingNode = getRight(currentNode);
+                TreeMapNode<K, V> siblingNode = getLeft(parent(currentNode));
 
                 if (isRed(siblingNode)) {
                     color(siblingNode, BLACK);

Modified: activemq/sandbox/activemq-flow/webgen/README
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/webgen/README?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/webgen/README (original)
+++ activemq/sandbox/activemq-flow/webgen/README Wed Jun  3 19:08:26 2009
@@ -10,10 +10,17 @@
 Installing 
 -----------------------------
 
-You first need ruby and rubygems installed on your system.  Then you can 
-install webgen with the following command
+You first need ruby and rubygems installed on your system. 
+You should have gems version of 1.3.4 or greater. You can check this via 
 
-`sudo gem install webgen coderay feedtools haml`
+gem --version
+
+To update gems do 
+gem update --system 
+
+Then you can install webgen with the following command.
+
+`sudo gem install webgen coderay feedtools haml RedCloth`
 
 Building 
 -----------------------------

Modified: activemq/sandbox/activemq-flow/webgen/src/architecture.page
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/webgen/src/architecture.page?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/webgen/src/architecture.page (original)
+++ activemq/sandbox/activemq-flow/webgen/src/architecture.page Wed Jun  3 19:08:26 2009
@@ -0,0 +1,11 @@
+
+---
+title: Architectural Overview
+--- pipeline:textile
+
+h2. Overview
+This page explores some of the details around core components. 
+
+h2. Flow Control
+
+TODO: I've got some more background notes that I need to 

Modified: activemq/sandbox/activemq-flow/webgen/src/metainfo
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/webgen/src/metainfo?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/webgen/src/metainfo (original)
+++ activemq/sandbox/activemq-flow/webgen/src/metainfo Wed Jun  3 19:08:26 2009
@@ -15,5 +15,5 @@
   # Define the project properties here, these can be accessed in the 
   # pages using the {var:} syntax.
   # -------------------------------------------------------------------
-  project_name: "ActiveMQ Puma"
+  project_name: "ActiveMQ Apollo"
   project_slogan: 'The Next Generation of Messaging Middleware'

Modified: activemq/sandbox/activemq-flow/webgen/src/todo.page
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/webgen/src/todo.page?rev=781510&r1=781509&r2=781510&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/webgen/src/todo.page (original)
+++ activemq/sandbox/activemq-flow/webgen/src/todo.page Wed Jun  3 19:08:26 2009
@@ -0,0 +1,8 @@
+---
+title: TODO
+--- pipeline:textile
+
+h2. Tasks
+* InactivityMonitor needs to get inserted for OpenWire on the server side of MultiWireFormat negotiation.
+* BIO SSL Transport does not currently pass Client Certificate chain in OpenWire ConnectionInfo command. This used to be done by the SSL transport but this shouldn't have OpenWire dependencies; instead protocol handler should be able to retrieve it from the underlying transport as needed. 
+