You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:24:44 UTC

svn commit: r961062 [6/14] - in /activemq/sandbox/activemq-apollo-actor: ./ activemq-all/ activemq-all/src/test/ide-resources/ activemq-all/src/test/java/org/apache/activemq/jaxb/ activemq-all/src/test/java/org/apache/activemq/legacy/ activemq-all/src/...

Added: activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala Wed Jul  7 03:24:02 2010
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.ng
+
+import _root_.java.util.{LinkedList, ArrayList}
+import _root_.org.apache.activemq.ng.Stomp
+import java.nio.channels.{SocketChannel}
+import java.nio.ByteBuffer
+import java.io.{EOFException, IOException}
+import _root_.org.apache.activemq.util.buffer._
+import collection.mutable.{ListBuffer, HashMap}
+
+import AsciiBuffer._
+import Stomp._
+import Stomp.Headers._
+
+object StompWireFormat {
+    val READ_BUFFFER_SIZE = 1024*64;
+    val MAX_COMMAND_LENGTH = 1024;
+    val MAX_HEADER_LENGTH = 1024 * 10;
+    val MAX_HEADERS = 1000;
+    val MAX_DATA_LENGTH = 1024 * 1024 * 100;
+    val TRIM=false
+    val SIZE_CHECK=false
+  }
+
+class StompWireFormat {
+  import StompWireFormat._
+
+  implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
+  implicit def wrap(x: Byte) = {
+    ByteBuffer.wrap(Array(x));
+  }
+
+//  var outbound_pos=0
+//  var outbound_limit=0
+//  var outbound_buffers: ListBuffer[ByteBuffer] = new ListBuffer[ByteBuffer]()
+//
+//  /**
+//   * @retruns true if the source has been drained of StompFrame objects and they are fully written to the socket
+//   */
+//  def drain_source(socket:SocketChannel)(source: =>StompFrame ):Boolean = {
+//    while(true) {
+//      // if we have a pending frame that is being sent over the socket...
+//      if( !outbound_buffers.isEmpty ) {
+//
+//        val data = outbound_buffers.toArray
+//
+//        socket.write(data)
+//
+//        // remove all the written buffers...
+//        while( !outbound_buffers.isEmpty && outbound_buffers.head.remaining==0 ) {
+//          outbound_buffers.remove(0)
+//        }
+//
+//        if( !outbound_buffers.isEmpty ) {
+//          // non blocking socket returned before the buffers were fully written to disk..
+//          // we are not yet fully drained.. but need to quit now.
+//          return false
+//        }
+//
+//      } else {
+//
+//        var frame = source
+//        while( frame!=null ) {
+//          marshall(outbound_buffers, frame)
+//          frame = source
+//        }
+//
+//        if( outbound_buffers.size == 0 ) {
+//          // the source is now drained...
+//          return true
+//        }
+//      }
+//    }
+//    true
+//  }
+//
+//  implicit def toByteBuffer(data:AsciiBuffer) = ByteBuffer.wrap(data.data, data.offset, data.length)
+//  implicit def toByteBuffer(data:Buffer) = ByteBuffer.wrap(data.data, data.offset, data.length)
+//
+//  def marshall(buffer:ListBuffer[ByteBuffer], frame:StompFrame) = {
+//    buffer.append(frame.action)
+//    buffer.append(NEWLINE)
+//
+//    // we can optimize a little if the headers and content are in the same buffer..
+//    if( !frame.headers.isEmpty && !frame.content.isEmpty &&
+//            ( frame.headers.getFirst._1.data eq frame.content.data ) ) {
+//      buffer.append(  ByteBuffer.wrap(frame.content.data, frame.headers.getFirst._1.offset, (frame.content.offset-frame.headers.getFirst._1.offset)+ frame.content.length) )
+//
+//    } else {
+//      val i = frame.headers.iterator
+//      while( i.hasNext ) {
+//        val (key, value) = i.next
+//        buffer.append(key)
+//        buffer.append(SEPERATOR)
+//        buffer.append(value)
+//        buffer.append(NEWLINE)
+//      }
+//
+//      buffer.append(NEWLINE)
+//      buffer.append(toByteBuffer(frame.content))
+//    }
+//    buffer.append(toByteBuffer(END_OF_FRAME_BUFFER))
+//  }
+
+  var outbound_frame: ByteBuffer = null
+  /**
+   * @retruns true if the source has been drained of StompFrame objects and they are fully written to the socket
+   */
+  def drain_source(socket:SocketChannel)(source: =>StompFrame ):Boolean = {
+    while(true) {
+      // if we have a pending frame that is being sent over the socket...
+      if( outbound_frame!=null ) {
+        socket.write(outbound_frame)
+        if( outbound_frame.remaining != 0 ) {
+          // non blocking socket returned before the buffers were fully written to disk..
+          // we are not yet fully drained.. but need to quit now.
+          return false
+        } else {
+          outbound_frame = null
+        }
+      } else {
+
+        // marshall all the available frames..
+        val buffer = new ByteArrayOutputStream()
+        var frame = source
+        while( frame!=null ) {
+          marshall(buffer, frame)
+          frame = source
+        }
+
+
+        if( buffer.size() ==0 ) {
+          // the source is now drained...
+          return true
+        } else {
+          val b = buffer.toBuffer;
+          outbound_frame = ByteBuffer.wrap(b.data, b.offset, b.length)
+        }
+      }
+    }
+    true
+  }
+
+  def marshall(buffer:ByteArrayOutputStream, frame:StompFrame) = {
+    buffer.write(frame.action)
+    buffer.write(NEWLINE)
+
+    // we can optimize a little if the headers and content are in the same buffer..
+    if( !frame.headers.isEmpty && !frame.content.isEmpty &&
+            ( frame.headers.getFirst._1.data eq frame.content.data ) ) {
+
+      val offset = frame.headers.getFirst._1.offset;
+      val buffer1 = frame.headers.getFirst._1;
+      val buffer2 = frame.content;
+      val length = (buffer2.offset-buffer1.offset)+buffer2.length
+      buffer.write( buffer1.data, offset, length)
+
+    } else {
+      val i = frame.headers.iterator
+      while( i.hasNext ) {
+        val (key, value) = i.next
+        buffer.write(key)
+        buffer.write(SEPERATOR)
+        buffer.write(value)
+        buffer.write(NEWLINE)
+      }
+
+      buffer.write(NEWLINE)
+      buffer.write(frame.content)
+    }
+    buffer.write(END_OF_FRAME_BUFFER)
+  }
+
+
+  var read_pos = 0
+  var read_offset = 0
+  var read_data:Array[Byte] = new Array[Byte](READ_BUFFFER_SIZE)
+  var read_bytebuffer:ByteBuffer = ByteBuffer.wrap(read_data)
+
+  def drain_socket(socket:SocketChannel)(handler:(StompFrame)=>Boolean) = {
+    var done = false
+
+    // keep going until the socket buffer is drained.
+    while( !done ) {
+      val frame = unmarshall()
+      if( frame!=null ) {
+        // the handler might want us to stop looping..
+        done = handler(frame)
+      } else {
+
+        // do we need to read in more data???
+        if( read_pos==read_bytebuffer.position ) {
+
+          // do we need a new data buffer to read data into??
+          if(read_bytebuffer.remaining==0) {
+
+            // The capacity needed grows by powers of 2...
+            val new_capacity = if( read_offset != 0 ) { READ_BUFFFER_SIZE } else { read_data.length << 2 }
+            val tmp_buffer = new Array[Byte](new_capacity)
+
+            // If there was un-consummed data.. copy it over...
+            val size = read_pos - read_offset
+            if( size > 0 ) {
+              System.arraycopy(read_data, read_offset, tmp_buffer, 0, size)
+            }
+            read_data = tmp_buffer
+            read_bytebuffer = ByteBuffer.wrap(read_data)
+            read_bytebuffer.position(size)
+            read_offset = 0;
+            read_pos = size
+
+          }
+
+          // Try to fill the buffer with data from the nio socket..
+          var p = read_bytebuffer.position
+          if( socket.read(read_bytebuffer) == -1 ) {
+            throw new EOFException();
+          }
+          // we are done if there was no data on the socket.
+          done = read_bytebuffer.position==p
+        }
+      }
+    }
+  }
+
+
+  type FrameReader = ()=>StompFrame
+  var unmarshall:FrameReader = read_action
+
+  def read_line( maxLength:Int, errorMessage:String):Buffer = {
+      val read_limit = read_bytebuffer.position
+      while( read_pos < read_limit ) {
+        if( read_data(read_pos) =='\n') {
+          var rc = new Buffer(read_data, read_offset, read_pos-read_offset)
+          read_pos += 1;
+          read_offset = read_pos;
+          return rc
+        }
+        if (SIZE_CHECK && read_pos-read_offset > maxLength) {
+            throw new IOException(errorMessage);
+        }
+        read_pos += 1;
+      }
+      return null;
+  }
+
+
+  def read_action:FrameReader = ()=> {
+    val line = read_line(MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
+    if( line !=null ) {
+      var action = line
+      if( TRIM ) {
+          action = action.trim();
+      }
+      if (action.length() > 0) {
+          unmarshall = read_headers(action)
+      }
+    }
+    null
+  }
+
+  type HeaderMap = LinkedList[(AsciiBuffer, AsciiBuffer)]
+
+  def read_headers(action:Buffer, headers:HeaderMap=new LinkedList()):FrameReader = ()=> {
+    val line = read_line(MAX_HEADER_LENGTH, "The maximum header length was exceeded")
+    if( line !=null ) {
+      if( line.trim().length() > 0 ) {
+
+        if (SIZE_CHECK && headers.size > MAX_HEADERS) {
+            throw new IOException("The maximum number of headers was exceeded");
+        }
+
+        try {
+            val seperatorIndex = line.indexOf(SEPERATOR);
+            if( seperatorIndex<0 ) {
+                throw new IOException("Header line missing seperator [" + ascii(line) + "]");
+            }
+            var name = line.slice(0, seperatorIndex);
+            if( TRIM ) {
+                name = name.trim();
+            }
+            var value = line.slice(seperatorIndex + 1, line.length());
+            if( TRIM ) {
+                value = value.trim();
+            }
+            headers.add((ascii(name), ascii(value)));
+        } catch {
+            case e:Exception=>
+              throw new IOException("Unable to parser header line [" + line + "]");
+        }
+
+      } else {
+        val contentLength = get(headers, CONTENT_LENGTH)
+        if (contentLength.isDefined) {
+          // Bless the client, he's telling us how much data to read in.
+          var length=0;
+          try {
+              length = Integer.parseInt(contentLength.get.trim().toString());
+          } catch {
+            case e:NumberFormatException=>
+              throw new IOException("Specified content-length is not a valid integer");
+          }
+
+          if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
+              throw new IOException("The maximum data length was exceeded");
+          }
+          unmarshall = read_binary_body(action, headers, length)
+
+        } else {
+          unmarshall = read_text_body(action, headers)
+        }
+      }
+    }
+    null
+  }
+
+  def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
+    val i = headers.iterator
+    while( i.hasNext ) {
+      val entry = i.next
+      if( entry._1 == name ) {
+        return Some(entry._2)
+      }
+    }
+    None
+  }
+  
+
+  def read_binary_body(action:Buffer, headers:HeaderMap, contentLength:Int):FrameReader = ()=> {
+    val content:Buffer=read_content(contentLength)
+    if( content != null ) {
+      unmarshall = read_action
+      new StompFrame(ascii(action), headers, content)
+    } else {
+      null
+    }
+  }
+
+
+  def read_content(contentLength:Int):Buffer = {
+      val read_limit = read_bytebuffer.position
+      if( (read_limit-read_offset) < contentLength+1 ) {
+        read_pos = read_limit;
+        null
+      } else {
+        if( read_data(read_offset+contentLength)!= 0 ) {
+           throw new IOException("Exected null termintor after "+contentLength+" content bytes");
+        }
+        var rc = new Buffer(read_data, read_offset, contentLength)
+        read_pos = read_offset+contentLength+1;
+        read_offset = read_pos;
+        rc;
+      }
+  }
+
+  def read_to_null():Buffer = {
+      val read_limit = read_bytebuffer.position
+      while( read_pos < read_limit ) {
+        if( read_data(read_pos) ==0) {
+          var rc = new Buffer(read_data, read_offset, read_pos-read_offset)
+          read_pos += 1;
+          read_offset = read_pos;
+          return rc;
+        }
+        read_pos += 1;
+      }
+      return null;
+  }
+
+
+  def read_text_body(action:Buffer, headers:HeaderMap):FrameReader = ()=> {
+    val content:Buffer=read_to_null
+    if( content != null ) {
+      unmarshall = read_action
+      new StompFrame(ascii(action), headers, content)
+    } else {
+      null
+    }
+  }
+
+
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml Wed Jul  7 03:24:02 2010
@@ -35,9 +35,10 @@
     
     <dependency>
       <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-queue</artifactId>
+      <artifactId>activemq-util</artifactId>
+      <version>6.0-SNAPSHOT</version>
     </dependency>
-        
+
     <!-- Testing Dependencies -->    
     <dependency>
       <groupId>junit</groupId>

Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java (from r946684, activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/QueueDescriptor.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/QueueDescriptor.java&r1=946684&r2=961062&rev=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/QueueDescriptor.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java Wed Jul  7 03:24:02 2010
@@ -14,10 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.queue;
+package org.apache.activemq.broker.store;
 
 import org.apache.activemq.util.buffer.AsciiBuffer;
 
+
 public class QueueDescriptor {
 
     public static final short SHARED = 0;
@@ -73,8 +74,6 @@ public class QueueDescriptor {
     }
 
     /**
-     * @param type
-     *            The type of the queue.
      */
     public short getApplicationType() {
         return applicationType;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java Wed Jul  7 03:24:02 2010
@@ -21,7 +21,7 @@ import java.util.Collection;
 import java.util.Iterator;
 
 import org.apache.activemq.Service;
-import org.apache.activemq.queue.QueueDescriptor;
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.buffer.Buffer;
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Wed Jul  7 03:24:02 2010
@@ -27,11 +27,8 @@ import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.Store.DuplicateKeyException;
-import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.broker.store.Store.SubscriptionRecord;
-import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.activemq.util.Comparators;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.buffer.Buffer;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java Wed Jul  7 03:24:02 2010
@@ -33,7 +33,7 @@ import org.apache.activemq.broker.store.
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
 import org.apache.activemq.metric.Period;
-import org.apache.activemq.queue.QueueDescriptor;
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.buffer.Buffer;
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Wed Jul  7 03:24:02 2010
@@ -31,7 +31,7 @@ import org.apache.activemq.broker.store.
 import org.apache.activemq.broker.store.Store.Session;
 import org.apache.activemq.broker.store.Store.SubscriptionRecord;
 import org.apache.activemq.broker.store.Store.VoidCallback;
-import org.apache.activemq.queue.QueueDescriptor;
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.buffer.Buffer;
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml Wed Jul  7 03:24:02 2010
@@ -34,15 +34,23 @@
   <dependencies>
 
     <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-dispatcher</artifactId>
-    </dependency>
-    <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
       <version>1.1</version>
     </dependency>
     
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-util</artifactId>
+      <version>6.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.fusesource.hawtdispatch</groupId>
+      <artifactId>hawtdispatch</artifactId>
+      <version>${hawtdispatch-version}</version>
+    </dependency>
+
     <!-- Testing Dependencies -->    
     <dependency>
       <groupId>junit</groupId>

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java (from r946684, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java&r1=946684&r2=961062&rev=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java Wed Jul  7 03:24:02 2010
@@ -14,15 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.activemq.dispatch;
+package org.apache.activemq.transport;
 
 /**
- * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public enum DispatchPriority {
-    HIGH,
-    DEFAULT,
-    LOW;
+public interface CompletionCallback {
+    void onCompletion();
+    public void onFailure(Throwable caught);
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompositeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompositeTransport.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompositeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompositeTransport.java Wed Jul  7 03:24:02 2010
@@ -19,6 +19,6 @@ package org.apache.activemq.transport;
 import java.net.URI;
 
 public interface CompositeTransport extends Transport {
-    void add(URI[] uris);
-    void remove(URI[] uris);
+    void add(URI...uris);
+    void remove(URI...uris);
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java Wed Jul  7 03:24:02 2010
@@ -41,15 +41,16 @@ public class DefaultTransportListener im
     }
 
     /**
-     * The transport has suffered an interuption from which it hopes to recover
+     * The transport has been connected.
      */
-    public void transportInterupted() {
+    public void onConnected() {
     }
 
     /**
-     * The transport has resumed after an interuption
+     * The transport has suffered a disconnection from
+     * which it hopes to recover
      */
-    public void transportResumed() {
+    public void onDisconnected() {
     }
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/FutureResponse.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/FutureResponse.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/FutureResponse.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/FutureResponse.java Wed Jul  7 03:24:02 2010
@@ -27,10 +27,10 @@ import org.apache.commons.logging.LogFac
 public class FutureResponse<T> {
     private static final Log LOG = LogFactory.getLog(FutureResponse.class);
 
-    private final ResponseCallback<T> responseCallback;
+    private final RequestCallback<T> responseCallback;
     private final ArrayBlockingQueue<T> responseSlot = new ArrayBlockingQueue<T>(1);
 
-    public FutureResponse(ResponseCallback<T> responseCallback) {
+    public FutureResponse(RequestCallback<T> responseCallback) {
         this.responseCallback = responseCallback;
     }
 
@@ -57,7 +57,7 @@ public class FutureResponse<T> {
     public void set(T result) {
         if (responseSlot.offer(result)) {
             if (responseCallback != null) {
-                responseCallback.onCompletion(this);
+                responseCallback.onCompletion(result);
             }
         }
     }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/RequestCallback.java (from r946684, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/RequestCallback.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/RequestCallback.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java&r1=946684&r2=961062&rev=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/RequestCallback.java Wed Jul  7 03:24:02 2010
@@ -14,12 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.activemq.actor;
+package org.apache.activemq.transport;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public interface IPizzaService {
-    public void order(long count);
-}
\ No newline at end of file
+public interface RequestCallback<T> {
+    void onCompletion(T resp);
+    public void onFailure(Throwable caught);
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java Wed Jul  7 03:24:02 2010
@@ -21,104 +21,65 @@ import java.net.URI;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtdispatch.DispatchQueue;
 
 /**
- * Represents the client side of a transport allowing messages to be sent
- * synchronously, asynchronously and consumed.
+ * Represents an abstract connection.  It can be a client side or server side connection.
  * 
- * @version $Revision: 1.5 $
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public interface Transport extends Service {
 
+    @Deprecated
+    void oneway(Object command);
+
     /**
-     * A one way asynchronous send
+     * A one way asynchronous send.  Once the command is transmitted the callback
+     * is invoked.
      * 
      * @param command
+     * @param callback
      * @throws IOException
      */
-    void oneway(Object command) throws IOException;
+    void oneway(Object command, CompletionCallback callback);
 
     /**
-     * An asynchronous request response where the Receipt will be returned in
-     * the future. If responseCallback is not null, then it will be called when
-     * the response has been completed.
-     * 
-     * @param command
-     * @param responseCallback TODO
-     * @return the FutureResponse
-     * @throws IOException
+     * Returns the current transport listener
+     *
+     * @return
      */
-    <T> FutureResponse<T> asyncRequest(Object command, ResponseCallback<T> responseCallback) throws IOException;
+    TransportListener getTransportListener();
 
     /**
-     * A synchronous request response
-     * 
-     * @param command
-     * @return the response
-     * @throws IOException
+     * Registers an inbound command listener
+     *
+     * @param commandListener
      */
-    Object request(Object command) throws IOException;
+    void setTransportListener(TransportListener commandListener);
 
     /**
-     * A synchronous request response
-     * 
-     * @param command
-     * @param timeout
-     * @return the repsonse or null if timeout
-     * @throws IOException
+     * Returns the dispatch queue used by the transport
+     *
+     * @return
      */
-    Object request(Object command, int timeout) throws IOException;
+    DispatchQueue getDispatchQueue();
 
-    // /**
-    // * A one way asynchronous send
-    // * @param command
-    // * @throws IOException
-    // */
-    // void oneway(Command command) throws IOException;
-    //
-    // /**
-    // * An asynchronous request response where the Receipt will be returned
-    // * in the future. If responseCallback is not null, then it will be called
-    // * when the response has been completed.
-    // *
-    // * @param command
-    // * @param responseCallback TODO
-    // * @return the FutureResponse
-    // * @throws IOException
-    // */
-    // FutureResponse asyncRequest(Command command, ResponseCallback
-    // responseCallback) throws IOException;
-    //    
-    // /**
-    // * A synchronous request response
-    // * @param command
-    // * @return the response
-    // * @throws IOException
-    // */
-    // Response request(Command command) throws IOException;
-    //
-    // /**
-    // * A synchronous request response
-    // * @param command
-    // * @param timeout
-    // * @return the repsonse or null if timeout
-    // * @throws IOException
-    // */
-    // Response request(Command command, int timeout) throws IOException;
+    /**
+     * Sets the dispatch queue used by the transport
+     *
+     * @param queue
+     */
+    void setDispatchQueue(DispatchQueue queue);
 
     /**
-     * Returns the current transport listener
-     * 
-     * @return
+     * suspend delivery of commands.
      */
-    TransportListener getTransportListener();
+    void suspend();
 
     /**
-     * Registers an inbound command listener
-     * 
-     * @param commandListener
+     * resume delivery of commands.
      */
-    void setTransportListener(TransportListener commandListener);
+    void resume();
 
     /**
      * @param target
@@ -137,16 +98,7 @@ public interface Transport extends Servi
      * @return true if fault tolerant
      */
     boolean isFaultTolerant();
-    
-    /**
-     * Indicates that the transport needs inactivity monitoring. This 
-     * is true for transports like tcp that may not otherwise detect
-     * a transport failure in a timely fashion. 
-     * 
-     * @return true if the transport requires inactivity monitoring.
-     */
-    boolean isUseInactivityMonitor();
-    
+
     /**
      * @return true if the transport is disposed
      */
@@ -161,12 +113,14 @@ public interface Transport extends Servi
      * @return The wireformat for the connection.
      */
     WireFormat getWireformat();
-    
+
+    void setWireformat(WireFormat wireformat);
+
     /**
      * reconnect to another location
      * @param uri
      * @throws IOException on failure of if not supported
      */
-    void reconnect(URI uri) throws IOException;
+    void reconnect(URI uri, CompletionCallback callback);
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java Wed Jul  7 03:24:02 2010
@@ -16,191 +16,40 @@
  */
 package org.apache.activemq.transport;
 
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.fusesource.hawtdispatch.DispatchQueue;
+
 import java.io.IOException;
-import java.net.MalformedURLException;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
-import org.apache.activemq.util.FactoryFinder;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.URISupport;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.wireformat.WireFormatFactory;
 
-public abstract class TransportFactory {
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class TransportFactory {
 
     private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
-    private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
-    private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
-
-    private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
-    private static final String THREAD_NAME_FILTER = "threadName";
-    
-    public abstract TransportServer doBind(URI location) throws IOException;
-
-    public Transport doConnect(URI location, Executor ex) throws Exception {
-        return doConnect(location);
-    }
-
-    public Transport doCompositeConnect(URI location, Executor ex) throws Exception {
-        return doCompositeConnect(location);
-    }
-
-    /**
-     * Creates a normal transport.
-     * 
-     * @param location
-     * @return the transport
-     * @throws Exception
-     */
-    public static Transport connect(URI location) throws Exception {
-        TransportFactory tf = findTransportFactory(location);
-        return tf.doConnect(location);
-    }
-
-    /**
-     * Creates a normal transport.
-     * 
-     * @param location
-     * @param ex
-     * @return the transport
-     * @throws Exception
-     */
-    public static Transport connect(URI location, Executor ex) throws Exception {
-        TransportFactory tf = findTransportFactory(location);
-        return tf.doConnect(location, ex);
-    }
-
-    /**
-     * Creates a slimmed down transport that is more efficient so that it can be
-     * used by composite transports like reliable and HA.
-     * 
-     * @param location
-     * @return the Transport
-     * @throws Exception
-     */
-    public static Transport compositeConnect(URI location) throws Exception {
-        TransportFactory tf = findTransportFactory(location);
-        return tf.doCompositeConnect(location);
-    }
-
-    /**
-     * Creates a slimmed down transport that is more efficient so that it can be
-     * used by composite transports like reliable and HA.
-     * 
-     * @param location
-     * @param ex
-     * @return the Transport
-     * @throws Exception
-     */
-    public static Transport compositeConnect(URI location, Executor ex) throws Exception {
-        TransportFactory tf = findTransportFactory(location);
-        return tf.doCompositeConnect(location, ex);
-    }
-
-    public static TransportServer bind(URI location) throws IOException {
-        TransportFactory tf = findTransportFactory(location);
-        return tf.doBind(location);
-    }
-
-    /**
-     * @deprecated 
-     */
-    public static TransportServer bind(String brokerId, URI location) throws IOException {
-        return bind(location);
-    }
-    
-//    public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
-//        TransportFactory tf = findTransportFactory(location);
-//        if( brokerService!=null && tf instanceof BrokerServiceAware ) {
-//            ((BrokerServiceAware)tf).setBrokerService(brokerService);
-//        }
-//        try {
-//            if( brokerService!=null ) {
-//                SslContext.setCurrentSslContext(brokerService.getSslContext());
-//            }
-//            return tf.doBind(location);
-//        } finally {
-//            SslContext.setCurrentSslContext(null);
-//        }
-//    }    
-
-    public Transport doConnect(URI location) throws Exception {
-        try {
-            Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
-            WireFormat wf = createWireFormat(options);
-            Transport transport = createTransport(location, wf);
-            Transport rc = configure(transport, wf, options);
-            if (!options.isEmpty()) {
-            	// Release the transport resource as we are erroring out...
-            	try {
-            		rc.stop();
-            	} catch (Throwable cleanup) {
-            	}
-                throw new IllegalArgumentException("Invalid connect parameters: " + options);
-            }
-            return rc;
-        } catch (URISyntaxException e) {
-            throw IOExceptionSupport.create(e);
-        }
-    }
+    private static final ConcurrentHashMap<String, TransportFactorySPI> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactorySPI>();
 
-    public Transport doCompositeConnect(URI location) throws Exception {
-        try {
-            Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
-            WireFormat wf = createWireFormat(options);
-            Transport transport = createTransport(location, wf);
-            Transport rc = compositeConfigure(transport, wf, options);
-            if (!options.isEmpty()) {
-                throw new IllegalArgumentException("Invalid connect parameters: " + options);
-            }
-            return rc;
-
-        } catch (URISyntaxException e) {
-            throw IOExceptionSupport.create(e);
-        }
+    public interface TransportFactorySPI {
+        public TransportServer bind(URI location) throws Exception;
+        public Transport connect(URI location) throws Exception;
     }
     
-     /**
-      * Allow registration of a transport factory without wiring via META-INF classes
-     * @param scheme
-     * @param tf
-     */
-    public static void registerTransportFactory(String scheme, TransportFactory tf) {
-        TRANSPORT_FACTORYS.put(scheme, tf);
-      }
-
     /**
-     * Factory method to create a new transport
-     * 
-     * @throws IOException
-     * @throws UnknownHostException
      */
-    protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException {
-        throw new IOException("createTransport() method not implemented!");
-    }
-
-    /**
-     * @param location
-     * @return
-     * @throws IOException
-     */
-    private static TransportFactory findTransportFactory(URI location) throws IOException {
+    private static TransportFactorySPI factory(URI location) throws IOException {
         String scheme = location.getScheme();
         if (scheme == null) {
             throw new IOException("Transport not scheme specified: [" + location + "]");
         }
-        TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
+        TransportFactorySPI tf = TRANSPORT_FACTORYS.get(scheme);
         if (tf == null) {
             // Try to load if from a META-INF property.
             try {
-                tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
+                tf = (TransportFactorySPI)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
                 TRANSPORT_FACTORYS.put(scheme, tf);
             } catch (Throwable e) {
                 throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
@@ -209,94 +58,29 @@ public abstract class TransportFactory {
         return tf;
     }
 
-    protected WireFormat createWireFormat(Map<String, String> options) throws IOException {
-        WireFormatFactory factory = createWireFormatFactory(options);
-        if( factory == null ) {
-            return null;
-        }
-        WireFormat format = factory.createWireFormat();
-        return format;
-    }
-
-    protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
-        String wireFormat = (String)options.remove("wireFormat");
-        if (wireFormat == null) {
-            wireFormat = getDefaultWireFormatType();
-        }
-        if( "null".equals(wireFormat) ) {
-            return null;
-        }
-
-        try {
-            WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
-            IntrospectionSupport.setProperties(wff, options, "wireFormat.");
-            return wff;
-        } catch (Throwable e) {
-            throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
-        }
-    }
 
-    protected String getDefaultWireFormatType() {
-        return "default";
-    }
-
-    /**
-     * Fully configures and adds all need transport filters so that the
-     * transport can be used by the JMS client.
-     * 
-     * @param transport
-     * @param wf
-     * @param options
-     * @return
-     * @throws Exception
+     /**
+      * Allow registration of a transport factory without wiring via META-INF classes
+     * @param scheme
+     * @param tf
      */
-    public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
-        transport = compositeConfigure(transport, wf, options);
-
-        transport = new MutexTransport(transport);
-
-        return transport;
+    public static void registerTransportFactory(String scheme, TransportFactorySPI tf) {
+        TRANSPORT_FACTORYS.put(scheme, tf);
     }
 
     /**
-     * Fully configures and adds all need transport filters so that the
-     * transport can be used by the ActiveMQ message broker. The main difference
-     * between this and the configure() method is that the broker does not issue
-     * requests to the client so the ResponseCorrelator is not needed.
-     * 
-     * @param transport
-     * @param format
-     * @param options
-     * @return
-     * @throws Exception
+     * Creates a client transport.
      */
-    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
-        if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
-            transport = new WriteTimeoutFilter(transport);
-            String soWriteTimeout = (String)options.get(WRITE_TIMEOUT_FILTER);
-            if (soWriteTimeout!=null) ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
-        }
-        if (options.containsKey(THREAD_NAME_FILTER)) {
-            transport = new ThreadNameFilter(transport);
-        }
-        transport = compositeConfigure(transport, format, options);
-        transport = new MutexTransport(transport);
-        return transport;
+    public static Transport connect(URI location) throws Exception {
+        return factory(location).connect(location);
     }
 
     /**
-     * Similar to configure(...) but this avoid adding in the MutexTransport and
-     * ResponseCorrelator transport layers so that the resulting transport can
-     * more efficiently be used as part of a composite transport.
-     * 
-     * @param transport
-     * @param format
-     * @param options
-     * @return
+     * Creates a transport server.
      */
-    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        IntrospectionSupport.setProperties(transport, options);
-        return transport;
+    public static TransportServer bind(URI location) throws Exception {
+        return factory(location).bind(location);
     }
 
+
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java Wed Jul  7 03:24:02 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class  TransportFactorySupport{
+
+    private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
+
+    private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
+    private static final String THREAD_NAME_FILTER = "threadName";
+
+    static public Transport configure(Transport transport, Map<String, String> options) throws IOException {
+        WireFormat wf = createWireFormat(options);
+        transport.setWireformat(wf);
+        IntrospectionSupport.setProperties(transport, options);
+        return transport;
+    }
+
+    public static Transport verify(Transport transport, Map<String, String> options) {
+        if (!options.isEmpty()) {
+            // Release the transport resource as we are erroring out...
+            try {
+                transport.stop();
+            } catch (Throwable cleanup) {
+            }
+            throw new IllegalArgumentException("Invalid connect parameters: " + options);
+        }
+        return transport;
+    }
+
+    static public WireFormat createWireFormat(Map<String, String> options) throws IOException {
+        WireFormatFactory factory = createWireFormatFactory(options);
+        if( factory == null ) {
+            return null;
+        }
+        WireFormat format = factory.createWireFormat();
+        return format;
+    }
+
+    static public WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
+        String wireFormat = (String)options.remove("wireFormat");
+        if (wireFormat == null) {
+            wireFormat = getDefaultWireFormatType();
+        }
+        if( "null".equals(wireFormat) ) {
+            return null;
+        }
+
+        try {
+            WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
+            IntrospectionSupport.setProperties(wff, options, "wireFormat.");
+            return wff;
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
+        }
+    }
+
+    static protected String getDefaultWireFormatType() {
+        return "default";
+    }
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jul  7 03:24:02 2010
@@ -20,31 +20,60 @@ import java.io.IOException;
 import java.net.URI;
 
 import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtdispatch.DispatchQueue;
 
 /**
  * @version $Revision: 1.5 $
  */
 public class TransportFilter implements TransportListener, Transport {
-    protected final Transport next;
+
+    protected Transport next;
     protected TransportListener transportListener;
 
     public TransportFilter(Transport next) {
         this.next = next;
     }
 
+    /**
+     * @return Returns the next transport.
+     */
+    public Transport getNext() {
+        return next;
+    }
+
+    public void setNext(Transport next) {
+        this.next = next;
+    }
+
     public TransportListener getTransportListener() {
         return transportListener;
     }
 
-    public void setTransportListener(TransportListener channelListener) {
-        this.transportListener = channelListener;
-        if (channelListener == null) {
+    public void setTransportListener(TransportListener listener) {
+        this.transportListener = listener;
+        if (listener == null) {
             next.setTransportListener(null);
         } else {
             next.setTransportListener(this);
         }
     }
 
+    public DispatchQueue getDispatchQueue() {
+        return next.getDispatchQueue();
+    }
+
+    public void setDispatchQueue(DispatchQueue queue) {
+        next.setDispatchQueue(queue);
+    }
+
+    public void suspend() {
+        next.suspend();
+    }
+
+    public void resume() {
+        next.resume();
+    }
+
     /**
      * @see org.apache.activemq.Service#start()
      * @throws IOException
@@ -52,7 +81,7 @@ public class TransportFilter implements 
      */
     public void start() throws Exception {
         if (next == null) {
-            throw new IOException("The next channel has not been set.");
+            throw new IOException("The next transport has not been set.");
         }
         if (transportListener == null) {
             throw new IOException("The command listener has not been set.");
@@ -71,43 +100,31 @@ public class TransportFilter implements 
         transportListener.onCommand(command);
     }
 
-    /**
-     * @return Returns the next.
-     */
-    public Transport getNext() {
-        return next;
-    }
 
     public String toString() {
         return next.toString();
     }
 
-    public void oneway(Object command) throws IOException {
-        next.oneway(command);
+    @Deprecated
+    public void oneway(Object command) {
+        oneway(command, null);
     }
 
-    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
-        return next.asyncRequest(command, null);
+    public void oneway(Object command, CompletionCallback callback) {
+        next.oneway(command, callback);
     }
 
-    public Object request(Object command) throws IOException {
-        return next.request(command);
-    }
-
-    public Object request(Object command, int timeout) throws IOException {
-        return next.request(command, timeout);
-    }
 
     public void onException(IOException error) {
         transportListener.onException(error);
     }
 
-    public void transportInterupted() {
-        transportListener.transportInterupted();
+    public void onDisconnected() {
+        transportListener.onDisconnected();
     }
 
-    public void transportResumed() {
-        transportListener.transportResumed();
+    public void onConnected() {
+        transportListener.onConnected();
     }
 
     public <T> T narrow(Class<T> target) {
@@ -137,15 +154,16 @@ public class TransportFilter implements 
         return next.isConnected();
     }
 
-    public void reconnect(URI uri) throws IOException {
-        next.reconnect(uri);
-    }
-
-    public boolean isUseInactivityMonitor() {
-        return next.isUseInactivityMonitor();
+    public void reconnect(URI uri, CompletionCallback callback) {
+        next.reconnect(uri, callback);
     }
 
     public WireFormat getWireformat() {
         return next.getWireformat();
     }
+    public void setWireformat(WireFormat wireformat) {
+        next.setWireformat(wireformat);
+    }
+
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java Wed Jul  7 03:24:02 2010
@@ -37,16 +37,14 @@ public interface TransportListener {
     void onException(IOException error);
     
     /**
-     * The transport has suffered an interuption from which it hopes to recover
-     *
+     * The transport has been connected.
      */
-    void transportInterupted();
-    
-    
+    public void onConnected();
+
     /**
-     * The transport has resumed after an interuption
-     *
+     * The transport has suffered a disconnection from
+     * which it hopes to recover
      */
-    void transportResumed();
-    
+    public void onDisconnected();
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java Wed Jul  7 03:24:02 2010
@@ -20,6 +20,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 
 import org.apache.activemq.Service;
+import org.fusesource.hawtdispatch.DispatchQueue;
 
 /**
  * A TransportServer asynchronously accepts {@see Transport} objects and then
@@ -46,4 +47,28 @@ public interface TransportServer extends
      */
     InetSocketAddress getSocketAddress();
 
+    /**
+     * Returns the dispatch queue used by the transport
+     *
+     * @return
+     */
+    DispatchQueue getDispatchQueue();
+
+    /**
+     * Sets the dispatch queue used by the transport
+     *
+     * @param queue
+     */
+    void setDispatchQueue(DispatchQueue queue);
+
+    /**
+     * suspend accepting new transports
+     */
+    void suspend();
+
+    /**
+     * resume accepting new transports
+     */
+    void resume();
+
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Wed Jul  7 03:24:02 2010
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.pipe;
+
+import org.apache.activemq.transport.CompletionCallback;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtdispatch.CustomDispatchSource;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.EventAggregators;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class PipeTransport implements Transport {
+    static private final Object EOF_TOKEN = new Object();
+
+    final private PipeTransportServer server;
+    PipeTransport peer;
+    private TransportListener listener;
+    private String remoteAddress;
+    private AtomicBoolean stopping = new AtomicBoolean();
+    private String name;
+    private WireFormat wireformat;
+    private boolean marshal;
+    private boolean trace;
+
+    private DispatchQueue dispatchQueue;
+    private CustomDispatchSource<Object,LinkedList<Object>> dispatchSource;
+    private boolean connected;
+
+    public PipeTransport(PipeTransportServer server) {
+        this.server = server;
+    }
+
+    public DispatchQueue getDispatchQueue() {
+        return dispatchQueue;
+    }
+    public void setDispatchQueue(DispatchQueue queue) {
+        if( dispatchQueue!=null ) {
+            dispatchQueue.release();
+        }
+        this.dispatchQueue = queue;
+        if( dispatchQueue!=null ) {
+            dispatchQueue.retain();
+        }
+    }
+
+    public void start() throws Exception {
+        if (dispatchQueue == null) {
+            throw new IllegalArgumentException("dispatchQueue is not set");
+        }
+        server.dispatchQueue.dispatchAsync(new Runnable(){
+            public void run() {
+                dispatchSource = Dispatch.createSource(EventAggregators.linkedList(), dispatchQueue);
+                dispatchSource.setEventHandler(new Runnable(){
+                    public void run() {
+                        try {
+                            final LinkedList<Object> commands = dispatchSource.getData();
+                            for (Object o : commands) {
+
+                                if(o == EOF_TOKEN) {
+                                    throw new EOFException();
+                                }
+
+                                if (wireformat != null && marshal) {
+                                    listener.onCommand(wireformat.unmarshal((Buffer) o));
+                                } else {
+                                    listener.onCommand(o);
+                                }
+                            }
+
+                            // let the peer know that they have been processed.
+                            peer.dispatchQueue.dispatchAsync(new Runnable() {
+                                public void run() {
+                                    outbound -= commands.size();
+                                    drainInbound();
+                                }
+                            });
+                        } catch (IOException e) {
+                            listener.onException(e);
+                        }
+
+                    }
+                });
+                if( peer.dispatchSource != null ) {
+                    fireConnected();
+                    peer.fireConnected();
+                }
+            }
+        });
+    }
+
+    private void fireConnected() {
+        dispatchQueue.dispatchAsync(new Runnable() {
+            public void run() {
+                connected = true;
+                dispatchSource.resume();
+                listener.onConnected();
+                drainInbound();
+            }
+        });
+    }
+
+    public void stop() throws Exception {
+        if( connected ) {
+            peer.dispatchSource.merge(EOF_TOKEN);
+        }
+        if( dispatchSource!=null ) {
+            dispatchSource.release();
+            dispatchSource = null;
+        }
+        setDispatchQueue(null);
+    }
+
+    static final class OneWay {
+        final Object command;
+        final CompletionCallback callback;
+
+        public OneWay(Object command, CompletionCallback callback) {
+            this.callback = callback;
+            this.command = command;
+        }
+    }
+
+    final LinkedList<OneWay> inbound = new LinkedList<OneWay>();
+    int outbound = 0;
+    int maxOutbound = 100;
+
+    @Deprecated
+    public void oneway(Object command) {
+        oneway(command, null);
+    }
+
+    public void oneway(Object command, CompletionCallback callback) {
+        if( !connected ) {
+            throw new IllegalStateException("Not connected.");
+        }
+        if( outbound < maxOutbound ) {
+            transmit(command, callback);
+        } else {
+            inbound.add(new OneWay(command, callback));
+        }
+    }
+
+    private void drainInbound() {
+        while( outbound < maxOutbound && !inbound.isEmpty() ) {
+            OneWay oneWay = inbound.poll();
+            transmit(oneWay.command, oneWay.callback);
+        }
+    }
+
+    private void transmit(Object command, CompletionCallback callback) {
+        outbound++;
+        peer.dispatchSource.merge(command);
+        if( callback!=null ) {
+            callback.onCompletion();
+        }
+    }
+
+    public String getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    public <T> T narrow(Class<T> target) {
+        if (target.isAssignableFrom(getClass())) {
+            return target.cast(this);
+        }
+        return null;
+    }
+
+    public void suspend() {
+        dispatchSource.suspend();
+    }
+
+    public void resume() {
+        dispatchSource.resume();
+    }
+    public void reconnect(URI uri, CompletionCallback callback) {
+        throw new UnsupportedOperationException();
+    }
+
+    public void setRemoteAddress(String remoteAddress) {
+        this.remoteAddress = remoteAddress;
+        if (name == null) {
+            name = remoteAddress;
+        }
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public TransportListener getTransportListener() {
+        return listener;
+    }
+    public void setTransportListener(TransportListener listener) {
+        this.listener = listener;
+    }
+
+    public WireFormat getWireformat() {
+        return wireformat;
+    }
+    public void setWireformat(WireFormat wireformat) {
+        this.wireformat = wireformat;
+    }
+
+
+    public boolean isTrace() {
+        return trace;
+    }
+
+    public void setTrace(boolean trace) {
+        this.trace = trace;
+    }
+
+    public boolean isMarshal() {
+        return marshal;
+    }
+    public void setMarshal(boolean marshall) {
+        this.marshal = marshall;
+    }
+
+    public boolean isConnected() {
+        return !stopping.get();
+    }
+    public boolean isDisposed() {
+        return false;
+    }
+    public boolean isFaultTolerant() {
+        return false;
+    }
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Wed Jul  7 03:24:02 2010
@@ -1,378 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.transport.pipe;
 
-import java.io.EOFException;
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.dispatch.DispatchPriority;
-import org.apache.activemq.dispatch.DispatchQueue;
-import org.apache.activemq.dispatch.Dispatcher;
-import org.apache.activemq.dispatch.internal.RunnableCountDownLatch;
-import org.apache.activemq.transport.DispatchableTransport;
-import org.apache.activemq.transport.FutureResponse;
-import org.apache.activemq.transport.ResponseCallback;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportAcceptListener;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.pipe.Pipe.ReadReadyListener;
-import org.apache.activemq.util.IOExceptionSupport;
+
+import static org.apache.activemq.transport.TransportFactorySupport.*;
+import org.apache.activemq.transport.*;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.URISupport;
-import org.apache.activemq.util.buffer.Buffer;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.wireformat.WireFormatFactory;
-
-import static org.apache.activemq.dispatch.DispatchOption.*;
 
-public class PipeTransportFactory extends TransportFactory {
-    static private final Object EOF_TOKEN = new Object();
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class PipeTransportFactory implements TransportFactory.TransportFactorySPI {
 
     static protected final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
 
-    protected static class PipeTransport implements DispatchableTransport, Runnable, ReadReadyListener<Object> {
-
-        private final Pipe<Object> pipe;
-        private TransportListener listener;
-        private String remoteAddress;
-        private AtomicBoolean stopping = new AtomicBoolean();
-        private Thread thread;
-        private String name;
-        private WireFormat wireFormat;
-        private boolean marshal;
-        private boolean trace;
-        private DispatchQueue dispatchQueue;
-        private Runnable dispatchTask;
-
-        public PipeTransport(Pipe<Object> pipe) {
-            this.pipe = pipe;
-        }
-
-        public void start() throws Exception {
-            if (dispatchQueue != null) {
-                pipe.setMode(Pipe.ASYNC);
-                dispatchQueue.dispatchAsync(dispatchTask);
-            } else {
-                thread = new Thread(this, getRemoteAddress());
-                thread.start();
-            }
-        }
-
-        public void stop() throws Exception {
-        	pipe.write(EOF_TOKEN);
-            if (dispatchQueue != null) {
-                RunnableCountDownLatch done = new RunnableCountDownLatch(1);
-                dispatchQueue.addShutdownWatcher(done);
-                dispatchQueue.release();
-                done.await();
-            } else {
-                stopping.set(true);
-                if( thread!=null ) {
-                	thread.join();
-                }
-            }
-        }
-
-        public void setDispatcher(Dispatcher dispatcher) {
-            dispatchQueue = dispatcher.createSerialQueue(name, STICK_TO_CALLER_THREAD);
-            dispatchTask = new Runnable(){
-                public void run() {
-                    dispatch();
-                }
-            };
-        }
-
-        public void onReadReady(Pipe<Object> pipe) {
-            if (dispatchQueue != null) {
-                dispatchQueue.dispatchAsync(dispatchTask);
-            }
-        }
-
-        public void setName(String name) {
-            this.name = name;
-        }
-
-        public void oneway(Object command) throws IOException {
-
-            try {
-                if (wireFormat != null && marshal) {
-                    pipe.write(wireFormat.marshal(command));
-                } else {
-                    pipe.write(command);
-                }
-            } catch (InterruptedException e) {
-                throw new InterruptedIOException();
-            }
-            /*
-             * try { while( !stopping.get() ) { if( pipe.offer(command, 500,
-             * TimeUnit.MILLISECONDS) ) { break; } } } catch
-             * (InterruptedException e) { throw new InterruptedIOException(); }
-             */
-        }
+    public TransportServer bind(URI uri) throws URISyntaxException, IOException {
 
-        public void dispatch() {
-            while (true) {
-                try {
-                    Object o = pipe.poll();
-                    if (o == null) {
-                        pipe.setReadReadyListener(this);
-                        return;
-                    } else {
-                    	if(o == EOF_TOKEN) {
-                    		throw new EOFException();
-                    	}                    	
-                        if (wireFormat != null && marshal) {
-                            listener.onCommand(wireFormat.unmarshal((Buffer) o));
-                        } else {
-                            listener.onCommand(o);
-                        }
-                        dispatchQueue.dispatchAsync(dispatchTask);
-                        return;
-                    }
-                } catch (IOException e) {
-                    listener.onException(e);
-                }
+        Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
+        String node = uri.getHost();
+        synchronized(servers) {
+            if (servers.containsKey(node)) {
+                throw new IOException("Server already bound: " + node);
             }
-        }
+            PipeTransportServer server = new PipeTransportServer();
+            server.setConnectURI(uri);
+            server.setName(node);
+            server.setWireFormatFactory(TransportFactorySupport.createWireFormatFactory(options));
+            IntrospectionSupport.setProperties(server, options);
 
-        public void run() {
-
-            try {
-                while (!stopping.get()) {
-                    Object value = pipe.poll(500, TimeUnit.MILLISECONDS);
-                    if (value != null) {
-                    	if(value == EOF_TOKEN) {
-                    		throw new EOFException();
-                    	} else {
-                            if (wireFormat != null && marshal) {
-                                listener.onCommand(wireFormat.unmarshal((Buffer)value));
-                            } else {
-                                listener.onCommand(value);
-                            }
-                    	}
-                    }
-                }
-            } catch (IOException e) {
-                listener.onException(e);
-            } catch (InterruptedException e) {
+            if (!options.isEmpty()) {
+                throw new IllegalArgumentException("Invalid bind parameters: " + options);
             }
+            servers.put(node, server);
+            return server;
         }
 
-        public String getRemoteAddress() {
-            return remoteAddress;
-        }
-
-        public TransportListener getTransportListener() {
-            return listener;
-        }
-
-        public boolean isConnected() {
-            return !stopping.get();
-        }
-
-        public boolean isDisposed() {
-            return false;
-        }
-
-        public boolean isFaultTolerant() {
-            return false;
-        }
-
-        public boolean isUseInactivityMonitor() {
-            return false;
-        }
-
-        public <T> T narrow(Class<T> target) {
-            if (target.isAssignableFrom(getClass())) {
-                return target.cast(this);
-            }
-            return null;
-        }
-
-        public void reconnect(URI uri) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        public Object request(Object command) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        public Object request(Object command, int timeout) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        public void setTransportListener(TransportListener listener) {
-            this.listener = listener;
-        }
-
-        public void setRemoteAddress(String remoteAddress) {
-            this.remoteAddress = remoteAddress;
-            if (name == null) {
-                name = remoteAddress;
-            }
-        }
-
-        public void setWireFormat(WireFormat wireFormat) {
-            this.wireFormat = wireFormat;
-        }
-
-        public void setDispatchPriority(DispatchPriority priority) {
-//            TODO:
-//            readContext.updatePriority(priority);
-        }
-
-        public WireFormat getWireformat() {
-            return wireFormat;
-        }
-
-		public boolean isTrace() {
-			return trace;
-		}
-
-		public void setTrace(boolean trace) {
-			this.trace = trace;
-		}
-
-		public boolean isMarshal() {
-			return marshal;
-		}
-
-		public void setMarshal(boolean marshall) {
-			this.marshal = marshall;
-		}
     }
 
-    protected class PipeTransportServer implements TransportServer {
-    	protected URI connectURI;
-        protected TransportAcceptListener listener;
-        protected String name;
-        protected WireFormatFactory wireFormatFactory;
-        protected boolean marshal;
-        protected final AtomicInteger connectionCounter = new AtomicInteger();
-
-        public URI getConnectURI() {
-            return connectURI;
-        }
-
-        public InetSocketAddress getSocketAddress() {
-            return null;
-        }
-
-        public void setAcceptListener(TransportAcceptListener listener) {
-            this.listener = listener;
-        }
-
-        public void start() throws Exception {
-        }
-
-        public void stop() throws Exception {
-            unbind(this);
-        }
-
-        public void setConnectURI(URI connectURI) {
-            this.connectURI = connectURI;
-        }
-
-        public void setName(String name) {
-            this.name = name;
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public PipeTransport connect() {
-            int connectionId = connectionCounter.incrementAndGet();
-            String remoteAddress = connectURI.toString() + "#" + connectionId;
-            assert listener != null : "Server does not have an accept listener";
-            Pipe<Object> pipe = new Pipe<Object>();
-            PipeTransport rc = createClientTransport(pipe);
-            rc.setRemoteAddress(remoteAddress);
-            PipeTransport serverSide = cerateServerTransport(pipe);
-            serverSide.setMarshal(marshal);
-            serverSide.setRemoteAddress(remoteAddress);
-            if (wireFormatFactory != null) {
-                rc.setWireFormat(wireFormatFactory.createWireFormat());
-                serverSide.setWireFormat(wireFormatFactory.createWireFormat());
+    public Transport connect(URI location) throws IOException, URISyntaxException {
+        String name = location.getHost();
+        synchronized(servers) {
+            PipeTransportServer server = lookup(name);
+            if (server == null) {
+                throw new IOException("Server is not bound: " + name);
             }
-            listener.onAccept(serverSide);
-            return rc;
-        }
-
-		protected PipeTransport createClientTransport(Pipe<Object> pipe) {
-			return new PipeTransport(pipe);
-		}
-
-		protected PipeTransport cerateServerTransport(Pipe<Object> pipe) {
-			return new PipeTransport(pipe.connect());
-		}
-
-        public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
-            this.wireFormatFactory = wireFormatFactory;
-        }
+            PipeTransport transport = server.connect();
 
-		public boolean isMarshal() {
-			return marshal;
-		}
-
-		public void setMarshal(boolean marshal) {
-			this.marshal = marshal;
-		}
-    }
-
-    @Override
-    public TransportServer doBind(URI uri) throws IOException {
-        try {
-            Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
-            String node = uri.getHost();
-    		synchronized(servers) {
-	            if (servers.containsKey(node)) {
-	                throw new IOException("Server already bound: " + node);
-	            }
-	            PipeTransportServer server = createTransportServer();
-	            server.setConnectURI(uri);
-	            server.setName(node);
-                server.setWireFormatFactory(createWireFormatFactory(options));
-                IntrospectionSupport.setProperties(server, options);
-                
-                if (!options.isEmpty()) {
-                    throw new IllegalArgumentException("Invalid bind parameters: " + options);
-                }
-                
-                
-	            servers.put(node, server);
-	            return server;
-    		}
-        } catch (URISyntaxException e) {
-            throw IOExceptionSupport.create(e);
+            Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
+            return verify( configure(transport, options), options);
         }
     }
 
-	protected PipeTransportServer createTransportServer() {
-		return new PipeTransportServer();
-	}
-	
-
 	static public PipeTransportServer lookup(String name) {
 		synchronized(servers) {
 			return servers.get(name);
     	}
 	}
-    
+
     static public Map<String, PipeTransportServer> getServers() {
     	synchronized(servers) {
     		return new HashMap<String, PipeTransportServer>(servers);
@@ -384,47 +89,4 @@ public class PipeTransportFactory extend
 			servers.remove(server.getName());
 		}
     }
-	
-	public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-
-        PipeTransport pipeTransport = (PipeTransport)transport.narrow(PipeTransport.class);
-        IntrospectionSupport.setProperties(pipeTransport, options);
-        
-        if (pipeTransport.isTrace()) {
-            throw new UnsupportedOperationException("Trace not implemented");
-//            try {
-//                transport = TransportLoggerFactory.getInstance().createTransportLogger(transport, pipeTransport.getLogWriterName(),
-//                        pipeTransport.isDynamicManagement(), pipeTransport.isStartLogging(), pipeTransport.getJmxPort());
-//            } catch (Throwable e) {
-//                LOG.error("Could not create TransportLogger object for: " + pipeTransport.getLogWriterName() + ", reason: " + e, e);
-//            }
-        }
-        
-        transport = format.createTransportFilters(transport, options);
-        return transport;
-    }
-
-    protected String getOption(Map options, String key, String def) {
-        String rc = (String) options.remove(key);
-        if( rc == null ) {
-            rc = def;
-        }
-        return rc;
-    }
-
-    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
-        String name = location.getHost();
-		synchronized(servers) {
-	        PipeTransportServer server = lookup(name);
-	        if (server == null) {
-	            throw new IOException("Server is not bound: " + name);
-	        }
-	        PipeTransport transport = server.connect();
-	        transport.setWireFormat(wf);
-	        return transport;
-		}
-    }
-
-	
-    
 }