You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/02/12 12:29:31 UTC

svn commit: r620767 - in /incubator/qpid/branches/M2.1/java/client: example/src/main/java/org/apache/qpid/example/transport/ src/main/java/org/apache/mina/ src/main/java/org/apache/mina/transport/ src/main/java/org/apache/mina/transport/socket/ src/mai...

Author: ritchiem
Date: Tue Feb 12 03:29:19 2008
New Revision: 620767

URL: http://svn.apache.org/viewvc?rev=620767&view=rev
Log:
QPID-784 : Added ability to provide existing Socket to Qpid Client Libraries to use as for connection.
AMQBrokerDetails.java, BrokerDetails.java And ConnectionURLTest.java augmented to allow new transport type 'socket'
New ExistingSocketConnector, which utises a given Socket() rather than creating its own from a SocketChannel. This code was taken from the Mina library v1.0.0.
Changes to AMQConnection.java, SocketTransportConnection.java were required to allow the new Socket object to be passed through to the ExistingSocketConnector.
The TransportConnection.java was updated to return an ExistingSocketConnector when the 'socket' transport is used. 
AMQConnection.makeBrokerConnection was changed when the 'socket' transport is being used. This allows the set Socket to be passed down to the ExistingSocketConnector for the transport to be run over.



Added:
    incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/
    incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java   (with props)
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/nio/
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java   (with props)
Modified:
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
    incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java

Added: incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java?rev=620767&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java (added)
+++ incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java Tue Feb 12 03:29:19 2008
@@ -0,0 +1,161 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.example.transport;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.channels.SocketChannel;
+
+/**
+ * This is a simple application that demonstrates how you can use the Qpid AMQP interfaces to use existing sockets as
+ * the transport for the Client API.
+ *
+ * The Demo here runs twice:
+ * 1. Just to show a simple publish and receive.
+ * 2. To demonstrate how to use existing sockets and utilise the underlying client failover mechnaism.
+ */
+public class ExistingSocketConnectorDemo implements ConnectionListener
+{
+    private static boolean DEMO_FAILOVER = false;
+
+    public static void main(String[] args) throws IOException, URLSyntaxException, AMQException, JMSException
+    {
+        System.out.println("Testing socket connection to localhost:5672.");
+
+        new ExistingSocketConnectorDemo();
+
+        System.out.println("Testing socket connection failover between localhost:5672 and localhost:5673.");
+
+        DEMO_FAILOVER = true;
+
+        new ExistingSocketConnectorDemo();
+    }
+
+    Connection _connection;
+    MessageProducer _producer;
+    Session _session;
+
+
+    /** Here we can see the broker we are connecting to is set to be 'socket:///' signifying we will provide the socket. */
+    public static final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket:///'";
+
+    public ExistingSocketConnectorDemo() throws IOException, URLSyntaxException, AMQException, JMSException
+    {
+
+        Socket socket = SocketChannel.open().socket();
+        socket.connect(new InetSocketAddress("localhost", 5672));
+
+        _connection = new AMQConnection(CONNECTION, socket);
+
+        _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer consumer = _session.createConsumer(_session.createQueue("Queue"));
+
+        _producer = _session.createProducer(_session.createQueue("Queue"));
+
+        _connection.start();
+
+        if (!DEMO_FAILOVER)
+        {
+            _producer.send(_session.createTextMessage("Simple Test"));
+        }
+        else
+        {
+            // Using the Qpid interfaces we can set a listener that allows us to demonstrate failover
+            ((AMQConnection) _connection).setConnectionListener(this);
+
+            System.out.println("Testing failover: Please ensure second broker running on localhost:5673 and shutdown broker on 5672.");
+        }
+
+        //We do a blocking receive here so that we can demonstrate failover.
+        Message message = consumer.receive();
+
+        System.out.println("Recevied :" + message);
+
+        _connection.close();
+    }
+
+    // ConnectionListener Interface
+
+    public void bytesSent(long count)
+    {
+        //not used in this example
+    }
+    public void bytesReceived(long count)
+    {
+        //not used in this example
+    }
+
+    public boolean preFailover(boolean redirect)
+    {
+        /**
+         * This method is called before the underlying client library starts to reconnect. This gives us the opportunity
+         * to set a new socket for the failover to occur on.
+         */
+        try
+        {
+            Socket socket = SocketChannel.open().socket();
+
+            socket.connect(new InetSocketAddress("localhost", 5673));
+
+            // This is the new method to pass in an open socket for the connection to use.
+            ((AMQConnection) _connection).setOpenSocket(socket);
+        }
+        catch (IOException e)
+        {
+            e.printStackTrace();
+            return false;
+        }
+        return true;
+    }
+
+    public boolean preResubscribe()
+    {
+        //not used in this example - but must return true to allow the resubscription of existing clients.
+        return true;
+    }
+
+    public void failoverComplete()
+    {
+        // Now that failover has completed we can send a message that the receiving thread will pick up
+        try
+        {
+            _producer.send(_session.createTextMessage("Simple Failover Test"));
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace();
+        }
+    }
+}

Propchange: incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java?rev=620767&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java (added)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java Tue Feb 12 03:29:19 2008
@@ -0,0 +1,478 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.Queue;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class ExistingSocketConnector extends BaseIoConnector
+{
+    /** @noinspection StaticNonFinalField */
+    private static volatile int nextId = 0;
+
+    private final Object lock = new Object();
+    private final int id = nextId++;
+    private final String threadName = "SocketConnector-" + id;
+    private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
+    private final Queue connectQueue = new Queue();
+    private final SocketIoProcessor[] ioProcessors;
+    private final int processorCount;
+    private final Executor executor;
+
+    /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
+    private Selector selector;
+    private Worker worker;
+    private int processorDistributor = 0;
+    private int workerTimeout = 60;  // 1 min.
+    private Socket _openSocket = null;
+
+    /** Create a connector with a single processing thread using a NewThreadExecutor */
+    public ExistingSocketConnector()
+    {
+        this(1, new NewThreadExecutor());
+    }
+
+    /**
+     * Create a connector with the desired number of processing threads
+     *
+     * @param processorCount Number of processing threads
+     * @param executor       Executor to use for launching threads
+     */
+    public ExistingSocketConnector(int processorCount, Executor executor)
+    {
+        if (processorCount < 1)
+        {
+            throw new IllegalArgumentException("Must have at least one processor");
+        }
+
+        this.executor = executor;
+        this.processorCount = processorCount;
+        ioProcessors = new SocketIoProcessor[processorCount];
+
+        for (int i = 0; i < processorCount; i++)
+        {
+            ioProcessors[i] = new SocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor);
+        }
+    }
+
+    /**
+     * How many seconds to keep the connection thread alive between connection requests
+     *
+     * @return Number of seconds to keep connection thread alive
+     */
+    public int getWorkerTimeout()
+    {
+        return workerTimeout;
+    }
+
+    /**
+     * Set how many seconds the connection worker thread should remain alive once idle before terminating itself.
+     *
+     * @param workerTimeout Number of seconds to keep thread alive. Must be >=0
+     */
+    public void setWorkerTimeout(int workerTimeout)
+    {
+        if (workerTimeout < 0)
+        {
+            throw new IllegalArgumentException("Must be >= 0");
+        }
+        this.workerTimeout = workerTimeout;
+    }
+
+    public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
+    {
+        return connect(address, null, handler, config);
+    }
+
+    public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
+                                 IoHandler handler, IoServiceConfig config)
+    {
+        /** Changes here from the Mina OpenSocketConnector.
+         * Ignoreing all address as they are not needed */
+
+        if (handler == null)
+        {
+            throw new NullPointerException("handler");
+        }
+
+
+        if (config == null)
+        {
+            config = getDefaultConfig();
+        }
+
+        if (_openSocket == null)
+        {
+            throw new IllegalArgumentException("Specifed Socket not active");
+        }
+
+        boolean success = false;
+
+        try
+        {
+            DefaultConnectFuture future = new DefaultConnectFuture();
+            newSession(_openSocket, handler, config, future);
+            success = true;
+            return future;
+        }
+        catch (IOException e)
+        {
+            return DefaultConnectFuture.newFailedFuture(e);
+        }
+        finally
+        {
+            if (!success && _openSocket != null)
+            {
+                try
+                {
+                    _openSocket.close();
+                }
+                catch (IOException e)
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+                }
+            }
+        }
+    }
+
+    public IoServiceConfig getDefaultConfig()
+    {
+        return defaultConfig;
+    }
+
+    /**
+     * Sets the config this connector will use by default.
+     *
+     * @param defaultConfig the default config.
+     *
+     * @throws NullPointerException if the specified value is <code>null</code>.
+     */
+    public void setDefaultConfig(SocketConnectorConfig defaultConfig)
+    {
+        if (defaultConfig == null)
+        {
+            throw new NullPointerException("defaultConfig");
+        }
+        this.defaultConfig = defaultConfig;
+    }
+
+    private synchronized void startupWorker() throws IOException
+    {
+        if (worker == null)
+        {
+            selector = Selector.open();
+            worker = new Worker();
+            executor.execute(new NamePreservingRunnable(worker));
+        }
+    }
+
+    private void registerNew()
+    {
+        if (connectQueue.isEmpty())
+        {
+            return;
+        }
+
+        for (; ;)
+        {
+            ConnectionRequest req;
+            synchronized (connectQueue)
+            {
+                req = (ConnectionRequest) connectQueue.pop();
+            }
+
+            if (req == null)
+            {
+                break;
+            }
+
+            SocketChannel ch = req.channel;
+            try
+            {
+                ch.register(selector, SelectionKey.OP_CONNECT, req);
+            }
+            catch (IOException e)
+            {
+                req.setException(e);
+            }
+        }
+    }
+
+    private void processSessions(Set keys)
+    {
+        Iterator it = keys.iterator();
+
+        while (it.hasNext())
+        {
+            SelectionKey key = (SelectionKey) it.next();
+
+            if (!key.isConnectable())
+            {
+                continue;
+            }
+
+            SocketChannel ch = (SocketChannel) key.channel();
+            ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+            boolean success = false;
+            try
+            {
+                ch.finishConnect();
+                newSession(ch, entry.handler, entry.config, entry);
+                success = true;
+            }
+            catch (Throwable e)
+            {
+                entry.setException(e);
+            }
+            finally
+            {
+                key.cancel();
+                if (!success)
+                {
+                    try
+                    {
+                        ch.close();
+                    }
+                    catch (IOException e)
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught(e);
+                    }
+                }
+            }
+        }
+
+        keys.clear();
+    }
+
+    private void processTimedOutSessions(Set keys)
+    {
+        long currentTime = System.currentTimeMillis();
+        Iterator it = keys.iterator();
+
+        while (it.hasNext())
+        {
+            SelectionKey key = (SelectionKey) it.next();
+
+            if (!key.isValid())
+            {
+                continue;
+            }
+
+            ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+            if (currentTime >= entry.deadline)
+            {
+                entry.setException(new ConnectException());
+                try
+                {
+                    key.channel().close();
+                }
+                catch (IOException e)
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+                }
+                finally
+                {
+                    key.cancel();
+                }
+            }
+        }
+    }
+
+    private void newSession(Socket socket, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
+            throws IOException
+    {
+        SocketSessionImpl session = new SocketSessionImpl(this,
+                                                          nextProcessor(),
+                                                          getListeners(),
+                                                          config,
+                                                          socket.getChannel(),
+                                                          handler,
+                                                          socket.getRemoteSocketAddress());
+
+        newSession(session, config, connectFuture);
+    }
+
+    private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
+            throws IOException
+
+    {
+        SocketSessionImpl session = new SocketSessionImpl(this,
+                                                          nextProcessor(),
+                                                          getListeners(),
+                                                          config,
+                                                          ch,
+                                                          handler,
+                                                          ch.socket().getRemoteSocketAddress());
+
+        newSession(session, config, connectFuture);
+    }
+
+    private void newSession(SocketSessionImpl session, IoServiceConfig config, ConnectFuture connectFuture)
+            throws IOException
+    {
+        try
+        {
+            getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+            config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+            config.getThreadModel().buildFilterChain(session.getFilterChain());
+        }
+        catch (Throwable e)
+        {
+            throw (IOException) new IOException("Failed to create a session.").initCause(e);
+        }
+        session.getIoProcessor().addNew(session);
+        connectFuture.setSession(session);
+    }
+
+    private SocketIoProcessor nextProcessor()
+    {
+        return ioProcessors[processorDistributor++ % processorCount];
+    }
+
+    public void setOpenSocket(Socket openSocket)
+    {
+        _openSocket = openSocket;
+    }
+
+    private class Worker implements Runnable
+    {
+        private long lastActive = System.currentTimeMillis();
+
+        public void run()
+        {
+            Thread.currentThread().setName(ExistingSocketConnector.this.threadName);
+
+            for (; ;)
+            {
+                try
+                {
+                    int nKeys = selector.select(1000);
+
+                    registerNew();
+
+                    if (nKeys > 0)
+                    {
+                        processSessions(selector.selectedKeys());
+                    }
+
+                    processTimedOutSessions(selector.keys());
+
+                    if (selector.keys().isEmpty())
+                    {
+                        if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L)
+                        {
+                            synchronized (lock)
+                            {
+                                if (selector.keys().isEmpty() &&
+                                    connectQueue.isEmpty())
+                                {
+                                    worker = null;
+                                    try
+                                    {
+                                        selector.close();
+                                    }
+                                    catch (IOException e)
+                                    {
+                                        ExceptionMonitor.getInstance().exceptionCaught(e);
+                                    }
+                                    finally
+                                    {
+                                        selector = null;
+                                    }
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                    else
+                    {
+                        lastActive = System.currentTimeMillis();
+                    }
+                }
+                catch (IOException e)
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+
+                    try
+                    {
+                        Thread.sleep(1000);
+                    }
+                    catch (InterruptedException e1)
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught(e1);
+                    }
+                }
+            }
+        }
+    }
+
+    private class ConnectionRequest extends DefaultConnectFuture
+    {
+        private final SocketChannel channel;
+        private final long deadline;
+        private final IoHandler handler;
+        private final IoServiceConfig config;
+
+        private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config)
+        {
+            this.channel = channel;
+            long timeout;
+            if (config instanceof IoConnectorConfig)
+            {
+                timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis();
+            }
+            else
+            {
+                timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis();
+            }
+            this.deadline = System.currentTimeMillis() + timeout;
+            this.handler = handler;
+            this.config = config;
+        }
+    }
+}

Propchange: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?rev=620767&r1=620766&r2=620767&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java Tue Feb 12 03:29:19 2008
@@ -58,7 +58,8 @@
             {
                 //todo this list of valid transports should be enumerated somewhere
                 if ((!(transport.equalsIgnoreCase("vm") ||
-                       transport.equalsIgnoreCase("tcp"))))
+                       transport.equalsIgnoreCase("tcp") ||
+                       transport.equalsIgnoreCase("socket"))))
                 {
                     if (transport.equalsIgnoreCase("localhost"))
                     {

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=620767&r1=620766&r2=620767&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Feb 12 03:29:19 2008
@@ -30,6 +30,8 @@
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.transport.ITransportConnection;
+import org.apache.qpid.client.transport.SocketTransportConnection;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.*;
@@ -62,6 +64,7 @@
 import javax.naming.StringRefAddr;
 import java.io.IOException;
 import java.net.ConnectException;
+import java.net.Socket;
 import java.nio.channels.UnresolvedAddressException;
 import java.text.MessageFormat;
 import java.util.*;
@@ -157,6 +160,9 @@
     private static final long DEFAULT_TIMEOUT = 1000 * 30;
     private ProtocolVersion _protocolVersion;
 
+    /** The active socket that is to be used as a value for connection */
+    private Socket _openSocket;
+
     /**
      * @param broker      brokerdetails
      * @param username    username
@@ -173,7 +179,7 @@
         this(new AMQConnectionURL(
                 ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
                 + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
-                + AMQBrokerDetails.checkTransport(broker) + "'"), null);
+                + AMQBrokerDetails.checkTransport(broker) + "'"), null, null);
     }
 
     /**
@@ -192,7 +198,7 @@
         this(new AMQConnectionURL(
                 ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
                 + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
-                + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
+                + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig, null);
     }
 
     public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
@@ -217,26 +223,38 @@
                    + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'")
                 : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
                    + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
-                   + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig);
+                   + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig, null);
+    }
+
+    public AMQConnection(String connection, Socket socket) throws AMQException, URLSyntaxException
+    {
+        this(new AMQConnectionURL(connection), null, socket);
     }
 
     public AMQConnection(String connection) throws AMQException, URLSyntaxException
     {
-        this(new AMQConnectionURL(connection), null);
+        this(new AMQConnectionURL(connection), null, null);
     }
 
     public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
     {
-        this(new AMQConnectionURL(connection), sslConfig);
+        this(new AMQConnectionURL(connection), sslConfig, null);
     }
 
     public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
     {
+        this(connectionURL, sslConfig, null);
+    }
+
+    public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig, Socket socket) throws AMQException
+    {
         if (_logger.isInfoEnabled())
         {
             _logger.info("Connection:" + connectionURL);
         }
 
+        _openSocket = socket;
+
         _sslConfiguration = sslConfig;
         if (connectionURL == null)
         {
@@ -395,9 +413,26 @@
                 EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
         try
         {
-            TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail);
-            // this blocks until the connection has been set up or when an error
-            // has prevented the connection being set up
+
+            ITransportConnection connection = TransportConnection.getInstance(brokerDetail);
+
+            if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
+            {
+                if (_openSocket != null)
+                {
+                    ((SocketTransportConnection) connection).setOpenSocket(_openSocket);
+                }
+                else
+                {
+                    throw new IllegalArgumentException("Active Socket must be provided for broker " +
+                                                       "with 'socket' transport:" + brokerDetail);
+                }
+
+            }
+
+            connection.connect(_protocolHandler, brokerDetail);
+             // this blocks until the connection has been set up or when an error
+             // has prevented the connection being set up
 
             //_protocolHandler.attainState(AMQState.CONNECTION_OPEN);
             AMQState state = _protocolHandler.attainState(openOrClosedStates);
@@ -1290,6 +1325,11 @@
     public AMQSession getSession(int channelId)
     {
         return _sessions.get(channelId);
+    }
+
+    public void setOpenSocket(Socket socket)
+    {
+        _openSocket = socket;
     }
 
     public ProtocolVersion getProtocolVersion()

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?rev=620767&r1=620766&r2=620767&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Tue Feb 12 03:29:19 2008
@@ -24,6 +24,7 @@
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
 import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 
@@ -36,6 +37,7 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.Socket;
 
 public class SocketTransportConnection implements ITransportConnection
 {
@@ -44,6 +46,8 @@
 
     private SocketConnectorFactory _socketConnectorFactory;
 
+    private Socket _openSocket;
+
     static interface SocketConnectorFactory
     {
         IoConnector newSocketConnector();
@@ -54,6 +58,11 @@
         _socketConnectorFactory = socketConnectorFactory;
     }
 
+    public void setOpenSocket(Socket openSocket)
+    {
+        _openSocket = openSocket;
+    }
+
     public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
     {
         ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
@@ -83,8 +92,31 @@
         _logger.info("send-buffer-size = " + scfg.getSendBufferSize());
         scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
         _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
-        final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
-        _logger.info("Attempting connection to " + address);
+
+        final InetSocketAddress address;
+
+        if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
+        {
+            address = null;
+
+            if (_openSocket != null)
+            {
+                _logger.info("Using existing Socket:" + _openSocket);
+                ((ExistingSocketConnector) ioConnector).setOpenSocket(_openSocket);
+            }
+            else
+            {
+                throw new IllegalArgumentException("Active Socket must be provided for broker " +
+                                                   "with 'socket' transport:" + brokerDetail);
+            }
+        }
+        else
+        {
+            address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
+            _logger.info("Attempting connection to " + address);
+        }
+
+
         ConnectFuture future = ioConnector.connect(address, protocolHandler);
 
         // wait for connection to complete

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=620767&r1=620766&r2=620767&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Tue Feb 12 03:29:19 2008
@@ -23,6 +23,7 @@
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
 import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
 import org.apache.mina.transport.socket.nio.SocketConnector;
 import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
@@ -54,6 +55,7 @@
 
     private static final int TCP = 0;
     private static final int VM = 1;
+    private static final int SOCKET = 2;
 
     private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class);
 
@@ -87,7 +89,15 @@
 
         switch (transport)
         {
-
+            case SOCKET:
+                _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+                {
+                    public IoConnector newSocketConnector()
+                    {
+                        return new ExistingSocketConnector();
+                    }
+                });
+                break;
             case TCP:
                 _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
                 {
@@ -127,6 +137,11 @@
 
     private static int getTransport(String transport)
     {
+        if (transport.equals(BrokerDetails.SOCKET))
+        {
+            return SOCKET;
+        }
+
         if (transport.equals(BrokerDetails.TCP))
         {
             return TCP;

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=620767&r1=620766&r2=620767&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Tue Feb 12 03:29:19 2008
@@ -34,6 +34,7 @@
     public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
     public static final int DEFAULT_PORT = 5672;
 
+    public static final String SOCKET = "socket";
     public static final String TCP = "tcp";
     public static final String VM = "vm";
 

Modified: incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=620767&r1=620766&r2=620767&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Tue Feb 12 03:29:19 2008
@@ -510,6 +510,23 @@
 
     }
 
+    public void testSocketProtocol() throws URLSyntaxException
+    {
+        String url = "amqp://guest:guest@id/test" + "?brokerlist='socket:///'";
+
+        try
+        {
+            AMQConnectionURL curl = new AMQConnectionURL(url);
+            assertNotNull(curl);
+            assertEquals(1, curl.getBrokerCount());
+            assertNotNull(curl.getBrokerDetails(0));
+            assertEquals("socket", curl.getBrokerDetails(0).getTransport());
+        }
+        catch (URLSyntaxException e)
+        {
+            fail(e.getMessage());
+        }
+    }
 
     public static junit.framework.Test suite()
     {