You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/05/30 21:39:27 UTC

svn commit: r780312 - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/transport/ activemq-broker/src/main/java/org/apache/activemq/wireformat/ activemq-broker/src/main/resources/META-INF/services/org/apache/activem...

Author: chirino
Date: Sat May 30 19:39:26 2009
New Revision: 780312

URL: http://svn.apache.org/viewvc?rev=780312&view=rev
Log:
removing dependencies on activemq-core


Added:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/FutureResponse.java   (with props)
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/MutexTransport.java   (with props)
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ResponseCallback.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ThreadNameFilter.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/Transport.java   (with props)
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java   (with props)
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFactory.java   (with props)
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFilter.java   (with props)
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportListener.java   (with props)
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportServer.java   (with props)
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormat.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/package.html
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/wireformat/
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java
      - copied unchanged from r780233, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/wireformat/
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
      - copied unchanged from r780233, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOExceptionSupport.java   (with props)
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntSequenceGenerator.java
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java   (with props)
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/URISupport.java   (with props)
Removed:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/
    activemq/sandbox/activemq-flow/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/broker/store/
    activemq/sandbox/activemq-flow/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/wireformat/default
    activemq/sandbox/activemq-flow/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/wireformat/openwire
    activemq/sandbox/activemq-flow/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
Modified:
    activemq/sandbox/activemq-flow/activemq-util/pom.xml

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/FutureResponse.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/FutureResponse.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/FutureResponse.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/FutureResponse.java Sat May 30 19:39:26 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Holds the response to an asynchronous transport request in a one-element
+ * blocking slot.
+ * <p>
+ * {@link #set(Object)} stores the response and, if it was accepted, notifies
+ * the optional {@link ResponseCallback}. Both {@code getResult} variants
+ * <em>remove</em> the value from the slot, so a response can be retrieved only
+ * once; a second untimed {@link #getResult()} blocks until another value is
+ * set.
+ *
+ * @param <T> the response type
+ */
+public class FutureResponse<T> {
+    private static final Log LOG = LogFactory.getLog(FutureResponse.class);
+
+    /** Notified when a response is accepted by {@link #set(Object)}; may be null. */
+    private final ResponseCallback<T> responseCallback;
+    /** Capacity-one queue, so at most one response is pending at any time. */
+    private final ArrayBlockingQueue<T> responseSlot = new ArrayBlockingQueue<T>(1);
+
+    /**
+     * @param responseCallback callback to notify when a response is accepted,
+     *            or null if no notification is wanted
+     */
+    public FutureResponse(ResponseCallback<T> responseCallback) {
+        this.responseCallback = responseCallback;
+    }
+
+    /**
+     * Waits without a time limit for the response and removes it from the slot.
+     *
+     * @return the response
+     * @throws IOException an {@link InterruptedIOException} if the calling
+     *             thread is interrupted while waiting; the thread's interrupt
+     *             status is restored before the exception is thrown
+     */
+    public T getResult() throws IOException {
+        try {
+            return responseSlot.take();
+        } catch (InterruptedException e) {
+            throw interrupted(e);
+        }
+    }
+
+    /**
+     * Waits up to the given time for the response and removes it from the slot.
+     *
+     * @param timeout maximum time to wait, in milliseconds
+     * @return the response, or null if none arrived within the timeout
+     * @throws IOException an {@link InterruptedIOException} if the calling
+     *             thread is interrupted while waiting; the thread's interrupt
+     *             status is restored before the exception is thrown
+     */
+    public T getResult(int timeout) throws IOException {
+        try {
+            return responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            throw interrupted(e);
+        }
+    }
+
+    /**
+     * Offers the response to the slot. If the slot was empty the value is
+     * stored and the callback, when one was supplied, is invoked on the
+     * calling thread; if the slot is already occupied the call does nothing.
+     *
+     * @param result the response; must not be null because the underlying
+     *            queue rejects null elements
+     */
+    public void set(T result) {
+        if (responseSlot.offer(result)) {
+            if (responseCallback != null) {
+                responseCallback.onCompletion(this);
+            }
+        }
+    }
+
+    /**
+     * Converts an interruption into the I/O exception callers expect.
+     * The interrupt flag is restored first so that code further up the stack
+     * can still observe that the thread was asked to stop.
+     */
+    private static InterruptedIOException interrupted(InterruptedException e) {
+        Thread.currentThread().interrupt();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Operation interupted: " + e, e);
+        }
+        InterruptedIOException ioe = new InterruptedIOException("Interrupted.");
+        ioe.initCause(e);
+        return ioe;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/FutureResponse.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/MutexTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/MutexTransport.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/MutexTransport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/MutexTransport.java Sat May 30 19:39:26 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+
+/**
+ * A {@link TransportFilter} that funnels every outbound operation on the next
+ * transport through one shared lock, so that only one thread at a time is
+ * inside {@code oneway}, {@code request} or {@code asyncRequest}.
+ * <p>
+ * The lock is held for the whole delegated call, which for the synchronous
+ * {@code request} variants includes the time the next transport spends in
+ * that call.
+ *
+ * @version $Revision$
+ */
+public class MutexTransport extends TransportFilter {
+
+    /** Guards all outbound calls made through this filter. */
+    private final Object writeMutex = new Object();
+
+    /**
+     * @param next the transport whose outbound operations are serialized
+     */
+    public MutexTransport(Transport next) {
+        super(next);
+    }
+
+    /**
+     * Issues an asynchronous request on the next transport while holding the
+     * write lock.
+     *
+     * @param command the command to send
+     * @param responseCallback callback to notify on completion, or null; it is
+     *            forwarded to the next transport so that the caller is actually
+     *            notified, as the {@link Transport#asyncRequest} contract states
+     * @return the future returned by the next transport
+     * @throws IOException if the next transport fails
+     */
+    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
+        synchronized (writeMutex) {
+            return next.asyncRequest(command, responseCallback);
+        }
+    }
+
+    /**
+     * Sends a one-way command on the next transport while holding the write lock.
+     *
+     * @param command the command to send
+     * @throws IOException if the next transport fails
+     */
+    public void oneway(Object command) throws IOException {
+        synchronized (writeMutex) {
+            next.oneway(command);
+        }
+    }
+
+    /**
+     * Performs a synchronous request on the next transport while holding the
+     * write lock.
+     *
+     * @param command the command to send
+     * @return whatever the next transport returns
+     * @throws IOException if the next transport fails
+     */
+    public Object request(Object command) throws IOException {
+        synchronized (writeMutex) {
+            return next.request(command);
+        }
+    }
+
+    /**
+     * Performs a synchronous request with a timeout on the next transport
+     * while holding the write lock.
+     *
+     * @param command the command to send
+     * @param timeout passed unchanged to the next transport
+     * @return whatever the next transport returns
+     * @throws IOException if the next transport fails
+     */
+    public Object request(Object command, int timeout) throws IOException {
+        synchronized (writeMutex) {
+            return next.request(command, timeout);
+        }
+    }
+
+    /**
+     * @return the string form of the next transport; this filter adds nothing
+     */
+    public String toString() {
+        return next.toString();
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/MutexTransport.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ResponseCallback.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ResponseCallback.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ResponseCallback.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ResponseCallback.java Sat May 30 19:39:26 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+/**
+ * Callback notified when a {@link FutureResponse} has accepted its response.
+ *
+ * @param <T> the response type carried by the future
+ * @version $Revision$
+ */
+public interface ResponseCallback<T> {
+    /**
+     * Called by {@link FutureResponse#set(Object)} after the response has been
+     * stored, on the thread that called {@code set}.
+     *
+     * @param resp the future whose response is now available
+     */
+    void onCompletion(FutureResponse<T> resp);
+}

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ThreadNameFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ThreadNameFilter.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ThreadNameFilter.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ThreadNameFilter.java Sat May 30 19:39:26 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+
+/**
+ * Transport filter that temporarily renames the calling thread while a
+ * one-way send is in progress, appending the remote address of the next
+ * transport. A thread blocked inside the transport (for example in
+ * <code>socketWrite0</code>) then shows its destination in a thread dump,
+ * which helps when debugging.
+ * <p>
+ * To enable the filter, add <code>transport.threadName</code> to the
+ * transport URI, for example:
+ * <pre><code>
+ * &lt;transportConnector
+ *     name=&quot;tcp1&quot;
+ *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.threadName&quot;
+ * /&gt;
+ * </code></pre>
+ *
+ * @author Filip Hanik
+ */
+public class ThreadNameFilter extends TransportFilter {
+
+    /**
+     * @param next the transport that sends are delegated to
+     */
+    public ThreadNameFilter(Transport next) {
+        super(next);
+    }
+
+    /**
+     * Sends the command through the parent filter. When the next transport
+     * reports a remote address, the current thread's name is suffixed with
+     * that address for the duration of the send and restored afterwards,
+     * even if the send throws.
+     *
+     * @param command the command to send
+     * @throws IOException if the delegated send fails
+     */
+    @Override
+    public void oneway(Object command) throws IOException {
+        final String remote = (next == null) ? null : next.getRemoteAddress();
+        if (remote == null) {
+            // Nothing useful to add to the thread name; send as-is.
+            super.oneway(command);
+            return;
+        }
+
+        final String originalName = Thread.currentThread().getName();
+        try {
+            Thread.currentThread().setName(originalName + " - SendTo:" + remote);
+            super.oneway(command);
+        } finally {
+            Thread.currentThread().setName(originalName);
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/Transport.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/Transport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/Transport.java Sat May 30 19:39:26 2009
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activemq.Service;
+
+/**
+ * Represents the client side of a transport, allowing commands to be sent
+ * one-way, as asynchronous requests or as synchronous requests, and allowing
+ * inbound commands to be consumed through a {@link TransportListener}.
+ * 
+ * @version $Revision: 1.5 $
+ */
+public interface Transport extends Service {
+
+    /**
+     * Sends a command without waiting for any response.
+     * 
+     * @param command the command to send
+     * @throws IOException if the command cannot be sent
+     */
+    void oneway(Object command) throws IOException;
+
+    /**
+     * An asynchronous request/response where the response will be made
+     * available in the future. If responseCallback is not null, then it will
+     * be called when the response has been completed.
+     * 
+     * @param <T> the response type
+     * @param command the command to send
+     * @param responseCallback callback to notify on completion, or null
+     * @return the FutureResponse
+     * @throws IOException if the request cannot be sent
+     */
+    <T> FutureResponse<T> asyncRequest(Object command, ResponseCallback<T> responseCallback) throws IOException;
+
+    /**
+     * A synchronous request/response.
+     * 
+     * @param command the command to send
+     * @return the response
+     * @throws IOException if the request fails
+     */
+    Object request(Object command) throws IOException;
+
+    /**
+     * A synchronous request/response with a timeout.
+     * 
+     * @param command the command to send
+     * @param timeout maximum time to wait; presumably milliseconds, as in
+     *            {@link FutureResponse#getResult(int)} - TODO confirm
+     * @return the response or null if timeout
+     * @throws IOException if the request fails
+     */
+    Object request(Object command, int timeout) throws IOException;
+
+    // The commented-out declarations below are the earlier Command/Response
+    // typed forms of the four methods above, kept for reference.
+    // /**
+    // * A one way asynchronous send
+    // * @param command
+    // * @throws IOException
+    // */
+    // void oneway(Command command) throws IOException;
+    //
+    // /**
+    // * An asynchronous request response where the Receipt will be returned
+    // * in the future. If responseCallback is not null, then it will be called
+    // * when the response has been completed.
+    // *
+    // * @param command
+    // * @param responseCallback TODO
+    // * @return the FutureResponse
+    // * @throws IOException
+    // */
+    // FutureResponse asyncRequest(Command command, ResponseCallback
+    // responseCallback) throws IOException;
+    //    
+    // /**
+    // * A synchronous request response
+    // * @param command
+    // * @return the response
+    // * @throws IOException
+    // */
+    // Response request(Command command) throws IOException;
+    //
+    // /**
+    // * A synchronous request response
+    // * @param command
+    // * @param timeout
+    // * @return the repsonse or null if timeout
+    // * @throws IOException
+    // */
+    // Response request(Command command, int timeout) throws IOException;
+
+    /**
+     * Returns the current transport listener.
+     * 
+     * @return the listener currently registered, as set by
+     *         {@link #setTransportListener(TransportListener)}
+     */
+    TransportListener getTransportListener();
+
+    /**
+     * Registers an inbound command listener.
+     * 
+     * @param commandListener the listener to register
+     */
+    void setTransportListener(TransportListener commandListener);
+
+    /**
+     * Narrows this transport to the requested type.
+     * 
+     * @param <T> the requested type
+     * @param target the class to narrow to
+     * @return the target; NOTE(review): presumably null when no transport of
+     *         that type is available - confirm against implementations
+     */
+    <T> T narrow(Class<T> target);
+
+    /**
+     * @return the remote address for this connection
+     */
+    String getRemoteAddress();
+
+    /**
+     * Indicates if the transport can handle faults.
+     * 
+     * @return true if fault tolerant
+     */
+    boolean isFaultTolerant();
+    
+    /**
+     * @return true if the transport is disposed
+     */
+    boolean isDisposed();
+    
+    /**
+     * @return true if the transport is connected
+     */
+    boolean isConnected();
+    
+    /**
+     * Reconnects to another location.
+     * 
+     * @param uri the location to reconnect to
+     * @throws IOException on failure or if not supported
+     */
+    void reconnect(URI uri) throws IOException;
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/Transport.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java Sat May 30 19:39:26 2009
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+
+/**
+ * Listener for the outcome of accepting inbound transports.
+ * NOTE(review): presumably registered on a TransportServer, which is not
+ * shown here - confirm against that interface.
+ */
+public interface TransportAcceptListener {
+    
+    /**
+     * Called with a newly accepted transport.
+     *
+     * @param transport the accepted transport
+     */
+    void onAccept(Transport transport);
+    
+    /**
+     * Called when accepting a transport failed.
+     *
+     * @param error the failure
+     */
+    void onAcceptError(Exception error);
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFactory.java Sat May 30 19:39:26 2009
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * Base class and static entry point for creating {@link Transport} and
+ * {@link TransportServer} instances from a URI.
+ * <p>
+ * The static {@code connect}, {@code compositeConnect} and {@code bind}
+ * methods pick a concrete factory from the URI scheme (see
+ * {@link #findTransportFactory(URI)}) and delegate to its {@code do...}
+ * instance methods. Subclasses supply {@link #doBind(URI)} and normally
+ * override {@link #createTransport(URI, WireFormat)}.
+ */
+public abstract class TransportFactory {
+
+    // Finders used to instantiate factories by name; the lookup mechanics live
+    // in FactoryFinder, which is not shown here.
+    private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
+    private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
+    // Cache of factories keyed by URI scheme; also filled by registerTransportFactory().
+    private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
+
+    // Option keys checked by serverConfigure().
+    private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
+    private static final String THREAD_NAME_FILTER = "threadName";
+    
+    /**
+     * Creates a transport server bound to the given location.
+     *
+     * @param location the URI to bind to
+     * @return the transport server
+     * @throws IOException if the server cannot be created
+     */
+    public abstract TransportServer doBind(URI location) throws IOException;
+
+    /**
+     * Creates a normal transport. This base implementation ignores the
+     * executor and delegates to {@link #doConnect(URI)}.
+     *
+     * @param location the URI to connect to
+     * @param ex executor offered to the factory; unused here
+     * @return the transport
+     * @throws Exception if the transport cannot be created
+     */
+    public Transport doConnect(URI location, Executor ex) throws Exception {
+        return doConnect(location);
+    }
+
+    /**
+     * Creates a composite-friendly transport. This base implementation ignores
+     * the executor and delegates to {@link #doCompositeConnect(URI)}.
+     *
+     * @param location the URI to connect to
+     * @param ex executor offered to the factory; unused here
+     * @return the transport
+     * @throws Exception if the transport cannot be created
+     */
+    public Transport doCompositeConnect(URI location, Executor ex) throws Exception {
+        return doCompositeConnect(location);
+    }
+
+    /**
+     * Creates a normal transport.
+     * 
+     * @param location the URI to connect to; its scheme selects the factory
+     * @return the transport
+     * @throws Exception if no factory is found or the transport cannot be created
+     */
+    public static Transport connect(URI location) throws Exception {
+        TransportFactory tf = findTransportFactory(location);
+        return tf.doConnect(location);
+    }
+
+    /**
+     * Creates a normal transport.
+     * 
+     * @param location the URI to connect to; its scheme selects the factory
+     * @param ex executor passed through to the factory
+     * @return the transport
+     * @throws Exception if no factory is found or the transport cannot be created
+     */
+    public static Transport connect(URI location, Executor ex) throws Exception {
+        TransportFactory tf = findTransportFactory(location);
+        return tf.doConnect(location, ex);
+    }
+
+    /**
+     * Creates a slimmed down transport that is more efficient so that it can be
+     * used by composite transports like reliable and HA.
+     * 
+     * @param location the URI to connect to; its scheme selects the factory
+     * @return the Transport
+     * @throws Exception if no factory is found or the transport cannot be created
+     */
+    public static Transport compositeConnect(URI location) throws Exception {
+        TransportFactory tf = findTransportFactory(location);
+        return tf.doCompositeConnect(location);
+    }
+
+    /**
+     * Creates a slimmed down transport that is more efficient so that it can be
+     * used by composite transports like reliable and HA.
+     * 
+     * @param location the URI to connect to; its scheme selects the factory
+     * @param ex executor passed through to the factory
+     * @return the Transport
+     * @throws Exception if no factory is found or the transport cannot be created
+     */
+    public static Transport compositeConnect(URI location, Executor ex) throws Exception {
+        TransportFactory tf = findTransportFactory(location);
+        return tf.doCompositeConnect(location, ex);
+    }
+
+    /**
+     * Creates a transport server for the given location using the factory
+     * selected by the URI scheme.
+     *
+     * @param location the URI to bind to
+     * @return the transport server
+     * @throws IOException if no factory is found or the bind fails
+     */
+    public static TransportServer bind(URI location) throws IOException {
+        TransportFactory tf = findTransportFactory(location);
+        return tf.doBind(location);
+    }
+
+    /**
+     * Same as {@link #bind(URI)}; the broker id is ignored.
+     *
+     * @param brokerId unused
+     * @param location the URI to bind to
+     * @return the transport server
+     * @throws IOException if no factory is found or the bind fails
+     * @deprecated use {@link #bind(URI)} instead
+     */
+    public static TransportServer bind(String brokerId, URI location) throws IOException {
+        return bind(location);
+    }
+    
+    // Disabled BrokerService-aware bind variant, kept for reference.
+//    public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
+//        TransportFactory tf = findTransportFactory(location);
+//        if( brokerService!=null && tf instanceof BrokerServiceAware ) {
+//            ((BrokerServiceAware)tf).setBrokerService(brokerService);
+//        }
+//        try {
+//            if( brokerService!=null ) {
+//                SslContext.setCurrentSslContext(brokerService.getSslContext());
+//            }
+//            return tf.doBind(location);
+//        } finally {
+//            SslContext.setCurrentSslContext(null);
+//        }
+//    }    
+
+    /**
+     * Creates a fully configured transport for the location. The URI
+     * parameters are copied into a mutable map that the wire-format and
+     * configuration steps are expected to consume; anything left over is
+     * treated as an unknown parameter.
+     *
+     * @param location the URI to connect to
+     * @return the configured transport
+     * @throws IllegalArgumentException if unrecognised parameters remain
+     * @throws Exception if the transport cannot be created; a
+     *             URISyntaxException is converted via IOExceptionSupport
+     */
+    public Transport doConnect(URI location) throws Exception {
+        try {
+            Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
+            WireFormat wf = createWireFormat(options);
+            Transport transport = createTransport(location, wf);
+            Transport rc = configure(transport, wf, options);
+            // Whatever the steps above did not consume is an unknown parameter.
+            if (!options.isEmpty()) {
+                throw new IllegalArgumentException("Invalid connect parameters: " + options);
+            }
+            return rc;
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Same flow as {@link #doConnect(URI)} but uses
+     * {@link #compositeConfigure(Transport, WireFormat, Map)}, so the
+     * MutexTransport and ResponseCorrelator layers are not added.
+     *
+     * @param location the URI to connect to
+     * @return the configured transport
+     * @throws IllegalArgumentException if unrecognised parameters remain
+     * @throws Exception if the transport cannot be created; a
+     *             URISyntaxException is converted via IOExceptionSupport
+     */
+    public Transport doCompositeConnect(URI location) throws Exception {
+        try {
+            Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
+            WireFormat wf = createWireFormat(options);
+            Transport transport = createTransport(location, wf);
+            Transport rc = compositeConfigure(transport, wf, options);
+            // Whatever the steps above did not consume is an unknown parameter.
+            if (!options.isEmpty()) {
+                throw new IllegalArgumentException("Invalid connect parameters: " + options);
+            }
+            return rc;
+
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+    
+     /**
+      * Allows registration of a transport factory without wiring via META-INF
+      * classes. Replaces any factory already cached for the scheme.
+     * @param scheme the URI scheme the factory handles
+     * @param tf the factory to use for that scheme
+     */
+    public static void registerTransportFactory(String scheme, TransportFactory tf) {
+        TRANSPORT_FACTORYS.put(scheme, tf);
+      }
+
+    /**
+     * Factory method to create a new transport. This base implementation
+     * always throws; subclasses that support connecting override it.
+     * 
+     * @param location the URI to connect to
+     * @param wf the wire format the transport should use
+     * @return the new transport
+     * @throws IOException always, in this base implementation
+     * @throws UnknownHostException declared for subclasses
+     * @throws MalformedURLException declared for subclasses
+     */
+    protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException {
+        throw new IOException("createTransport() method not implemented!");
+    }
+
+    /**
+     * Finds the factory for the URI's scheme: first in the cache, then by
+     * asking the transport FactoryFinder to instantiate one, which is then
+     * cached.
+     *
+     * @param location the URI whose scheme selects the factory
+     * @return the factory for the scheme
+     * @throws IOException if the URI has no scheme, or if the factory cannot
+     *             be instantiated for any reason
+     */
+    private static TransportFactory findTransportFactory(URI location) throws IOException {
+        String scheme = location.getScheme();
+        if (scheme == null) {
+            throw new IOException("Transport not scheme specified: [" + location + "]");
+        }
+        TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
+        if (tf == null) {
+            // Try to load it from a META-INF property.
+            try {
+                tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
+                TRANSPORT_FACTORYS.put(scheme, tf);
+            } catch (Throwable e) {
+                throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
+            }
+        }
+        return tf;
+    }
+
+    /**
+     * Creates the wire format described by the options.
+     *
+     * @param options connection options; the wire-format entries are consumed
+     *            by {@link #createWireFormatFactory(Map)}
+     * @return the wire format
+     * @throws IOException if the wire format factory cannot be created
+     */
+    protected WireFormat createWireFormat(Map<String, String> options) throws IOException {
+        WireFormatFactory factory = createWireFormatFactory(options);
+        WireFormat format = factory.createWireFormat();
+        return format;
+    }
+
+    /**
+     * Creates the wire format factory named by the {@code wireFormat} option,
+     * which is removed from the map, falling back to
+     * {@link #getDefaultWireFormatType()} when absent. The factory is then
+     * configured through IntrospectionSupport with the "wireFormat." prefix.
+     *
+     * @param options connection options; modified by this call
+     * @return the configured wire format factory
+     * @throws IOException if the factory cannot be instantiated or configured
+     */
+    protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
+        String wireFormat = (String)options.remove("wireFormat");
+        if (wireFormat == null) {
+            wireFormat = getDefaultWireFormatType();
+        }
+
+        try {
+            WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
+            IntrospectionSupport.setProperties(wff, options, "wireFormat.");
+            return wff;
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
+        }
+    }
+
+    /**
+     * @return the wire format name used when the URI does not specify one
+     */
+    protected String getDefaultWireFormatType() {
+        return "default";
+    }
+
+    /**
+     * Fully configures and adds all needed transport filters so that the
+     * transport can be used by the JMS client: applies
+     * {@link #compositeConfigure(Transport, WireFormat, Map)} and then wraps
+     * the result in a MutexTransport and a ResponseCorrelator.
+     * 
+     * @param transport the transport to configure
+     * @param wf the wire format in use
+     * @param options connection options applied to the transport
+     * @return the outermost transport of the resulting filter chain
+     * @throws Exception if configuration fails
+     */
+    public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
+        transport = compositeConfigure(transport, wf, options);
+
+        transport = new MutexTransport(transport);
+        transport = new ResponseCorrelator(transport);
+
+        return transport;
+    }
+
+    /**
+     * Fully configures and adds all needed transport filters so that the
+     * transport can be used by the ActiveMQ message broker. The main difference
+     * between this and the configure() method is that the broker does not issue
+     * requests to the client so the ResponseCorrelator is not needed.
+     * <p>
+     * NOTE(review): the soWriteTimeout and threadName entries are read but not
+     * removed from the options map - confirm that callers do not require the
+     * map to be drained.
+     * 
+     * @param transport the transport to configure
+     * @param format the wire format in use
+     * @param options connection options; soWriteTimeout adds a
+     *            WriteTimeoutFilter and threadName adds a ThreadNameFilter
+     * @return the outermost transport of the resulting filter chain
+     * @throws Exception if configuration fails, including a
+     *             NumberFormatException for a non-numeric soWriteTimeout
+     */
+    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+        if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
+            transport = new WriteTimeoutFilter(transport);
+            String soWriteTimeout = (String)options.get(WRITE_TIMEOUT_FILTER);
+            if (soWriteTimeout!=null) ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
+        }
+        if (options.containsKey(THREAD_NAME_FILTER)) {
+            transport = new ThreadNameFilter(transport);
+        }
+        transport = compositeConfigure(transport, format, options);
+        transport = new MutexTransport(transport);
+        return transport;
+    }
+
+    /**
+     * Similar to configure(...) but this avoids adding the MutexTransport and
+     * ResponseCorrelator transport layers so that the resulting transport can
+     * more efficiently be used as part of a composite transport. It only
+     * applies the options to the transport through IntrospectionSupport.
+     * 
+     * @param transport the transport to configure
+     * @param format the wire format in use; unused by this implementation
+     * @param options connection options applied to the transport
+     * @return the same transport instance that was passed in
+     */
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+        IntrospectionSupport.setProperties(transport, options);
+        return transport;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFilter.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFilter.java Sat May 30 19:39:26 2009
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A pass-through {@link Transport} decorator. Every {@link Transport} call is
+ * delegated to the wrapped {@code next} transport, and every
+ * {@link TransportListener} event is delegated to the listener registered via
+ * {@link #setTransportListener(TransportListener)}.
+ * <p>
+ * The listener callbacks do not check for a missing listener, so a listener
+ * must be registered before events can be delivered.
+ *
+ * @version $Revision: 1.5 $
+ */
+public class TransportFilter implements TransportListener, Transport {
+    protected final Transport next;
+    protected TransportListener transportListener;
+
+    /**
+     * @param next the transport that all calls are delegated to
+     */
+    public TransportFilter(Transport next) {
+        this.next = next;
+    }
+
+    /**
+     * @return the listener that receives the events forwarded by this filter
+     */
+    public TransportListener getTransportListener() {
+        return transportListener;
+    }
+
+    /**
+     * Registers the listener and hooks this filter into the wrapped transport.
+     * When a listener is supplied, this filter registers itself as the
+     * listener of {@code next}; when {@code null} is supplied, the listener of
+     * {@code next} is cleared as well.
+     *
+     * @param channelListener the listener to forward events to, or null
+     */
+    public void setTransportListener(TransportListener channelListener) {
+        this.transportListener = channelListener;
+        next.setTransportListener(channelListener == null ? null : this);
+    }
+
+    /**
+     * Starts the wrapped transport.
+     *
+     * @see org.apache.activemq.Service#start()
+     * @throws IOException if the next channel or the listener has not been set.
+     */
+    public void start() throws Exception {
+        if (next == null) {
+            throw new IOException("The next channel has not been set.");
+        }
+        if (transportListener == null) {
+            throw new IOException("The command listener has not been set.");
+        }
+        next.start();
+    }
+
+    /**
+     * Stops the wrapped transport.
+     *
+     * @see org.apache.activemq.Service#stop()
+     */
+    public void stop() throws Exception {
+        next.stop();
+    }
+
+    /**
+     * Forwards a received command to the registered listener.
+     */
+    public void onCommand(Object command) {
+        transportListener.onCommand(command);
+    }
+
+    /**
+     * @return Returns the next.
+     */
+    public Transport getNext() {
+        return next;
+    }
+
+    public String toString() {
+        return next.toString();
+    }
+
+    public void oneway(Object command) throws IOException {
+        next.oneway(command);
+    }
+
+    /**
+     * Delegates the asynchronous request to the wrapped transport.
+     *
+     * @param command the command to send
+     * @param responseCallback the callback supplied by the caller; it is passed
+     *        on unchanged so that it is not lost when the request travels
+     *        through this filter
+     * @return the future returned by the wrapped transport
+     */
+    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
+        // Previously null was passed here, silently discarding the caller's callback.
+        return next.asyncRequest(command, responseCallback);
+    }
+
+    public Object request(Object command) throws IOException {
+        return next.request(command);
+    }
+
+    public Object request(Object command, int timeout) throws IOException {
+        return next.request(command, timeout);
+    }
+
+    /**
+     * Forwards an exception reported by the wrapped transport to the listener.
+     */
+    public void onException(IOException error) {
+        transportListener.onException(error);
+    }
+
+    public void transportInterupted() {
+        transportListener.transportInterupted();
+    }
+
+    public void transportResumed() {
+        transportListener.transportResumed();
+    }
+
+    /**
+     * Returns this filter if it is an instance of the requested type,
+     * otherwise asks the wrapped transport.
+     *
+     * @param target the type being looked for
+     * @return this filter cast to {@code target}, or whatever {@code next} returns
+     */
+    public <T> T narrow(Class<T> target) {
+        if (target.isAssignableFrom(getClass())) {
+            return target.cast(this);
+        }
+        return next.narrow(target);
+    }
+
+    public String getRemoteAddress() {
+        return next.getRemoteAddress();
+    }
+
+    /**
+     * @return the value reported by the wrapped transport
+     * @see org.apache.activemq.transport.Transport#isFaultTolerant()
+     */
+    public boolean isFaultTolerant() {
+        return next.isFaultTolerant();
+    }
+
+    public boolean isDisposed() {
+        return next.isDisposed();
+    }
+
+    public boolean isConnected() {
+        return next.isConnected();
+    }
+
+    public void reconnect(URI uri) throws IOException {
+        next.reconnect(uri);
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFilter.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportListener.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportListener.java Sat May 30 19:39:26 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+
+
+/**
+ * An asynchronous listener of commands
+ *
+ * @version $Revision$
+ */
+public interface TransportListener {
+    
+    /**
+     * Called to process a command.
+     * @param command the command to process
+     */
+    void onCommand(Object command);
+    /**
+     * An unrecoverable exception has occurred on the transport.
+     * @param error the exception that occurred
+     */
+    void onException(IOException error);
+    
+    /**
+     * The transport has suffered an interruption from which it hopes to recover.
+     *
+     */
+    void transportInterupted();
+    
+    
+    /**
+     * The transport has resumed after an interruption.
+     *
+     */
+    void transportResumed();
+    
+}

Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportListener.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportServer.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportServer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportServer.java Sat May 30 19:39:26 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+
+import org.apache.activemq.Service;
+
+/**
+ * A TransportServer asynchronously accepts {@link Transport} objects and then
+ * delivers those objects to a {@link TransportAcceptListener}.
+ * 
+ * @version $Revision: 1.4 $
+ */
+public interface TransportServer extends Service {
+
+    /**
+     * Registers a {@link TransportAcceptListener} which is notified of accepted
+     * channels.
+     * 
+     * @param acceptListener the listener to notify
+     */
+    void setAcceptListener(TransportAcceptListener acceptListener);
+
+    /**
+     * @return the connect URI of this server. NOTE(review): presumably the
+     *         URI that clients use to reach this server; confirm against
+     *         the implementations.
+     */
+    URI getConnectURI();
+
+    /**
+     * @return The socket address that this transport is accepting connections
+     *         on or null if this does not or is not currently accepting
+     *         connections on a socket.
+     */
+    InetSocketAddress getSocketAddress();
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportServer.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Sat May 30 19:39:26 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
+
+/**
+ * A {@link WireFormat} that encodes and decodes commands with Java object
+ * stream serialization.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class ObjectStreamWireFormat implements WireFormat {
+
+    /**
+     * Serializes the command into an in-memory buffer and returns its contents.
+     */
+    public ByteSequence marshal(Object command) throws IOException {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream sink = new DataOutputStream(buffer);
+        marshal(command, sink);
+        sink.close();
+        return buffer.toByteSequence();
+    }
+
+    /**
+     * Deserializes one command from the bytes of the packet.
+     */
+    public Object unmarshal(ByteSequence packet) throws IOException {
+        ByteArrayInputStream bytes = new ByteArrayInputStream(packet);
+        DataInputStream source = new DataInputStream(bytes);
+        return unmarshal(source);
+    }
+
+    /**
+     * Writes the command to the given output using a fresh ObjectOutputStream.
+     *
+     * @param ds the destination; it is cast to {@link OutputStream}, so a
+     *        ClassCastException is raised for any other kind of DataOutput
+     */
+    public void marshal(Object command, DataOutput ds) throws IOException {
+        OutputStream target = (OutputStream)ds;
+        ObjectOutputStream objectOut = new ObjectOutputStream(target);
+        objectOut.writeObject(command);
+        objectOut.flush();
+        objectOut.reset();
+    }
+
+    /**
+     * Reads one command from the given input and then closes the object
+     * stream wrapped around it. The close only happens when the read succeeds.
+     *
+     * @param ds the source; it is cast to {@link InputStream}, so a
+     *        ClassCastException is raised for any other kind of DataInput
+     * @throws IOException if reading fails, or wrapping the
+     *         ClassNotFoundException when a class cannot be resolved
+     */
+    public Object unmarshal(DataInput ds) throws IOException {
+        InputStream source = (InputStream)ds;
+        try {
+            ClassLoadingAwareObjectInputStream objectIn = new ClassLoadingAwareObjectInputStream(source);
+            Object command = objectIn.readObject();
+            objectIn.close();
+            return command;
+        } catch (ClassNotFoundException e) {
+            IOException failure = new IOException("unmarshal failed: " + e);
+            failure.initCause(e);
+            throw failure;
+        }
+    }
+
+    /**
+     * Ignored: this format has no versions.
+     */
+    public void setVersion(int version) {
+    }
+
+    /**
+     * @return always 0
+     */
+    public int getVersion() {
+        return 0;
+    }
+
+    /**
+     * @return always false
+     */
+    public boolean inReceive() {
+        // TODO implement the inactivity monitor
+        return false;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormat.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormat.java Sat May 30 19:39:26 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.util.ByteSequence;
+
+
+/**
+ * Provides a mechanism to marshal commands into and out of packets
+ * or into and out of streams, Channels and Datagrams.
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface WireFormat {
+
+    /**
+     * Packet based marshaling 
+     *
+     * @param command the command to marshal
+     * @return the packet holding the marshaled command
+     */
+    ByteSequence marshal(Object command) throws IOException;
+    
+    /**
+     * Packet based un-marshaling 
+     *
+     * @param packet the packet to read the command from
+     * @return the un-marshaled command
+     */
+    Object unmarshal(ByteSequence packet) throws IOException;
+
+    /**
+     * Stream based marshaling 
+     *
+     * @param command the command to marshal
+     * @param out the output the command is written to
+     */
+    void marshal(Object command, DataOutput out) throws IOException;
+    
+    /**
+     * Stream based un-marshaling 
+     *
+     * @param in the input the command is read from
+     * @return the un-marshaled command
+     */
+    Object unmarshal(DataInput in) throws IOException;
+    
+    /**
+     * @param version the version of the wire format
+     */
+    void setVersion(int version);
+    
+    /**
+     * @return the version of the wire format
+     */
+    int getVersion();
+    
+    /**
+     * @return true if message is being received
+     */
+    boolean inReceive();
+    
+}

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java Sat May 30 19:39:26 2009
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+/**
+ * A factory of {@link WireFormat} objects.
+ */
+public interface WireFormatFactory {
+    /**
+     * @return a wire format
+     */
+    WireFormat createWireFormat();    
+}

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/package.html?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/package.html (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/package.html Sat May 30 19:39:26 2009
@@ -0,0 +1,27 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+<p>
+An API for WireFormats which are used to turn objects into bytes and bytes into objects.
+</p>
+
+</body>
+</html>

Modified: activemq/sandbox/activemq-flow/activemq-util/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/pom.xml?rev=780312&r1=780311&r2=780312&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-util/pom.xml Sat May 30 19:39:26 2009
@@ -32,7 +32,7 @@
   <name>ActiveMQ :: Util</name>
 
   <dependencies>
-        
+                
     <!-- Testing Dependencies -->    
     <dependency>
       <groupId>junit</groupId>

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java Sat May 30 19:39:26 2009
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
+
+/**
+ * An {@link ObjectInputStream} that resolves classes through the current
+ * thread's context class loader first and, when that fails, through the class
+ * loader that loaded this class.
+ */
+public class ClassLoadingAwareObjectInputStream extends ObjectInputStream {
+
+    private static final ClassLoader FALLBACK_CLASS_LOADER = ClassLoadingAwareObjectInputStream.class.getClassLoader();
+
+    /** <p>Maps primitive type names to corresponding class objects.</p> */
+    private static final HashMap<String, Class<?>> primClasses = new HashMap<String, Class<?>>(16);
+
+    static {
+        primClasses.put("boolean", boolean.class);
+        primClasses.put("byte", byte.class);
+        primClasses.put("char", char.class);
+        primClasses.put("short", short.class);
+        primClasses.put("int", int.class);
+        primClasses.put("long", long.class);
+        primClasses.put("float", float.class);
+        primClasses.put("double", double.class);
+        primClasses.put("void", void.class);
+    }
+
+    /**
+     * @param in the stream to read serialized objects from
+     * @throws IOException if the stream header cannot be read
+     */
+    public ClassLoadingAwareObjectInputStream(InputStream in) throws IOException {
+        super(in);
+    }
+
+    /**
+     * Resolves the described class, preferring the context class loader of
+     * the calling thread.
+     */
+    protected Class<?> resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        return load(classDesc.getName(), cl);
+    }
+
+    /**
+     * Resolves the proxy class implementing the named interfaces.
+     *
+     * @param interfaces the names of the interfaces of the proxy; may be empty
+     * @throws ClassNotFoundException if an interface cannot be loaded or the
+     *         proxy class cannot be created from the loaded interfaces
+     */
+    protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        Class<?>[] cinterfaces = new Class<?>[interfaces.length];
+        for (int i = 0; i < interfaces.length; i++) {
+            cinterfaces[i] = load(interfaces[i], cl);
+        }
+
+        // A proxy may implement no interfaces at all; indexing element 0
+        // unconditionally failed with ArrayIndexOutOfBoundsException then.
+        ClassLoader proxyLoader;
+        if (cinterfaces.length > 0) {
+            proxyLoader = cinterfaces[0].getClassLoader();
+        } else if (cl != null) {
+            proxyLoader = cl;
+        } else {
+            proxyLoader = FALLBACK_CLASS_LOADER;
+        }
+
+        try {
+            return Proxy.getProxyClass(proxyLoader, cinterfaces);
+        } catch (IllegalArgumentException e) {
+            throw new ClassNotFoundException(null, e);
+        }
+    }
+
+    /**
+     * Loads a class by name: first with the given loader, then as a primitive
+     * type name, and finally with the fallback class loader.
+     */
+    private Class<?> load(String className, ClassLoader cl)
+            throws ClassNotFoundException {
+        try {
+            return Class.forName(className, false, cl);
+        } catch (ClassNotFoundException e) {
+            final Class<?> clazz = primClasses.get(className);
+            if (clazz != null) {
+                return clazz;
+            } else {
+                return Class.forName(className, false, FALLBACK_CLASS_LOADER);
+            }
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOExceptionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOExceptionSupport.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOExceptionSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOExceptionSupport.java Sat May 30 19:39:26 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.IOException;
+
+/**
+ * Static factory helpers that build an {@link IOException} carrying a cause.
+ */
+public final class IOExceptionSupport {
+
+    /** Not instantiable: only static helpers live here. */
+    private IOExceptionSupport() {
+    }
+
+    /**
+     * @param msg the detail message of the new exception
+     * @param cause the throwable recorded as its cause
+     * @return a new IOException with the given message and cause
+     */
+    public static IOException create(String msg, Throwable cause) {
+        return (IOException)new IOException(msg).initCause(cause);
+    }
+
+    /**
+     * @param msg the detail message of the new exception
+     * @param cause the exception recorded as its cause
+     * @return a new IOException with the given message and cause
+     */
+    public static IOException create(String msg, Exception cause) {
+        return (IOException)new IOException(msg).initCause(cause);
+    }
+
+    /**
+     * @param cause the throwable recorded as the cause; its message is reused
+     * @return a new IOException with the message of the cause
+     */
+    public static IOException create(Throwable cause) {
+        return (IOException)new IOException(cause.getMessage()).initCause(cause);
+    }
+
+    /**
+     * @param cause the exception recorded as the cause; its message is reused
+     * @return a new IOException with the message of the cause
+     */
+    public static IOException create(Exception cause) {
+        return (IOException)new IOException(cause.getMessage()).initCause(cause);
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOExceptionSupport.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntSequenceGenerator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntSequenceGenerator.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntSequenceGenerator.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntSequenceGenerator.java Sat May 30 19:39:26 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+/**
+ * Hands out consecutive int identifiers. Every accessor holds the lock of
+ * this instance while it reads or writes the counter.
+ */
+public class IntSequenceGenerator {
+
+    private int lastSequenceId;
+
+    /**
+     * Advances the counter by one.
+     *
+     * @return the new value of the counter
+     */
+    public int getNextSequenceId() {
+        synchronized (this) {
+            lastSequenceId += 1;
+            return lastSequenceId;
+        }
+    }
+
+    /**
+     * @return the current value of the counter, without advancing it
+     */
+    public int getLastSequenceId() {
+        synchronized (this) {
+            return lastSequenceId;
+        }
+    }
+
+    /**
+     * @param l the value the counter is set to
+     */
+    public void setLastSequenceId(int l) {
+        synchronized (this) {
+            lastSequenceId = l;
+        }
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java Sat May 30 19:39:26 2009
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.beans.PropertyEditor;
+import java.beans.PropertyEditorManager;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+
+
+
+public final class IntrospectionSupport {
+	
+	static {
+		// Add Spring and ActiveMQ specific property editors
+		String[] additionalPath = new String[] {
+				"org.springframework.beans.propertyeditors",
+				"org.apache.activemq.util" };
+
+		String[] searchPath = (String[]) Array.newInstance(String.class,
+				PropertyEditorManager.getEditorSearchPath().length
+						+ additionalPath.length);
+		System.arraycopy(PropertyEditorManager.getEditorSearchPath(), 0,
+				searchPath, 0,
+				PropertyEditorManager.getEditorSearchPath().length);
+		System.arraycopy(additionalPath, 0, searchPath, PropertyEditorManager
+				.getEditorSearchPath().length, additionalPath.length);
+		PropertyEditorManager.setEditorSearchPath(searchPath);
+	}
+    
+    private IntrospectionSupport() {
+    }
+
+    public static boolean getProperties(Object target, Map props, String optionPrefix) {
+
+        boolean rc = false;
+        if (target == null) {
+            throw new IllegalArgumentException("target was null.");
+        }
+        if (props == null) {
+            throw new IllegalArgumentException("props was null.");
+        }
+
+        if (optionPrefix == null) {
+            optionPrefix = "";
+        }
+
+        Class clazz = target.getClass();
+        Method[] methods = clazz.getMethods();
+        for (int i = 0; i < methods.length; i++) {
+            Method method = methods[i];
+            String name = method.getName();
+            Class type = method.getReturnType();
+            Class params[] = method.getParameterTypes();
+            if ((name.startsWith("is") || name.startsWith("get")) && params.length == 0 && type != null && isSettableType(type)) {
+
+                try {
+
+                    Object value = method.invoke(target, new Object[] {});
+                    if (value == null) {
+                        continue;
+                    }
+
+                    String strValue = convertToString(value, type);
+                    if (strValue == null) {
+                        continue;
+                    }
+                    if (name.startsWith("get")) {
+                        name = name.substring(3, 4).toLowerCase()
+                                + name.substring(4);
+                    } else {
+                        name = name.substring(2, 3).toLowerCase()
+                                + name.substring(3);
+                    }
+                    props.put(optionPrefix + name, strValue);
+                    rc = true;
+
+                } catch (Throwable ignore) {
+                }
+
+            }
+        }
+
+        return rc;
+    }
+
+    public static boolean setProperties(Object target, Map<String, ?> props, String optionPrefix) {
+        boolean rc = false;
+        if (target == null) {
+            throw new IllegalArgumentException("target was null.");
+        }
+        if (props == null) {
+            throw new IllegalArgumentException("props was null.");
+        }
+
+        for (Iterator<String> iter = props.keySet().iterator(); iter.hasNext();) {
+            String name = iter.next();
+            if (name.startsWith(optionPrefix)) {
+                Object value = props.get(name);
+                name = name.substring(optionPrefix.length());
+                if (setProperty(target, name, value)) {
+                    iter.remove();
+                    rc = true;
+                }
+            }
+        }
+        return rc;
+    }
+
+    public static Map<String, Object> extractProperties(Map props, String optionPrefix) {
+        if (props == null) {
+            throw new IllegalArgumentException("props was null.");
+        }
+
+        HashMap<String, Object> rc = new HashMap<String, Object>(props.size());
+
+        for (Iterator iter = props.keySet().iterator(); iter.hasNext();) {
+            String name = (String)iter.next();
+            if (name.startsWith(optionPrefix)) {
+                Object value = props.get(name);
+                name = name.substring(optionPrefix.length());
+                rc.put(name, value);
+                iter.remove();
+            }
+        }
+
+        return rc;
+    }
+
+    public static boolean setProperties(Object target, Map props) {
+        boolean rc = false;
+
+        if (target == null) {
+            throw new IllegalArgumentException("target was null.");
+        }
+        if (props == null) {
+            throw new IllegalArgumentException("props was null.");
+        }
+
+        for (Iterator iter = props.entrySet().iterator(); iter.hasNext();) {
+            Map.Entry entry = (Entry)iter.next();
+            if (setProperty(target, (String)entry.getKey(), entry.getValue())) {
+                iter.remove();
+                rc = true;
+            }
+        }
+
+        return rc;
+    }
+
+    public static boolean setProperty(Object target, String name, Object value) {
+        try {
+            Class clazz = target.getClass();
+            Method setter = findSetterMethod(clazz, name);
+            if (setter == null) {
+                return false;
+            }
+
+            // If the type is null or it matches the needed type, just use the
+            // value directly
+            if (value == null || value.getClass() == setter.getParameterTypes()[0]) {
+                setter.invoke(target, new Object[] {value});
+            } else {
+                // We need to convert it
+                setter.invoke(target, new Object[] {convert(value, setter.getParameterTypes()[0])});
+            }
+            return true;
+        } catch (Throwable ignore) {
+            return false;
+        }
+    }
+
+    private static Object convert(Object value, Class type) {
+        PropertyEditor editor = PropertyEditorManager.findEditor(type);
+        if (editor != null) {
+            editor.setAsText(value.toString());
+            return editor.getValue();
+        }
+        return null;
+    }
+
+    public static String convertToString(Object value, Class type) {
+        PropertyEditor editor = PropertyEditorManager.findEditor(type);
+        if (editor != null) {
+            editor.setValue(value);
+            return editor.getAsText();
+        }
+        return null;
+    }
+
+    private static Method findSetterMethod(Class clazz, String name) {
+        // Build the method name.
+        name = "set" + name.substring(0, 1).toUpperCase() + name.substring(1);
+        Method[] methods = clazz.getMethods();
+        for (int i = 0; i < methods.length; i++) {
+            Method method = methods[i];
+            Class params[] = method.getParameterTypes();
+            if (method.getName().equals(name) && params.length == 1 ) {
+                return method;
+            }
+        }
+        return null;
+    }
+
+    private static boolean isSettableType(Class clazz) {
+        if (PropertyEditorManager.findEditor(clazz) != null) {
+            return true;
+        }
+        	
+        return false;
+    }
+
+    public static String toString(Object target) {
+        return toString(target, Object.class, null);
+    }
+    
+    public static String toString(Object target, Class stopClass) {
+    	return toString(target, stopClass, null);
+    }
+
+    public static String toString(Object target, Class stopClass, Map<String, Object> overrideFields) {
+        LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();
+        addFields(target, target.getClass(), stopClass, map);
+        if (overrideFields != null) {
+        	for(String key : overrideFields.keySet()) {
+        	    Object value = overrideFields.get(key);
+        	    map.put(key, value);
+        	}
+
+        }
+        StringBuffer buffer = new StringBuffer(simpleName(target.getClass()));
+        buffer.append(" {");
+        Set entrySet = map.entrySet();
+        boolean first = true;
+        for (Iterator iter = entrySet.iterator(); iter.hasNext();) {
+            Map.Entry entry = (Map.Entry)iter.next();
+            if (first) {
+                first = false;
+            } else {
+                buffer.append(", ");
+            }
+            buffer.append(entry.getKey());
+            buffer.append(" = ");
+            appendToString(buffer, entry.getValue());
+        }
+        buffer.append("}");
+        return buffer.toString();
+    }
+
+    protected static void appendToString(StringBuffer buffer, Object value) {
+//        if (value instanceof ActiveMQDestination) {
+//            ActiveMQDestination destination = (ActiveMQDestination)value;
+//            buffer.append(destination.getQualifiedName());
+//        } else {
+            buffer.append(value);
+//        }
+    }
+
+    public static String simpleName(Class clazz) {
+        String name = clazz.getName();
+        int p = name.lastIndexOf(".");
+        if (p >= 0) {
+            name = name.substring(p + 1);
+        }
+        return name;
+    }
+
+    private static void addFields(Object target, Class startClass, Class<Object> stopClass, LinkedHashMap<String, Object> map) {
+
+        if (startClass != stopClass) {
+            addFields(target, startClass.getSuperclass(), stopClass, map);
+        }
+
+        Field[] fields = startClass.getDeclaredFields();
+        for (int i = 0; i < fields.length; i++) {
+            Field field = fields[i];
+            if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())
+                || Modifier.isPrivate(field.getModifiers())) {
+                continue;
+            }
+
+            try {
+                field.setAccessible(true);
+                Object o = field.get(target);
+                if (o != null && o.getClass().isArray()) {
+                    try {
+                        o = Arrays.asList((Object[])o);
+                    } catch (Throwable e) {
+                    }
+                }
+                map.put(field.getName(), o);
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/URISupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/URISupport.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/URISupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/URISupport.java Sat May 30 19:39:26 2009
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helpers for taking apart and rebuilding URIs, including "composite" URIs
+ * of the form <code>scheme:(uri1,uri2,...)/path?query#fragment</code>.
+ *
+ * @version $Revision$
+ */
+public class URISupport {
+
+    /**
+     * The parsed pieces of a composite URI, as produced by
+     * {@link URISupport#parseComposite(URI)}.
+     */
+    public static class CompositeData {
+        private String host;
+        private String scheme;
+        private String path;
+        private URI components[];
+        private Map<String, String> parameters;
+        private String fragment;
+
+        public URI[] getComponents() {
+            return components;
+        }
+
+        public String getFragment() {
+            return fragment;
+        }
+
+        public Map<String, String> getParameters() {
+            return parameters;
+        }
+
+        public String getScheme() {
+            return scheme;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        public String getHost() {
+            return host;
+        }
+
+        /**
+         * Reassembles the pieces into a URI: scheme, then either the host or
+         * the parenthesised component list, then path, query and fragment.
+         *
+         * @throws URISyntaxException if the assembled text is not a valid URI
+         */
+        public URI toURI() throws URISyntaxException {
+            StringBuilder sb = new StringBuilder();
+            if (scheme != null) {
+                sb.append(scheme);
+                sb.append(':');
+            }
+
+            if (host != null && host.length() != 0) {
+                sb.append(host);
+            } else {
+                sb.append('(');
+                for (int i = 0; i < components.length; i++) {
+                    if (i != 0) {
+                        sb.append(',');
+                    }
+                    sb.append(components[i].toString());
+                }
+                sb.append(')');
+            }
+
+            if (path != null) {
+                sb.append('/');
+                sb.append(path);
+            }
+            if (!parameters.isEmpty()) {
+                sb.append("?");
+                sb.append(createQueryString(parameters));
+            }
+            if (fragment != null) {
+                sb.append("#");
+                sb.append(fragment);
+            }
+            return new URI(sb.toString());
+        }
+    }
+
+    /**
+     * Parses a query string of <code>name=value</code> pairs separated by
+     * '&amp;'. Names and values are URL-decoded as UTF-8. A parameter without
+     * '=' is recorded with a null value; empty segments are ignored.
+     *
+     * @param uri the query string without the leading '?'; may be null
+     * @return a new mutable map of the parameters, empty if uri is null
+     * @throws URISyntaxException if UTF-8 decoding is not supported
+     */
+    public static Map<String, String> parseQuery(String uri) throws URISyntaxException {
+        try {
+            Map<String, String> rc = new HashMap<String, String>();
+            if (uri != null) {
+                String[] parameters = uri.split("&");
+                for (int i = 0; i < parameters.length; i++) {
+                    String parameter = parameters[i];
+                    if (parameter.length() == 0) {
+                        // "" or "a&&b": nothing to record for this segment.
+                        continue;
+                    }
+                    int p = parameter.indexOf("=");
+                    if (p >= 0) {
+                        String name = URLDecoder.decode(parameter.substring(0, p), "UTF-8");
+                        String value = URLDecoder.decode(parameter.substring(p + 1), "UTF-8");
+                        rc.put(name, value);
+                    } else {
+                        // Decode value-less keys too, so they round-trip
+                        // through createQueryString.
+                        rc.put(URLDecoder.decode(parameter, "UTF-8"), null);
+                    }
+                }
+            }
+            return rc;
+        } catch (UnsupportedEncodingException e) {
+            throw (URISyntaxException)new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
+        }
+    }
+
+    /**
+     * Parses the query part of the given URI.
+     *
+     * @return the parameters; an immutable empty map if the URI has no query
+     */
+    public static Map<String, String> parseParamters(URI uri) throws URISyntaxException {
+        return uri.getQuery() == null ? emptyMap() : parseQuery(stripPrefix(uri.getQuery(), "?"));
+    }
+
+    private static Map<String, String> emptyMap() {
+        return Collections.<String, String>emptyMap();
+    }
+
+    /**
+     * Removes any URI query from the given uri
+     */
+    public static URI removeQuery(URI uri) throws URISyntaxException {
+        return createURIWithQuery(uri, null);
+    }
+
+    /**
+     * Creates a URI with the given query
+     */
+    public static URI createURIWithQuery(URI uri, String query) throws URISyntaxException {
+        return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uri.getPath(),
+                       query, uri.getFragment());
+    }
+
+    /**
+     * Splits a composite URI into its scheme, component URIs, path,
+     * parameters and fragment.
+     *
+     * @throws URISyntaxException if the parentheses do not balance or a
+     *             component is not a valid URI
+     */
+    public static CompositeData parseComposite(URI uri) throws URISyntaxException {
+
+        CompositeData rc = new CompositeData();
+        rc.scheme = uri.getScheme();
+        String ssp = stripPrefix(uri.getSchemeSpecificPart().trim(), "//").trim();
+
+        parseComposite(uri, rc, ssp);
+
+        rc.fragment = uri.getFragment();
+        return rc;
+    }
+
+    /**
+     * Fills in the host, path, components and parameters of rc from the
+     * scheme specific part.
+     *
+     * @param uri the original URI, used for error reporting only
+     * @param rc the result object to populate
+     * @param ssp the trimmed scheme specific part, without a leading "//"
+     * @throws URISyntaxException if the parentheses do not balance or a
+     *             component is not a valid URI
+     */
+    private static void parseComposite(URI uri, CompositeData rc, String ssp) throws URISyntaxException {
+        String componentString;
+        String params;
+
+        if (!checkParenthesis(ssp)) {
+            throw new URISyntaxException(uri.toString(), "Not a matching number of '(' and ')' parenthesis");
+        }
+
+        int p;
+        int intialParen = ssp.indexOf("(");
+        if (intialParen == 0) {
+            // NOTE(review): intialParen is 0 here, so host is always the
+            // empty string and the '/' split below can never match.
+            rc.host = ssp.substring(0, intialParen);
+            p = rc.host.indexOf("/");
+            if (p >= 0) {
+                rc.path = rc.host.substring(p);
+                rc.host = rc.host.substring(0, p);
+            }
+            p = ssp.lastIndexOf(")");
+            componentString = ssp.substring(intialParen + 1, p);
+            params = ssp.substring(p + 1).trim();
+
+        } else {
+            // No leading parenthesis: the whole text is the component list.
+            componentString = ssp;
+            params = "";
+        }
+
+        String components[] = splitComponents(componentString);
+        rc.components = new URI[components.length];
+        for (int i = 0; i < components.length; i++) {
+            rc.components[i] = new URI(components[i].trim());
+        }
+
+        p = params.indexOf("?");
+        if (p >= 0) {
+            if (p > 0) {
+                rc.path = stripPrefix(params.substring(0, p), "/");
+            }
+            rc.parameters = parseQuery(params.substring(p + 1));
+        } else {
+            if (params.length() > 0) {
+                rc.path = stripPrefix(params, "/");
+            }
+            rc.parameters = emptyMap();
+        }
+    }
+
+    /**
+     * Splits a component list on the commas that are not nested inside
+     * parentheses.
+     *
+     * @param str the text between the outer parentheses
+     * @return the individual components; a trailing empty component is
+     *         dropped
+     */
+    private static String[] splitComponents(String str) {
+        List<String> l = new ArrayList<String>();
+
+        int last = 0;
+        int depth = 0;
+        char chars[] = str.toCharArray();
+        for (int i = 0; i < chars.length; i++) {
+            switch (chars[i]) {
+            case '(':
+                depth++;
+                break;
+            case ')':
+                depth--;
+                break;
+            case ',':
+                if (depth == 0) {
+                    String s = str.substring(last, i);
+                    l.add(s);
+                    last = i + 1;
+                }
+                break;
+            default:
+            }
+        }
+
+        String s = str.substring(last);
+        if (s.length() != 0) {
+            l.add(s);
+        }
+
+        String rc[] = new String[l.size()];
+        l.toArray(rc);
+        return rc;
+    }
+
+    /**
+     * @return value without the given prefix, or value unchanged if it does
+     *         not start with the prefix
+     */
+    public static String stripPrefix(String value, String prefix) {
+        if (value.startsWith(prefix)) {
+            return value.substring(prefix.length());
+        }
+        return value;
+    }
+
+    /**
+     * @return a URI built from the scheme specific part of uri, with a
+     *         leading "//" removed
+     */
+    public static URI stripScheme(URI uri) throws URISyntaxException {
+        return new URI(stripPrefix(uri.getSchemeSpecificPart().trim(), "//"));
+    }
+
+    /**
+     * Builds a query string (without the leading '?') from the given
+     * options. Keys and values are URL-encoded as UTF-8. An entry with a
+     * null value is written as the bare key, mirroring how
+     * {@link #parseQuery(String)} reads a parameter without '='.
+     *
+     * @param options the parameters, keyed by String
+     * @return the query string, or "" if options is empty
+     * @throws URISyntaxException if UTF-8 encoding is not supported
+     */
+    public static String createQueryString(Map options) throws URISyntaxException {
+        try {
+            if (options.size() > 0) {
+                StringBuilder rc = new StringBuilder();
+                boolean first = true;
+                for (Iterator iter = options.entrySet().iterator(); iter.hasNext();) {
+                    Map.Entry entry = (Map.Entry)iter.next();
+                    if (first) {
+                        first = false;
+                    } else {
+                        rc.append("&");
+                    }
+                    rc.append(URLEncoder.encode((String)entry.getKey(), "UTF-8"));
+                    Object value = entry.getValue();
+                    if (value != null) {
+                        rc.append("=");
+                        rc.append(URLEncoder.encode(value.toString(), "UTF-8"));
+                    }
+                }
+                return rc.toString();
+            } else {
+                return "";
+            }
+        } catch (UnsupportedEncodingException e) {
+            throw (URISyntaxException)new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
+        }
+    }
+
+    /**
+     * Creates a URI from the original URI and the remaining parameters
+     * 
+     * @throws URISyntaxException
+     */
+    public static URI createRemainingURI(URI originalURI, Map params) throws URISyntaxException {
+        String s = createQueryString(params);
+        if (s.length() == 0) {
+            s = null;
+        }
+        return createURIWithQuery(originalURI, s);
+    }
+
+    /**
+     * @return a copy of bindAddr with its scheme replaced
+     */
+    public static URI changeScheme(URI bindAddr, String scheme) throws URISyntaxException {
+        return new URI(scheme, bindAddr.getUserInfo(), bindAddr.getHost(), bindAddr.getPort(), bindAddr
+            .getPath(), bindAddr.getQuery(), bindAddr.getFragment());
+    }
+
+    /**
+     * Checks that str contains as many '(' as ')' characters. Only the
+     * counts are compared, not the nesting order.
+     *
+     * @return true if the counts match or str is null
+     */
+    public static boolean checkParenthesis(String str) {
+        if (str == null) {
+            return true;
+        }
+        int open = 0;
+        int closed = 0;
+        for (int i = 0; i < str.length(); i++) {
+            char c = str.charAt(i);
+            if (c == '(') {
+                open++;
+            } else if (c == ')') {
+                closed++;
+            }
+        }
+        return open == closed;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @return always -1
+     */
+    public int indexOfParenthesisMatch(String str) {
+        int result = -1;
+
+        return result;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/URISupport.java
------------------------------------------------------------------------------
    svn:executable = *