You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/03/27 18:31:15 UTC

svn commit: r522988 [3/3] - in /incubator/qpid/branches/qpid.0-9/java/newclient: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/qpid/ src/main/java/org/apache/qpid/nclient/ src/main/java/org/apach...

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,412 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
+import java.util.*;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+public class AMQPConnectionURL implements ConnectionURL
+{
+    private String _url;
+    private String _failoverMethod;
+    private HashMap<String, String> _failoverOptions;
+    private HashMap<String, String> _options;
+    private List<BrokerDetails> _brokers;
+    private String _clientName;
+    private String _username;
+    private String _password;
+    private String _virtualHost;
+
+    public AMQPConnectionURL(String fullURL) throws URLSyntaxException
+    {
+        _url = fullURL;
+        _options = new HashMap<String, String>();
+        _brokers = new LinkedList<BrokerDetails>();
+        _failoverOptions = new HashMap<String, String>();
+        
+        try
+        {
+            URI connection = new URI(fullURL);
+
+            if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL)))
+            {
+                throw new URISyntaxException(fullURL, "Not an AMQP URL");
+            }
+
+            if (connection.getHost() == null || connection.getHost().equals(""))
+            {
+                String uid = getUniqueClientID();
+                if (uid == null)
+                {
+                    URLHelper.parseError(-1, "Client Name not specified", fullURL);
+                }
+                else
+                {
+                    setClientName(uid);
+                }
+
+            }
+            else
+            {
+                setClientName(connection.getHost());
+            }
+
+            String userInfo = connection.getUserInfo();
+
+            if (userInfo == null)
+            {
+                //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs
+                userInfo = connection.getAuthority();
+
+                if (userInfo != null)
+                {
+                    int atIndex = userInfo.indexOf('@');
+
+                    if (atIndex != -1)
+                    {
+                        userInfo = userInfo.substring(0, atIndex);
+                    }
+                    else
+                    {
+                        userInfo = null;
+                    }
+                }
+
+            }
+
+            if (userInfo == null)
+            {
+                URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
+                        "User information not found on url", fullURL);
+            }
+            else
+            {
+                parseUserInfo(userInfo);
+            }
+            String virtualHost = connection.getPath();
+
+            if (virtualHost != null && (!virtualHost.equals("")))
+            {
+                setVirtualHost(virtualHost);
+            }
+            else
+            {
+                int authLength = connection.getAuthority().length();
+                int start = AMQ_PROTOCOL.length() + 3;
+                int testIndex = start + authLength;
+                if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?')
+                {
+                    URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
+                }
+                else
+                {
+                    URLHelper.parseError(-1, "Virtual host not specified", fullURL);
+                }
+
+            }
+
+
+            URLHelper.parseOptions(_options, connection.getQuery());
+
+            processOptions();
+
+            //Fragment is #string (not used)
+            //System.out.println(connection.getFragment());
+
+        }
+        catch (URISyntaxException uris)
+        {
+            if (uris instanceof URLSyntaxException)
+            {
+                throw (URLSyntaxException) uris;
+            }
+
+            int slash = fullURL.indexOf("\\");
+
+            if (slash == -1)
+            {
+                URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+            }
+            else
+            {
+                if (slash != 0 && fullURL.charAt(slash - 1) == ':')
+                {
+                    URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
+                }
+                else
+                {
+                    URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL);
+                }
+            }
+
+        }
+    }
+    
+    private String getUniqueClientID()
+    {
+        try
+        {
+            InetAddress addr = InetAddress.getLocalHost();
+            return addr.getHostName() + System.currentTimeMillis();
+        }
+        catch (UnknownHostException e)
+        {
+            return null;
+        }
+    }
+
+    private void parseUserInfo(String userinfo) throws URLSyntaxException
+    {
+        //user info = user:pass
+
+        int colonIndex = userinfo.indexOf(':');
+
+        if (colonIndex == -1)
+        {
+            URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
+                    "Null password in user information not allowed.", _url);
+        }
+        else
+        {
+            setUsername(userinfo.substring(0, colonIndex));
+            setPassword(userinfo.substring(colonIndex + 1));
+        }
+
+    }
+
+    private void processOptions() throws URLSyntaxException
+    {
+        if (_options.containsKey(OPTIONS_BROKERLIST))
+        {
+            String brokerlist = _options.get(OPTIONS_BROKERLIST);
+
+            //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'
+            StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR);
+
+            while (st.hasMoreTokens())
+            {
+                String broker = st.nextToken();
+
+                _brokers.add(new AMQPBrokerDetails(broker));
+            }
+
+            _options.remove(OPTIONS_BROKERLIST);
+        }
+
+        if (_options.containsKey(OPTIONS_FAILOVER))
+        {
+            String failover = _options.get(OPTIONS_FAILOVER);
+
+            // failover='method?option='value',option='value''
+
+            int methodIndex = failover.indexOf('?');
+
+            if (methodIndex > -1)
+            {
+                _failoverMethod = failover.substring(0, methodIndex);
+                URLHelper.parseOptions(_failoverOptions, failover.substring(methodIndex + 1));
+            }
+            else
+            {
+                _failoverMethod = failover;
+            }
+
+            _options.remove(OPTIONS_FAILOVER);
+        }
+    }
+
+    public String getURL()
+    {
+        return _url;
+    }
+
+    public String getFailoverMethod()
+    {
+        return _failoverMethod;
+    }
+
+    public String getFailoverOption(String key)
+    {
+        return _failoverOptions.get(key);
+    }
+
+    public void setFailoverOption(String key, String value)
+    {
+        _failoverOptions.put(key, value);
+    }
+
+    public int getBrokerCount()
+    {
+        return _brokers.size();
+    }
+
+    public BrokerDetails getBrokerDetails(int index)
+    {
+        // Any index outside [0, size) yields null; a negative index used to
+        // escape the guard and throw IndexOutOfBoundsException from List.get().
+        if (index < 0 || index >= _brokers.size())
+        {
+            return null;
+        }
+
+        return _brokers.get(index);
+    }
+
+    public void addBrokerDetails(BrokerDetails broker)
+    {
+        if (!(_brokers.contains(broker)))
+        {
+            _brokers.add(broker);
+        }
+    }
+
+    public List<BrokerDetails> getAllBrokerDetails()
+    {
+        return _brokers;
+    }
+
+    public String getClientName()
+    {
+        return _clientName;
+    }
+
+    public void setClientName(String clientName)
+    {
+        _clientName = clientName;
+    }
+
+    public String getUsername()
+    {
+        return _username;
+    }
+
+    public void setUsername(String username)
+    {
+        _username = username;
+    }
+
+    public String getPassword()
+    {
+        return _password;
+    }
+
+    public void setPassword(String password)
+    {
+        _password = password;
+    }
+
+    public String getVirtualHost()
+    {
+        return _virtualHost;
+    }
+
+    public void setVirtualHost(String virtuaHost)
+    {
+        _virtualHost = virtuaHost;
+    }
+
+    public String getOption(String key)
+    {
+        return _options.get(key);
+    }
+
+    public void setOption(String key, String value)
+    {
+        _options.put(key, value);
+    }
+
+    public String toString()
+    {
+        StringBuffer sb = new StringBuffer();
+
+        sb.append(AMQ_PROTOCOL);
+        sb.append("://");
+
+        if (_username != null)
+        {
+            sb.append(_username);
+
+            if (_password != null)
+            {
+                sb.append(':');
+                sb.append(_password);
+            }
+
+            sb.append('@');
+        }
+
+        sb.append(_clientName);
+
+        sb.append(_virtualHost);
+
+        sb.append(optionsToString());
+
+        return sb.toString();
+    }
+
+    private String optionsToString()
+    {
+        StringBuffer sb = new StringBuffer();
+        // Renders "?brokerlist='<b1>;<b2>'" followed by an optional failover clause.
+        sb.append("?" + OPTIONS_BROKERLIST + "='");
+        // The separator is written before every broker except the first, so an empty
+        // list keeps the opening quote (deleteCharAt used to strip it in that case).
+        String separator = "";
+        for (BrokerDetails service : _brokers)
+        {
+            sb.append(separator).append(service.toString());
+            separator = ";";
+        }
+        sb.append("'");
+
+        if (_failoverMethod != null)
+        {
+            sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+            sb.append(OPTIONS_FAILOVER + "='");
+            sb.append(_failoverMethod);
+            sb.append(URLHelper.printOptions(_failoverOptions));
+            sb.append("'");
+        }
+
+        return sb.toString();
+    }
+
+
+    public static void main(String[] args) throws URLSyntaxException
+    {
+
+        String url2 = "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'";
+        //"amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''";
+
+        //ConnectionURL connectionurl2 = new AMQConnectionURL(url2);
+
+        System.out.println(url2);
+        //System.out.println(connectionurl2);
+
+    }
+
+}

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+public interface BrokerDetails
+{
+
+    /*
+     * Known URL Options
+     * @see ConnectionURL
+    */
+    public static final String OPTIONS_RETRY = "retries";
+    public static final String OPTIONS_SSL = ConnectionURL.OPTIONS_SSL;
+    public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
+    public static final int DEFAULT_PORT = 5672;
+
+    public static final String TCP = "tcp";
+    public static final String VM = "vm";
+
+    public static final String DEFAULT_TRANSPORT = TCP;
+
+    public static final String URL_FORMAT_EXAMPLE =
+            "<transport>://<hostname>[:<port Default=\"" + DEFAULT_PORT + "\">][?<option>='<value>'[,<option>='<value>']]";
+
+    public static final long DEFAULT_CONNECT_TIMEOUT = 30000L;
+    public static final boolean USE_SSL_DEFAULT = false;
+
+    String getHost();
+
+    void setHost(String host);
+
+    int getPort();
+
+    void setPort(int port);
+
+    String getTransport();
+
+    void setTransport(String transport);
+
+    boolean useSSL();
+
+    void useSSL(boolean ssl);
+
+    String getOption(String key);
+
+    void setOption(String key, String value);
+
+    long getTimeout();
+
+    void setTimeout(long timeout);
+
+    String toString();
+
+    boolean equals(Object o);
+}

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import java.util.List;
+
+/**
+ Connection URL format
+ For TCP:
+ amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\'
+ 
+ For VMBroker:
+ vm://:3/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''"
+ 
+ Options are of course optional except for requiring a single broker in the broker list.
+ The option separator is defined to be either '&' or ','
+  */
+public interface ConnectionURL
+{
+    public static final String AMQ_PROTOCOL = "amqp";
+    public static final String OPTIONS_BROKERLIST = "brokerlist";
+    public static final String OPTIONS_FAILOVER = "failover";
+    public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
+    public static final String OPTIONS_SSL = "ssl";
+
+    String getURL();
+
+    String getFailoverMethod();
+
+    String getFailoverOption(String key);
+
+    int getBrokerCount();
+
+    BrokerDetails getBrokerDetails(int index);
+
+    void addBrokerDetails(BrokerDetails broker);
+
+    List<BrokerDetails> getAllBrokerDetails();
+
+    String getClientName();
+
+    void setClientName(String clientName);
+
+    String getUsername();
+
+    void setUsername(String username);
+
+    String getPassword();
+
+    void setPassword(String password);
+
+    String getVirtualHost();
+
+    void setVirtualHost(String virtualHost);
+
+    String getOption(String key);
+
+    void setOption(String key, String value);
+}

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,82 @@
+package org.apache.qpid.nclient.transport;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.DefaultPhaseContext;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.PhaseFactory;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+
+public class TCPConnection implements TransportConnection
+{
+    private static final Logger _logger = Logger.getLogger(TCPConnection.class);
+    private BrokerDetails _brokerDetails;
+    private IoConnector _ioConnector;
+    private Phase _phase;  
+    
+    protected TCPConnection(ConnectionURL url)
+    {
+        // Only the first broker listed in the URL is used by this transport.
+        _brokerDetails = url.getBrokerDetails(0);
+        ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS));
+
+        // the MINA default is currently to use the pooled allocator although this may change in future
+        // once more testing of the performance of the simple allocator has been done
+        if (ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_POOLED_ALLOCATOR))
+        {
+            // Nothing to do: the allocator already in place is left untouched.
+        }
+        else
+        {
+            _logger.info("Using SimpleByteBufferAllocator");
+            ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+        }
+
+        // Assign the field (not a local): connect() hands _ioConnector to the phase context.
+        _ioConnector = new SocketConnector();
+        SocketConnectorConfig cfg = (SocketConnectorConfig) _ioConnector.getDefaultConfig();
+        // if we do not use our own thread model we get the MINA default which is to use
+        // its own leader-follower model
+        if (ClientConfiguration.get().getBoolean(QpidConstants.USE_SHARED_READ_WRITE_POOL))
+        {
+            cfg.setThreadModel(ReadWriteThreadModel.getInstance());
+        }
+
+        SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+        scfg.setTcpNoDelay(ClientConfiguration.get().getBoolean(QpidConstants.TCP_NO_DELAY));
+        scfg.setSendBufferSize(ClientConfiguration.get().getInt(QpidConstants.SEND_BUFFER_SIZE_IN_KB)*1024);
+        scfg.setReceiveBufferSize(ClientConfiguration.get().getInt(QpidConstants.RECEIVE_BUFFER_SIZE_IN_KB)*1024);
+    }
+
+    // Returns the phase pipe
+    public Phase connect() throws AMQPException
+    {	
+	PhaseContext ctx = new DefaultPhaseContext();
+	ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
+	ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
+	
+	_phase = PhaseFactory.createPhasePipe(ctx);
+	_phase.start();
+	
+	return _phase;
+    }
+
+    public void close() throws AMQPException
+    {
+	
+    }
+    
+    public Phase getPhasePipe()
+    {
+	return _phase;
+    }
+}

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+public interface TransportConnection
+{
+    public Phase connect()throws AMQPException;
+    
+    public void close()throws AMQPException;
+    
+    public Phase getPhasePipe();
+}
\ No newline at end of file

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,44 @@
+package org.apache.qpid.nclient.transport;
+
+import java.net.URISyntaxException;
+
+public class TransportConnectionFactory
+{
+    public enum ConnectionType 
+    {	
+	TCP,VM
+    }
+    
+    public static TransportConnection createTransportConnection(String url,ConnectionType type) throws URISyntaxException
+    {
+	return createTransportConnection(new AMQPConnectionURL(url),type);
+	
+    }
+    
+    public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type)
+    {
+	switch (type)
+	{
+	    case TCP : default:
+	    {
+		return createTCPConnection(url);
+	    }
+	    
+	    case VM :
+	    {
+		return createVMConnection(url);
+	    }
+	}
+	
+    }
+
+    private static TransportConnection createTCPConnection(ConnectionURL url)
+    {
+	return new TCPConnection(url);
+    }
+    
+    private static TransportConnection createVMConnection(ConnectionURL url)
+    {
+        return new VMConnection(url); // previously a null stub that callers would dereference
+    }
+}

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+import org.apache.mina.transport.vmpipe.VmPipeConnector;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.AbstractPhase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.ssl.BogusSSLContextFactory;
+
+/**
+ * The Transport Phase corresponds to Layer 1 in AMQP. It works at the Frame
+ * layer.
+ * 
+ */
+public class TransportPhase extends AbstractPhase implements IoHandler, ProtocolVersionList
+{
+
+    private static final Logger _logger = Logger
+            .getLogger(TransportPhase.class);
+
+    private IoSession _ioSession;
+    private BrokerDetails _brokerDetails;
+    
+    protected WriteFuture _lastWriteFuture;
+
+    /**
+     * ------------------------------------------------ 
+     * Phase - methods introduced by Phase 
+     * ------------------------------------------------
+     */
+    
+    public void start()throws AMQPException
+    {
+	_brokerDetails = (BrokerDetails)_ctx.getProperty(QpidConstants.AMQP_BROKER_DETAILS);
+	IoConnector ioConnector = (IoConnector)_ctx.getProperty(QpidConstants.MINA_IO_CONNECTOR);        
+        
+	final SocketAddress address;
+	if (ioConnector instanceof VmPipeConnector)
+	{
+	    address = new VmPipeAddress(_brokerDetails.getPort());
+	}
+	else
+	{
+            address = new InetSocketAddress(_brokerDetails.getHost(), _brokerDetails.getPort());
+            _logger.info("Attempting connection to " + address);
+            
+	}
+	
+        ConnectFuture future = ioConnector.connect(address,this);
+        
+        // wait for connection to complete
+        if (future.join(_brokerDetails.getTimeout()))
+        {
+            // we call getSession which throws an IOException if there has been an error connecting
+            future.getSession();
+        }
+        else
+        {
+            throw new AMQPException("Timeout waiting for connection.");
+        }	
+    }
+
+    public void messageReceived(Object frame) throws AMQPException
+    {
+        super.messageReceived(frame);
+    }
+
+    public void messageSent(Object frame) throws AMQPException
+    {
+
+        _ioSession.write(frame);
+    }
+
+    /**
+     * ------------------------------------------------ 
+     * IoHandler - methods introduced by IoHandler 
+     * ------------------------------------------------
+     */
+    public void sessionIdle(IoSession session, IdleStatus status)
+            throws Exception
+    {
+        _logger.debug("Protocol Session [" + this + ":" + session + "] idle: "
+                + status);
+        if (IdleStatus.WRITER_IDLE.equals(status))
+        {
+            // write heartbeat frame:
+            _logger.debug("Sent heartbeat");
+            session.write(HeartbeatBody.FRAME);
+            // HeartbeatDiagnostics.sent();
+        } else if (IdleStatus.READER_IDLE.equals(status))
+        {
+            // failover:
+            // HeartbeatDiagnostics.timeout();
+            _logger.warn("Timed out while waiting for heartbeat from peer.");
+            session.close();
+        }
+    }
+
+    public void messageReceived(IoSession session, Object message)
+            throws Exception
+    {
+        AMQFrame frame = (AMQFrame) message;
+        final AMQBody bodyFrame = frame.getBodyFrame();
+
+        if (bodyFrame instanceof HeartbeatBody)
+        {
+            _logger.debug("Received heartbeat");
+        } else
+        {
+            messageReceived(bodyFrame);
+        }
+        // _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+    }
+
+    public void messageSent(IoSession session, Object message) throws Exception
+    {
+        _logger.debug("Sent frame " + message);
+    }
+
+    public void exceptionCaught(IoSession session, Throwable cause) throws Exception
+    {
+        // Record the cause before it is lost; failover handling is still to be added.
+        _logger.error("Exception caught on session [" + session + "]", cause);
+        sessionClosed(session);
+    }
+
+    public void sessionClosed(IoSession session) throws Exception
+    {
+        // Need to handle failover
+        _logger.info("Protocol Session [" + this + "] closed");
+    }
+
+    public void sessionCreated(IoSession session) throws Exception
+    {
+        _logger.debug("Protocol session created for session "
+                + System.identityHashCode(session));
+        // Remember the session: writeFrame() and messageSent(Object) write through _ioSession,
+        // which was never assigned, so the negotiation below failed with a NullPointerException.
+        _ioSession = session;
+
+        final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
+
+        if (ClientConfiguration.get().getBoolean(QpidConstants.USE_SHARED_READ_WRITE_POOL))
+        {
+            session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
+        }
+        else
+        {
+            session.getFilterChain().addLast("protocolFilter", pcf);
+        }
+
+        // we only add the SSL filter where we have an SSL connection
+        if (_brokerDetails.useSSL())
+        {
+            // FIXME: Bogus context cannot be used in production.
+            SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory.getInstance(false));
+            sslFilter.setUseClientMode(true);
+            session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
+        }
+
+        // Create the per-session jobs on the shared thread model's read and write filters.
+        try
+        {
+            ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+            threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+            threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+        }
+        catch (RuntimeException e)
+        {
+            // Failure here is tolerated; log with the stack trace instead of printing to stderr.
+            _logger.error("Unable to create read/write jobs for session", e);
+        }
+
+        // Start the AMQP handshake by writing the protocol initiation frame.
+        doAMQPConnectionNegotiation();
+    }
+
+    public void sessionOpened(IoSession session) throws Exception
+    {
+        _logger.debug("Protocol session opened for session "
+                + System.identityHashCode(session));
+    }
+
+    /**
+     * ---------------------------------------------------------- 
+     * Protocol related methods
+     * ----------------------------------------------------------
+     */
+    private void doAMQPConnectionNegotiation()
+    {
+        int i = pv.length - 1;
+        writeFrame(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+    }
+
+    /**
+     * ---------------------------------------------------------- 
+     * Write Operations 
+     * ----------------------------------------------------------
+     */
+    public void writeFrame(AMQDataBlock frame)
+    {
+        writeFrame(frame, false);
+    }
+
+    public void writeFrame(AMQDataBlock frame, boolean wait)
+    {
+        WriteFuture f = _ioSession.write(frame);
+        if (wait)
+        {
+            // fixme -- time out?
+            f.join();
+        } else
+        {
+            _lastWriteFuture = f;
+        }
+    }
+
+    /**
+     * ----------------------------------------------------------- Failover
+     * section -----------------------------------------------------------
+     */
+}

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,133 @@
+package org.apache.qpid.nclient.transport;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+import org.apache.mina.transport.vmpipe.VmPipeConnector;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.DefaultPhaseContext;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.PhaseFactory;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.pool.PoolingFilter;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
+
+public class VMConnection implements TransportConnection
+{
+    private static final Logger _logger = Logger.getLogger(VMConnection.class);
+    private BrokerDetails _brokerDetails;
+    private IoConnector _ioConnector;
+    private Phase _phase;
+    
+    protected VMConnection(ConnectionURL url)
+    {
+	_brokerDetails = url.getBrokerDetails(0);
+	_ioConnector = new VmPipeConnector();
+        final IoServiceConfig cfg = _ioConnector.getDefaultConfig();
+        ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
+        PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService,
+                                                    "AsynchronousReadFilter");
+        cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
+        PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService, 
+                                                     "AsynchronousWriteFilter");
+        cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
+    }
+    
+    public Phase connect() throws AMQPException
+    {		
+	createVMBroker();	      
+        
+        PhaseContext ctx = new DefaultPhaseContext();
+	ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
+	ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
+	
+	_phase = PhaseFactory.createPhasePipe(ctx);
+	_phase.start();
+	
+	return _phase;
+
+    }
+    
+    private void createVMBroker()throws AMQPException
+    {
+	_logger.info("Creating InVM Qpid.AMQP listening on port " + _brokerDetails.getPort());
+	
+	VmPipeAcceptor acceptor = new VmPipeAcceptor();
+        IoServiceConfig config = acceptor.getDefaultConfig();
+        config.setThreadModel(ReadWriteThreadModel.getInstance());
+        
+        IoHandlerAdapter provider = null;
+        try
+        {
+            VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort());
+            provider = createBrokerInstance(_brokerDetails.getPort());
+            acceptor.bind(pipe, provider);
+            _logger.info("Created InVM Qpid.AMQP listening on port " + _brokerDetails.getPort());
+        }
+        catch (IOException e)
+        {
+            _logger.error(e);
+            VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort());
+            acceptor.unbind(pipe);
+                        
+            throw new AMQPException("Error creating VM broker",e);
+        }
+    }
+    
+    private IoHandlerAdapter createBrokerInstance(int port) throws AMQPException
+    {
+        String protocolProviderClass = ClientConfiguration.get().getString(QpidConstants.QPID_VM_BROKER_CLASS);        
+        _logger.info("Creating Qpid protocol provider: " + protocolProviderClass);
+
+        // can't use introspection to get Provider as it is a server class.
+        // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access.
+
+        //get correct constructor and pass in instancec ID - "port"
+        IoHandlerAdapter provider;
+        try
+        {
+            Class[] cnstr = {Integer.class};
+            Object[] params = {port};
+            provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+            //Give the broker a second to create
+            _logger.info("Created VMBroker Instance:" + port);
+        }
+        catch (Exception e)
+        {
+            _logger.info("Unable to create InVM Qpid broker on port " + port + ". due to : " + e.getCause());
+            _logger.error(e);
+            String because;
+            if (e.getCause() == null)
+            {
+                because = e.toString();
+            }
+            else
+            {
+                because = e.getCause().toString();
+            }
+
+
+            throw new AMQPException(port, because + " Stopped InVM Qpid.AMQP creation",e);
+        }
+
+        return provider;
+    }
+
+    public void close() throws AMQPException
+    {
+	
+    }
+    
+    public Phase getPhasePipe()
+    {
+	return _phase;
+    }
+}

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,14 @@
+package org.apache.qpid.nclient.util;
+
+import org.apache.qpid.nclient.core.AMQPException;
+
+/**
+ * Static validation helpers that report failures as {@link AMQPException}.
+ */
+public class AMQPValidator 
+{
+    /**
+     * Throws an {@link AMQPException} carrying {@code msg} when {@code obj}
+     * is {@code null}; otherwise returns without doing anything.
+     *
+     * @param obj the reference to check
+     * @param msg the message for the exception raised on {@code null}
+     * @throws AMQPException if {@code obj} is {@code null}
+     */
+    public static void throwExceptionOnNull(Object obj, String msg) throws AMQPException
+    {
+        if (obj != null)
+        {
+            return;
+        }
+        throw new AMQPException(msg);
+    }
+}