You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:23:37 UTC

svn commit: r961218 [2/2] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/ activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/wireformat/ activemq-broke...

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jul  7 04:23:35 2010
@@ -16,12 +16,11 @@
  */
 package org.apache.activemq.transport.tcp;
 
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportAcceptListener;
-import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.apollo.transport.TransportServer;
+import org.apache.activemq.apollo.transport.Transport;
+import org.apache.activemq.apollo.transport.TransportAcceptListener;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.wireformat.WireFormatFactory;
 import org.fusesource.hawtdispatch.Dispatch;
 import org.fusesource.hawtdispatch.DispatchQueue;
 import org.fusesource.hawtdispatch.DispatchSource;
@@ -42,7 +41,6 @@ import java.util.Map;
 
 public class TcpTransportServer implements TransportServer {
 
-//    protected WireFormatFactory wireFormatFactory;
     private ServerSocketChannel channel;
     private TransportAcceptListener listener;
     private URI bindURI;
@@ -193,7 +191,6 @@ public class TcpTransportServer implemen
         HashMap<String, Object> options = new HashMap<String, Object>();
 //      options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
 //      options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
-//      options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
 //      options.put("trace", Boolean.valueOf(trace));
 //      options.put("soTimeout", Integer.valueOf(soTimeout));
 //      options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
@@ -221,364 +218,11 @@ public class TcpTransportServer implemen
         this.transportOptions = transportOptions;
     }
 
-
-
-//    private static final Log LOG = LogFactory.getLog(TcpTransportServer.class);
-//    protected ServerSocket serverSocket;
-//    protected int backlog = 5000;
-//    protected WireFormatFactory wireFormatFactory;
-//    protected final TcpTransportFactory transportFactory;
-//    protected long maxInactivityDuration = 30000;
-//    protected long maxInactivityDurationInitalDelay = 10000;
-//    protected int minmumWireFormatVersion;
-//    protected boolean useQueueForAccept=true;
-//
-//    /**
-//     * trace=true -> the Transport stack where this TcpTransport
-//     * object will be, will have a TransportLogger layer
-//     * trace=false -> the Transport stack where this TcpTransport
-//     * object will be, will NOT have a TransportLogger layer, and therefore
-//     * will never be able to print logging messages.
-//     * This parameter is most probably set in Connection or TransportConnector URIs.
-//     */
-//    protected boolean trace = false;
-//
-//    protected int soTimeout = 0;
-//    protected int socketBufferSize = 64 * 1024;
-//    protected int connectionTimeout =  30000;
-//
-//    /**
-//     * Name of the LogWriter implementation to use.
-//     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
-//     * This parameter is most probably set in Connection or TransportConnector URIs.
-//
-//    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
-//    */
-//
-//    /**
-//     * Specifies if the TransportLogger will be manageable by JMX or not.
-//     * Also, as long as there is at least 1 TransportLogger which is manageable,
-//     * a TransportLoggerControl MBean will me created.
-//     */
-//    protected boolean dynamicManagement = false;
-//    /**
-//     * startLogging=true -> the TransportLogger object of the Transport stack
-//     * will initially write messages to the log.
-//     * startLogging=false -> the TransportLogger object of the Transport stack
-//     * will initially NOT write messages to the log.
-//     * This parameter only has an effect if trace == true.
-//     * This parameter is most probably set in Connection or TransportConnector URIs.
-//     */
-//    protected boolean startLogging = true;
-//    protected Map<String, Object> transportOptions;
-//    protected final ServerSocketFactory serverSocketFactory;
-//    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
-//    protected Thread socketHandlerThread;
-//    /**
-//     * The maximum number of sockets allowed for this server
-//     */
-//    protected int maximumConnections = Integer.MAX_VALUE;
-//    protected int currentTransportCount=0;
-//
-//    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
-//        super(location);
-//        this.transportFactory = transportFactory;
-//        this.serverSocketFactory = serverSocketFactory;
-//
-//    }
-//
-//    public void bind() throws IOException {
-//        URI bind = getBindLocation();
-//
-//        String host = bind.getHost();
-//        host = (host == null || host.length() == 0) ? "localhost" : host;
-//        InetAddress addr = InetAddress.getByName(host);
-//
-//        try {
-//
-//            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
-//            configureServerSocket(this.serverSocket);
-//
-//        } catch (IOException e) {
-//            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
-//        }
-//        try {
-//            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
-//                .getFragment()));
-//        } catch (URISyntaxException e) {
-//
-//            // it could be that the host name contains invalid characters such
-//            // as _ on unix platforms
-//            // so lets try use the IP address instead
-//            try {
-//                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
-//            } catch (URISyntaxException e2) {
-//                throw IOExceptionSupport.create(e2);
-//            }
-//        }
-//    }
-//
-//    private void configureServerSocket(ServerSocket socket) throws SocketException {
-//        socket.setSoTimeout(2000);
-//        if (transportOptions != null) {
-//            IntrospectionSupport.setProperties(socket, transportOptions);
-//        }
-//    }
-//
-//    /**
-//     * @return Returns the wireFormatFactory.
-//     */
-//    public WireFormatFactory getWireFormatFactory() {
-//        return wireFormatFactory;
-//    }
-//
-//    /**
-//     * @param wireFormatFactory The wireFormatFactory to set.
-//     */
-//    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
-//        this.wireFormatFactory = wireFormatFactory;
-//    }
-//
-//    public long getMaxInactivityDuration() {
-//        return maxInactivityDuration;
-//    }
-//
-//    public void setMaxInactivityDuration(long maxInactivityDuration) {
-//        this.maxInactivityDuration = maxInactivityDuration;
-//    }
-//
-//    public long getMaxInactivityDurationInitalDelay() {
-//        return this.maxInactivityDurationInitalDelay;
-//    }
-//
-//    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
-//        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
-//    }
-//
-//    public int getMinmumWireFormatVersion() {
-//        return minmumWireFormatVersion;
-//    }
-//
-//    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
-//        this.minmumWireFormatVersion = minmumWireFormatVersion;
-//    }
-//
-//    public boolean isTrace() {
-//        return trace;
-//    }
-//
-//    public void setTrace(boolean trace) {
-//        this.trace = trace;
-//    }
-////
-////    public String getLogWriterName() {
-////        return logWriterName;
-////    }
-////
-////    public void setLogWriterName(String logFormat) {
-////        this.logWriterName = logFormat;
-////    }
-//
-//    public boolean isDynamicManagement() {
-//        return dynamicManagement;
-//    }
-//
-//    public void setDynamicManagement(boolean useJmx) {
-//        this.dynamicManagement = useJmx;
-//    }
-//
-//    public boolean isStartLogging() {
-//        return startLogging;
-//    }
-//
-//
-//    public void setStartLogging(boolean startLogging) {
-//        this.startLogging = startLogging;
-//    }
-//
-//    /**
-//     * @return the backlog
-//     */
-//    public int getBacklog() {
-//        return backlog;
-//    }
-//
-//    /**
-//     * @param backlog the backlog to set
-//     */
-//    public void setBacklog(int backlog) {
-//        this.backlog = backlog;
-//    }
-//
-//    /**
-//     * @return the useQueueForAccept
-//     */
-//    public boolean isUseQueueForAccept() {
-//        return useQueueForAccept;
-//    }
-//
-//    /**
-//     * @param useQueueForAccept the useQueueForAccept to set
-//     */
-//    public void setUseQueueForAccept(boolean useQueueForAccept) {
-//        this.useQueueForAccept = useQueueForAccept;
-//    }
-//
-//
-//    /**
-//     * pull Sockets from the ServerSocket
-//     */
-//    public void run() {
-//        while (!isStopped()) {
-//            Socket socket = null;
-//            try {
-//                socket = serverSocket.accept();
-//                if (socket != null) {
-//                    if (isStopped() || getAcceptListener() == null) {
-//                        socket.close();
-//                    } else {
-//                        if (useQueueForAccept) {
-//                            socketQueue.put(socket);
-//                        }else {
-//                            handleSocket(socket);
-//                        }
-//                    }
-//                }
-//            } catch (SocketTimeoutException ste) {
-//                // expect this to happen
-//            } catch (Exception e) {
-//                if (!isStopping()) {
-//                    onAcceptError(e);
-//                } else if (!isStopped()) {
-//                    LOG.warn("run()", e);
-//                    onAcceptError(e);
-//                }
-//            }
-//        }
-//    }
-//
-//    /**
-//     * Allow derived classes to override the Transport implementation that this
-//     * transport server creates.
-//     *
-//     * @param socket
-//     * @param format
-//     * @return
-//     * @throws IOException
-//     */
-//    protected  Transport createTransport(Socket socket, WireFormat format) throws IOException {
-//        return new TcpTransport(format, socket);
-//    }
-//
-
     /**
      * @return pretty print of this
      */
     public String toString() {
         return "" + bindURI;
     }
-//
-//    /**
-//     * @param socket
-//     * @param inetAddress
-//     * @return real hostName
-//     * @throws UnknownHostException
-//     */
-//
-//    protected void doStart() throws Exception {
-//        if(useQueueForAccept) {
-//            Runnable run = new Runnable() {
-//                public void run() {
-//                    try {
-//                        while (!isStopped() && !isStopping()) {
-//                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
-//                            if (sock != null) {
-//                                handleSocket(sock);
-//                            }
-//                        }
-//
-//                    } catch (InterruptedException e) {
-//                        LOG.info("socketQueue interuppted - stopping");
-//                        if (!isStopping()) {
-//                            onAcceptError(e);
-//                        }
-//                    }
-//
-//                }
-//
-//            };
-//            socketHandlerThread = new Thread(null, run,
-//                    "ActiveMQ Transport Server Thread Handler: " + toString(),
-//                    getStackSize());
-//            socketHandlerThread.setDaemon(true);
-//            //socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
-//            socketHandlerThread.start();
-//        }
-//        super.doStart();
-//
-//    }
-//
-//    protected void doStop(ServiceStopper stopper) throws Exception {
-//        super.doStop(stopper);
-//        if (serverSocket != null) {
-//            serverSocket.close();
-//        }
-//    }
-//
-//    public InetSocketAddress getSocketAddress() {
-//        return (InetSocketAddress)serverSocket.getLocalSocketAddress();
-//    }
-//
-//    public void setTransportOption(Map<String, Object> transportOptions) {
-//        this.transportOptions = transportOptions;
-//    }
-//
-//
-//    public int getSoTimeout() {
-//        return soTimeout;
-//    }
-//
-//    public void setSoTimeout(int soTimeout) {
-//        this.soTimeout = soTimeout;
-//    }
-//
-//    public int getSocketBufferSize() {
-//        return socketBufferSize;
-//    }
-//
-//    public void setSocketBufferSize(int socketBufferSize) {
-//        this.socketBufferSize = socketBufferSize;
-//    }
-//
-//    public int getConnectionTimeout() {
-//        return connectionTimeout;
-//    }
-//
-//    public void setConnectionTimeout(int connectionTimeout) {
-//        this.connectionTimeout = connectionTimeout;
-//    }
-//
-//    /**
-//     * @return the maximumConnections
-//     */
-//    public int getMaximumConnections() {
-//        return maximumConnections;
-//    }
-//
-//    /**
-//     * @param maximumConnections the maximumConnections to set
-//     */
-//    public void setMaximumConnections(int maximumConnections) {
-//        this.maximumConnections = maximumConnections;
-//    }
-//
-//
-//    public void started(Service service) {
-//       this.currentTransportCount++;
-//    }
-//
-//    public void stopped(Service service) {
-//        this.currentTransportCount--;
-//    }
 
 }
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DefaultTransportListener.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DefaultTransportListener.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DefaultTransportListener.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DefaultTransportListener.java Wed Jul  7 04:23:35 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
 
 import java.io.IOException;
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgent.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgent.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgent.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgent.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgent.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgent.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgent.java Wed Jul  7 04:23:35 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.discovery;
+package org.apache.activemq.apollo.transport;
 
 import java.io.IOException;
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgentFactory.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgentFactory.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgentFactory.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgentFactory.java Wed Jul  7 04:23:35 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.discovery;
+package org.apache.activemq.apollo.transport;
 
 import java.io.IOException;
 import java.net.URI;

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryEvent.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryEvent.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryEvent.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryEvent.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryEvent.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryEvent.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryEvent.java Wed Jul  7 04:23:35 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.discovery;
+package org.apache.activemq.apollo.transport;
 
 public class DiscoveryEvent {
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryListener.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryListener.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryListener.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryListener.java Wed Jul  7 04:23:35 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.discovery;
+package org.apache.activemq.apollo.transport;
 
 
 /**

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodec.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodec.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodec.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodec.java Wed Jul  7 04:23:35 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.wireformat;
+package org.apache.activemq.apollo.transport;
 
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 
@@ -38,7 +37,7 @@ import org.fusesource.hawtbuf.ByteArrayO
  * 
  * @version $Revision: 1.1 $
  */
-public class ObjectStreamWireFormat implements WireFormat {
+public class ObjectStreamProtocolCodec implements ProtocolCodec {
 
     public static final String WIREFORMAT_NAME = "object";
     
@@ -105,7 +104,7 @@ public class ObjectStreamWireFormat impl
         throw new UnsupportedOperationException();
     }
 
-    public String getName() {
+    public String protocol() {
         return WIREFORMAT_NAME;
     }
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodecFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodecFactory.java?rev=961218&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodecFactory.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodecFactory.java Wed Jul  7 04:23:35 2010
@@ -0,0 +1,22 @@
+package org.apache.activemq.apollo.transport;
+
+import org.fusesource.hawtbuf.Buffer;
+
+public class ObjectStreamProtocolCodecFactory implements ProtocolCodecFactory {
+
+	public ProtocolCodec createProtocolCodec() {
+		return new ObjectStreamProtocolCodec();
+	}	
+
+    public boolean isIdentifiable() {
+        return false;
+    }
+
+    public boolean matchesIdentification(Buffer byteSequence) {
+        throw new UnsupportedOperationException();
+    }
+
+    public int maxIdentificaionLength() {
+        throw new UnsupportedOperationException();
+    }
+}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java Wed Jul  7 04:23:35 2010
@@ -14,12 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.wireformat;
+package org.apache.activemq.apollo.transport;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 
@@ -27,44 +24,22 @@ import org.fusesource.hawtbuf.Buffer;
 
 
 /**
- * Provides a mechanism to marshal commands into and out of packets
- * or into and out of streams, Channels and Datagrams.
+ * Interface to encode and decode commands in and out of a non-blocking channel.
  *
- * @version $Revision: 1.1 $
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public interface WireFormat {
-
-    enum BufferState {
-        EMPTY,
-        WAS_EMPTY,
-        NOT_EMPTY,
-        FULL,
-    }
+public interface ProtocolCodec {
 
     /**
-     * @return The name of the wireformat
+     * @return The name of the protocol associated with the channel codec.
      */
-    String getName();
+    String protocol();
 
-    /**
-     * Packet based marshaling 
-     */
-    Buffer marshal(Object command) throws IOException;
-    
-    /**
-     * Packet based un-marshaling 
-     */
-    Object unmarshal(Buffer packet) throws IOException;
-
-    /**
-     * Stream based marshaling 
-     */
-    void marshal(Object command, DataOutput out) throws IOException;
-
-    /**
-     * Stream based un-marshaling
-     */
-    Object unmarshal(DataInput in) throws IOException;
+    ///////////////////////////////////////////////////////////////////
+    //
+    // Methods related with reading from the channel
+    //
+    ///////////////////////////////////////////////////////////////////
 
     /**
      * @param channel
@@ -87,13 +62,26 @@ public interface WireFormat {
      */
     void unread(Buffer buffer);
 
-
     /**
      * @return The number of bytes received.
      */
     public long getReadCounter();
 
 
+    ///////////////////////////////////////////////////////////////////
+    //
+    // Methods related with writing to the channel
+    //
+    ///////////////////////////////////////////////////////////////////
+
+
+    enum BufferState {
+        EMPTY,
+        WAS_EMPTY,
+        NOT_EMPTY,
+        FULL,
+    }
+
     public void setWritableByteChannel(WritableByteChannel channel);
 
     /**
@@ -114,19 +102,7 @@ public interface WireFormat {
     /**
      * @return The number of bytes written.
      */
-    public long getWriteCounter() ;
-
-
-//    void unmarshalStartPos(int pos);
-//
-//    int unmarshalEndPos();
-//    void unmarshalEndPos(int pos);
-//
-//    /**
-//     * For a unmarshal session is used for non-blocking
-//     * unmarshalling.
-//     */
-//    Object unmarshalNB(ByteBuffer buffer) throws IOException;
+    public long getWriteCounter();
 
 
 }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodecFactory.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodecFactory.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodecFactory.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodecFactory.java Wed Jul  7 04:23:35 2010
@@ -14,38 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.wireformat;
+package org.apache.activemq.apollo.transport;
 
 import org.fusesource.hawtbuf.Buffer;
 
-public interface WireFormatFactory {
+public interface ProtocolCodecFactory {
     
     /**
      * @return an instance of the wire format. 
      * 
      */
-    WireFormat createWireFormat();
+    ProtocolCodec createProtocolCodec();
     
     /**
-     * @return true if this wire format factory is isDiscriminatable. A discriminatable
-     * WireFormat's will first write a header to the stream 
+     * @return true if this protocol codec factory is identifiable. An identifiable
+     * protocol will first write an easy-to-identify header to the stream
      */
-    boolean isDiscriminatable();
+    boolean isIdentifiable();
     
     /**
      * @return Returns the maximum length of the header used to discriminate the wire format if it
-     * {@link #isDiscriminatable()}
-     * @throws UnsupportedOperationException If {@link #isDiscriminatable()} is false
+     * {@link #isIdentifiable()}
+     * @throws UnsupportedOperationException If {@link #isIdentifiable()} is false
      */
-    int maxWireformatHeaderLength();
+    int maxIdentificaionLength();
 
     /**
-     * Called to test if this wireformat matches the provided header.
+     * Called to test if this protocol matches the identification header.
      * 
      * @param buffer The byte buffer representing the header data read so far.
-     * @return true if the Buffer matches the wire format header.
+     * @return true if the Buffer matches the protocol format header.
      */
-    boolean matchesWireformatHeader(Buffer buffer);
+    boolean matchesIdentification(Buffer buffer);
 
     
 }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/Transport.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/Transport.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/Transport.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/Transport.java Wed Jul  7 04:23:35 2010
@@ -14,15 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
 
 import java.io.IOException;
 import java.net.URI;
 
 import org.apache.activemq.Service;
-import org.apache.activemq.wireformat.WireFormat;
 import org.fusesource.hawtdispatch.DispatchQueue;
-import org.fusesource.hawtdispatch.Retained;
 
 /**
  * Represents an abstract connection.  It can be a client side or server side connection.
@@ -109,11 +107,15 @@ public interface Transport extends Servi
     boolean isConnected();
     
     /**
-     * @return The wireformat for the connection.
+     * @return The protocol codec for the transport.
      */
-    WireFormat getWireformat();
+    ProtocolCodec getProtocolCodec();
 
-    void setWireformat(WireFormat wireformat);
+    /**
+     * Sets the protocol codec for the transport
+     * @param protocolCodec the protocol codec to assign to the transport
+     */
+    void setProtocolCodec(ProtocolCodec protocolCodec);
 
     /**
      * reconnect to another location

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java Wed Jul  7 04:23:35 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
 
 
 public interface TransportAcceptListener {

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactory.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactory.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactory.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactory.java Wed Jul  7 04:23:35 2010
@@ -14,14 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
 
 import org.apache.activemq.util.FactoryFinder;
 import org.apache.activemq.util.IOExceptionSupport;
-import org.fusesource.hawtdispatch.DispatchQueue;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactorySupport.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactorySupport.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactorySupport.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactorySupport.java Wed Jul  7 04:23:35 2010
@@ -14,36 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
 
 import org.apache.activemq.util.FactoryFinder;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.URISupport;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.wireformat.WireFormatFactory;
 
 import java.io.IOException;
-import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
 
 /**
  * <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class  TransportFactorySupport{
+public class  TransportFactorySupport {
 
-    private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
-
-    private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
-    private static final String THREAD_NAME_FILTER = "threadName";
+    private static final FactoryFinder PROTOCOL_CODEC_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
 
     static public Transport configure(Transport transport, Map<String, String> options) throws IOException {
-        WireFormat wf = createWireFormat(options);
-        transport.setWireformat(wf);
+        ProtocolCodec wf = createProtocolCodec(options);
+        transport.setProtocolCodec(wf);
         IntrospectionSupport.setProperties(transport, options);
         return transport;
     }
@@ -60,52 +53,52 @@ public class  TransportFactorySupport{
         return transport;
     }
 
-    static public WireFormat createWireFormat(Map<String, String> options) throws IOException {
-        WireFormatFactory factory = createWireFormatFactory(options);
+    static public ProtocolCodec createProtocolCodec(Map<String, String> options) throws IOException {
+        ProtocolCodecFactory factory = createProtocolCodecFactory(options);
         if( factory == null ) {
             return null;
         }
-        WireFormat format = factory.createWireFormat();
-        return format;
+        ProtocolCodec protocolCodec = factory.createProtocolCodec();
+        return protocolCodec;
     }
 
-    static public WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
-        String wireFormat = (String)options.remove("wireFormat");
-        if (wireFormat == null) {
-            wireFormat = getDefaultWireFormatType();
+    static public ProtocolCodecFactory createProtocolCodecFactory(Map<String, String> options) throws IOException {
+        String protocolName = (String)options.remove("protocol");
+        if (protocolName == null) {
+            protocolName = getDefaultProtocolName();
         }
-        if( "null".equals(wireFormat) ) {
+        if( "null".equals(protocolName) ) {
             return null;
         }
 
         try {
-            WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
-            IntrospectionSupport.setProperties(wff, options, "wireFormat.");
+            ProtocolCodecFactory wff = (ProtocolCodecFactory) PROTOCOL_CODEC_FACTORY_FINDER.newInstance(protocolName);
+            IntrospectionSupport.setProperties(wff, options, "protocol.");
             return wff;
         } catch (Throwable e) {
-            throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
+            throw IOExceptionSupport.create("Could not create protocol codec for: " + protocolName + ", reason: " + e, e);
         }
     }
-    static public WireFormatFactory createWireFormatFactory(String location) throws IOException, URISyntaxException {
+    static public ProtocolCodecFactory createProtocolCodecFactory(String location) throws IOException, URISyntaxException {
         URI uri = new URI(location);
         Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
 
-        String wireFormat = uri.getPath();
-        if( "null".equals(wireFormat) ) {
+        String protocolName = uri.getPath();
+        if( "null".equals(protocolName) ) {
             return null;
         }
 
         try {
-            WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
+            ProtocolCodecFactory wff = (ProtocolCodecFactory) PROTOCOL_CODEC_FACTORY_FINDER.newInstance(protocolName);
             IntrospectionSupport.setProperties(wff, options);
             return wff;
         } catch (Throwable e) {
-            throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
+            throw IOExceptionSupport.create("Could not protocol codec for: " + protocolName + ", reason: " + e, e);
         }
     }
 
-    static protected String getDefaultWireFormatType() {
-        return "default";
+    static protected String getDefaultProtocolName() {
+        return "stomp";
     }
 
 }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFilter.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFilter.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFilter.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFilter.java Wed Jul  7 04:23:35 2010
@@ -14,15 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
+
+import org.fusesource.hawtdispatch.DispatchQueue;
 
 import java.io.IOException;
 import java.net.URI;
 
-import org.apache.activemq.wireformat.WireFormat;
-import org.fusesource.hawtdispatch.DispatchQueue;
-import org.fusesource.hawtdispatch.Retained;
-
 /**
  * @version $Revision: 1.5 $
  */
@@ -151,7 +149,7 @@ public class TransportFilter implements 
 
     /**
      * @return
-     * @see org.apache.activemq.transport.Transport#isFaultTolerant()
+     * @see Transport#isFaultTolerant()
      */
     public boolean isFaultTolerant() {
         return next.isFaultTolerant();
@@ -173,11 +171,11 @@ public class TransportFilter implements 
         next.reconnect(uri);
     }
 
-    public WireFormat getWireformat() {
-        return next.getWireformat();
+    public ProtocolCodec getProtocolCodec() {
+        return next.getProtocolCodec();
     }
-    public void setWireformat(WireFormat wireformat) {
-        next.setWireformat(wireformat);
+    public void setProtocolCodec(ProtocolCodec protocolCodec) {
+        next.setProtocolCodec(protocolCodec);
     }
 
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportListener.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportListener.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportListener.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportListener.java Wed Jul  7 04:23:35 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
 
 import java.io.IOException;
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServer.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServer.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServer.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServer.java Wed Jul  7 04:23:35 2010
@@ -14,13 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
 
 import java.net.InetSocketAddress;
 import java.net.URI;
 
 import org.apache.activemq.Service;
-import org.apache.activemq.wireformat.WireFormatFactory;
 import org.fusesource.hawtdispatch.DispatchQueue;
 
 /**

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServerSupport.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServerSupport.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServerSupport.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServerSupport.java Wed Jul  7 04:23:35 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.transport;
 
 import java.net.URI;
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/package.html (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/package.html)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/package.html?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/package.html&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/package.html&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/package.html (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/package.html Wed Jul  7 04:23:35 2010
@@ -20,7 +20,7 @@
 <body>
 
 <p>
-An API for WireFormats which are used to turn object into bytes and bytes into objects.
+Transport related interfaces.
 </p>
 
 </body>

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java Wed Jul  7 04:23:35 2010
@@ -14,12 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.pipe;
+package org.apache.activemq.apollo.transport.pipe;
 
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportListener;
-import org.fusesource.hawtbuf.Buffer;
-import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.apollo.transport.ProtocolCodec;
+import org.apache.activemq.apollo.transport.Transport;
+import org.apache.activemq.apollo.transport.TransportListener;
 import org.fusesource.hawtdispatch.*;
 
 import java.io.EOFException;
@@ -41,7 +40,6 @@ public class PipeTransport implements Tr
     private String remoteAddress;
     private AtomicBoolean stopping = new AtomicBoolean();
     private String name;
-    private WireFormat wireformat;
     private boolean marshal;
     private boolean trace;
 
@@ -51,6 +49,7 @@ public class PipeTransport implements Tr
 
     private long writeCounter = 0;
     private long readCounter = 0;
+    private ProtocolCodec protocolCodec;
 
     public PipeTransport(PipeTransportServer server) {
         this.server = server;
@@ -88,13 +87,8 @@ public class PipeTransport implements Tr
                                 if(o == EOF_TOKEN) {
                                     throw new EOFException();
                                 }
-
-                                if (wireformat != null && marshal) {
-                                    listener.onTransportCommand(wireformat.unmarshal((Buffer) o));
-                                } else {
-                                    readCounter ++;
-                                    listener.onTransportCommand(o);
-                                }
+                                readCounter ++;
+                                listener.onTransportCommand(o);
                             }
 
                             // let the peer know that they have been processed.
@@ -247,11 +241,11 @@ public class PipeTransport implements Tr
         this.listener = listener;
     }
 
-    public WireFormat getWireformat() {
-        return wireformat;
+    public ProtocolCodec getProtocolCodec() {
+        return protocolCodec;
     }
-    public void setWireformat(WireFormat wireformat) {
-        this.wireformat = wireformat;
+    public void setProtocolCodec(ProtocolCodec protocolCodec) {
+        this.protocolCodec = protocolCodec;
     }
 
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportFactory.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportFactory.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportFactory.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportFactory.java Wed Jul  7 04:23:35 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.pipe;
+package org.apache.activemq.apollo.transport.pipe;
 
 import java.io.IOException;
 import java.net.URI;
@@ -22,8 +22,11 @@ import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.activemq.transport.TransportFactorySupport.*;
-import org.apache.activemq.transport.*;
+import static org.apache.activemq.apollo.transport.TransportFactorySupport.*;
+
+import org.apache.activemq.apollo.transport.Transport;
+import org.apache.activemq.apollo.transport.TransportFactory;
+import org.apache.activemq.apollo.transport.TransportServer;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.URISupport;
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportServer.java (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportServer.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportServer.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportServer.java Wed Jul  7 04:23:35 2010
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.pipe;
+package org.apache.activemq.apollo.transport.pipe;
 
-import org.apache.activemq.transport.TransportAcceptListener;
-import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.wireformat.WireFormatFactory;
+import org.apache.activemq.apollo.transport.TransportAcceptListener;
+import org.apache.activemq.apollo.transport.TransportServer;
+import org.apache.activemq.apollo.transport.ProtocolCodecFactory;
 import org.fusesource.hawtdispatch.CustomDispatchSource;
 import org.fusesource.hawtdispatch.Dispatch;
 import org.fusesource.hawtdispatch.DispatchQueue;
@@ -38,7 +38,7 @@ public class PipeTransportServer impleme
     protected URI connectURI;
     protected TransportAcceptListener listener;
     protected String name;
-    protected WireFormatFactory wireFormatFactory;
+    protected ProtocolCodecFactory protocolCodecFactory;
     protected boolean marshal;
     protected final AtomicInteger connectionCounter = new AtomicInteger();
     DispatchQueue dispatchQueue;
@@ -127,9 +127,9 @@ public class PipeTransportServer impleme
         serverTransport.setRemoteAddress(remoteAddress);
 
         serverTransport.setMarshal(marshal);
-        if (wireFormatFactory != null) {
-            clientTransport.setWireformat(wireFormatFactory.createWireFormat());
-            serverTransport.setWireformat(wireFormatFactory.createWireFormat());
+        if (protocolCodecFactory != null) {
+            clientTransport.setProtocolCodec(protocolCodecFactory.createProtocolCodec());
+            serverTransport.setProtocolCodec(protocolCodecFactory.createProtocolCodec());
         }
         this.acceptSource.merge(serverTransport);
         return clientTransport;
@@ -143,8 +143,8 @@ public class PipeTransportServer impleme
         return new PipeTransport(this);
     }
 
-    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
-        this.wireFormatFactory = wireFormatFactory;
+    public void setProtocolCodecFactory(ProtocolCodecFactory protocolCodecFactory) {
+        this.protocolCodecFactory = protocolCodecFactory;
     }
 
     public boolean isMarshal() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala Wed Jul  7 04:23:35 2010
@@ -27,7 +27,7 @@ import org.apache.activemq.apollo.util.C
 object DirectBufferPoolFactory {
 
   val finder = ClassFinder[SPI]("META-INF/services/org.apache.activemq.apollo/direct-buffer-pools")
-  var direct_buffer_pools_spis = List[SPI]()
+  var spis = List[SPI]()
 
   trait SPI {
     def create(config:String):DirectBufferPool
@@ -36,11 +36,9 @@ object DirectBufferPoolFactory {
 
   finder.find.foreach{ clazz =>
     try {
-      val SPI = clazz.newInstance.asInstanceOf[SPI]
-      direct_buffer_pools_spis ::= SPI
+      spis ::= clazz.newInstance.asInstanceOf[SPI]
     } catch {
-      case e:Throwable =>
-        e.printStackTrace
+      case e:Throwable => e.printStackTrace
     }
   }
 
@@ -48,7 +46,7 @@ object DirectBufferPoolFactory {
     if( config == null ) {
       return null
     }
-    direct_buffer_pools_spis.foreach { spi=>
+    spis.foreach { spi=>
       val rc = spi.create(config)
       if( rc!=null ) {
         return rc
@@ -62,7 +60,7 @@ object DirectBufferPoolFactory {
     if( config == null ) {
       return true
     } else {
-      direct_buffer_pools_spis.foreach { spi=>
+      spis.foreach { spi=>
         if( spi.validate(config) ) {
           return true
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Wed Jul  7 04:23:35 2010
@@ -274,10 +274,10 @@ case class RuntimeResource(parent:Broker
             result.id = connection.id
             result.state = connection.serviceState.toString
             result.state_since = connection.serviceState.since
-            result.protocol = connection.protocol
+            result.protocol = connection.protocolHandler.protocol
             result.transport = connection.transport.getTypeId
             result.remote_address = connection.transport.getRemoteAddress
-            val wf = connection.transport.getWireformat
+            val wf = connection.transport.getProtocolCodec
             if( wf!=null ) {
               result.write_counter = wf.getWriteCounter
               result.read_counter = wf.getReadCounter