You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/03/27 18:31:15 UTC
svn commit: r522988 [3/3] - in
/incubator/qpid/branches/qpid.0-9/java/newclient: ./ src/ src/main/
src/main/java/ src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/qpid/ src/main/java/org/apache/qpid/nclient/
src/main/java/org/apach...
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,412 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
+import java.util.*;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+public class AMQPConnectionURL implements ConnectionURL
+{
+ // Full URL text exactly as passed to the constructor.
+ private String _url;
+ // Failover method name taken from the failover option; null when that option is absent.
+ private String _failoverMethod;
+ // Options that follow the '?' inside the failover option value.
+ private HashMap<String, String> _failoverOptions;
+ // URL query options; the broker list and failover entries are removed after processing.
+ private HashMap<String, String> _options;
+ // Brokers parsed from the broker list option or added through addBrokerDetails().
+ private List<BrokerDetails> _brokers;
+ // URI host component, or a generated id when the URL carries no host.
+ private String _clientName;
+ // Credentials split from the user-info part of the URL.
+ private String _username;
+ private String _password;
+ // URI path component of the URL.
+ private String _virtualHost;
+
+ /**
+  * Parses an AMQP connection URL into client name, credentials, virtual host,
+  * broker list and failover settings.
+  *
+  * <p>The URI host becomes the client name (a generated id is used when the host is
+  * missing), the user-info part supplies user name and password, the path is the
+  * virtual host and the query string carries the options.
+  *
+  * <p>Problems are reported through {@code URLHelper.parseError}, which is expected to
+  * throw {@link URLSyntaxException} (its implementation is outside this file).
+  *
+  * @param fullURL the connection URL text
+  * @throws URLSyntaxException if the text is not a usable AMQP connection URL
+  */
+ public AMQPConnectionURL(String fullURL) throws URLSyntaxException
+ {
+     _url = fullURL;
+     _options = new HashMap<String, String>();
+     _brokers = new LinkedList<BrokerDetails>();
+     _failoverOptions = new HashMap<String, String>();
+
+     try
+     {
+         URI connection = new URI(fullURL);
+
+         if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL)))
+         {
+             throw new URISyntaxException(fullURL, "Not an AMQP URL");
+         }
+
+         if (connection.getHost() == null || connection.getHost().equals(""))
+         {
+             // No client id in the URL: fall back to a generated one.
+             String uid = getUniqueClientID();
+             if (uid == null)
+             {
+                 URLHelper.parseError(-1, "Client Name not specified", fullURL);
+             }
+             else
+             {
+                 setClientName(uid);
+             }
+
+         }
+         else
+         {
+             setClientName(connection.getHost());
+         }
+
+         String userInfo = connection.getUserInfo();
+
+         if (userInfo == null)
+         {
+             //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs
+             userInfo = connection.getAuthority();
+
+             if (userInfo != null)
+             {
+                 int atIndex = userInfo.indexOf('@');
+
+                 if (atIndex != -1)
+                 {
+                     userInfo = userInfo.substring(0, atIndex);
+                 }
+                 else
+                 {
+                     userInfo = null;
+                 }
+             }
+
+         }
+
+         if (userInfo == null)
+         {
+             URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
+                                  "User information not found on url", fullURL);
+         }
+         else
+         {
+             parseUserInfo(userInfo);
+         }
+         String virtualHost = connection.getPath();
+
+         if (virtualHost != null && (!virtualHost.equals("")))
+         {
+             setVirtualHost(virtualHost);
+         }
+         else
+         {
+             // Distinguish "query follows the authority directly" from "no virtual host at all"
+             // so that the reported position points at the offending part of the URL.
+             int authLength = connection.getAuthority().length();
+             int start = AMQ_PROTOCOL.length() + 3;
+             int testIndex = start + authLength;
+             if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?')
+             {
+                 URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
+             }
+             else
+             {
+                 URLHelper.parseError(-1, "Virtual host not specified", fullURL);
+             }
+
+         }
+
+
+         URLHelper.parseOptions(_options, connection.getQuery());
+
+         processOptions();
+
+         //Fragment is #string (not used)
+         //System.out.println(connection.getFragment());
+
+     }
+     catch (URISyntaxException uris)
+     {
+         // Errors raised by our own parsing are already in their final form.
+         if (uris instanceof URLSyntaxException)
+         {
+             throw (URLSyntaxException) uris;
+         }
+
+         // java.net.URI rejects back slashes; give a more helpful message for that case.
+         int slash = fullURL.indexOf("\\");
+
+         if (slash == -1)
+         {
+             URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+         }
+         else
+         {
+             if (slash != 0 && fullURL.charAt(slash - 1) == ':')
+             {
+                 // Highlight from the drive letter up to the query string, or to the end of
+                 // the URL when no '?' follows the back slash, so the length is never negative.
+                 int queryStart = fullURL.indexOf('?');
+                 int end = (queryStart > slash) ? queryStart : fullURL.length();
+                 int start = Math.max(0, slash - 2);
+                 URLHelper.parseError(start, end - start, "Virtual host looks like a windows path, back slash not allowed in URL", fullURL);
+             }
+             else
+             {
+                 URLHelper.parseError(slash, "Back slash not allowed in URL", fullURL);
+             }
+         }
+
+     }
+ }
+
+ private String getUniqueClientID()
+ {
+ try
+ {
+ InetAddress addr = InetAddress.getLocalHost();
+ return addr.getHostName() + System.currentTimeMillis();
+ }
+ catch (UnknownHostException e)
+ {
+ return null;
+ }
+ }
+
+ private void parseUserInfo(String userinfo) throws URLSyntaxException
+ {
+ //user info = user:pass
+
+ int colonIndex = userinfo.indexOf(':');
+
+ if (colonIndex == -1)
+ {
+ URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
+ "Null password in user information not allowed.", _url);
+ }
+ else
+ {
+ setUsername(userinfo.substring(0, colonIndex));
+ setPassword(userinfo.substring(colonIndex + 1));
+ }
+
+ }
+
+ private void processOptions() throws URLSyntaxException
+ {
+ if (_options.containsKey(OPTIONS_BROKERLIST))
+ {
+ String brokerlist = _options.get(OPTIONS_BROKERLIST);
+
+ //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'
+ StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR);
+
+ while (st.hasMoreTokens())
+ {
+ String broker = st.nextToken();
+
+ _brokers.add(new AMQPBrokerDetails(broker));
+ }
+
+ _options.remove(OPTIONS_BROKERLIST);
+ }
+
+ if (_options.containsKey(OPTIONS_FAILOVER))
+ {
+ String failover = _options.get(OPTIONS_FAILOVER);
+
+ // failover='method?option='value',option='value''
+
+ int methodIndex = failover.indexOf('?');
+
+ if (methodIndex > -1)
+ {
+ _failoverMethod = failover.substring(0, methodIndex);
+ URLHelper.parseOptions(_failoverOptions, failover.substring(methodIndex + 1));
+ }
+ else
+ {
+ _failoverMethod = failover;
+ }
+
+ _options.remove(OPTIONS_FAILOVER);
+ }
+ }
+
+ /** Returns the URL text exactly as it was passed to the constructor. */
+ public String getURL()
+ {
+ return _url;
+ }
+
+ /** Returns the failover method name, or null when the URL had no failover option. */
+ public String getFailoverMethod()
+ {
+ return _failoverMethod;
+ }
+
+ /** Returns the failover option stored under the given key, or null when there is none. */
+ public String getFailoverOption(String key)
+ {
+ return _failoverOptions.get(key);
+ }
+
+ /** Stores a failover option, replacing any value already held for the key. */
+ public void setFailoverOption(String key, String value)
+ {
+ _failoverOptions.put(key, value);
+ }
+
+ /** Returns the number of brokers currently held. */
+ public int getBrokerCount()
+ {
+ return _brokers.size();
+ }
+
+ public BrokerDetails getBrokerDetails(int index)
+ {
+ if (index < _brokers.size())
+ {
+ return _brokers.get(index);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /** Adds the broker unless the list already contains one that equals it. */
+ public void addBrokerDetails(BrokerDetails broker)
+ {
+ if (!(_brokers.contains(broker)))
+ {
+ _brokers.add(broker);
+ }
+ }
+
+ /** Returns the internal broker list itself (not a copy), so changes by the caller are visible here. */
+ public List<BrokerDetails> getAllBrokerDetails()
+ {
+ return _brokers;
+ }
+
+ /** Returns the client name. */
+ public String getClientName()
+ {
+ return _clientName;
+ }
+
+ /** Sets the client name. */
+ public void setClientName(String clientName)
+ {
+ _clientName = clientName;
+ }
+
+ /** Returns the user name. */
+ public String getUsername()
+ {
+ return _username;
+ }
+
+ /** Sets the user name. */
+ public void setUsername(String username)
+ {
+ _username = username;
+ }
+
+ /** Returns the password. */
+ public String getPassword()
+ {
+ return _password;
+ }
+
+ /** Sets the password. */
+ public void setPassword(String password)
+ {
+ _password = password;
+ }
+
+ /** Returns the virtual host. */
+ public String getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ /** Sets the virtual host. */
+ public void setVirtualHost(String virtuaHost)
+ {
+ _virtualHost = virtuaHost;
+ }
+
+ /** Returns the URL option stored under the given key, or null when there is none. */
+ public String getOption(String key)
+ {
+ return _options.get(key);
+ }
+
+ /** Stores a URL option, replacing any value already held for the key. */
+ public void setOption(String key, String value)
+ {
+ _options.put(key, value);
+ }
+
+ /**
+  * Rebuilds a URL string from the current field values: protocol, optional
+  * {@code user[:password]@}, client name, virtual host and the option string.
+  * The password, when set, is written out in clear text.
+  */
+ public String toString()
+ {
+     StringBuilder text = new StringBuilder(AMQ_PROTOCOL).append("://");
+
+     if (_username != null)
+     {
+         text.append(_username);
+
+         if (_password != null)
+         {
+             text.append(':').append(_password);
+         }
+
+         text.append('@');
+     }
+
+     return text.append(_clientName)
+                .append(_virtualHost)
+                .append(optionsToString())
+                .toString();
+ }
+
+ /**
+  * Renders the broker list and, when a failover method is set, the failover entry
+  * as the query part of the URL.
+  *
+  * @return text of the form {@code ?brokerlist='b1;b2'[<sep>failover='method<options>']}
+  */
+ private String optionsToString()
+ {
+     StringBuilder sb = new StringBuilder();
+
+     sb.append("?" + OPTIONS_BROKERLIST + "='");
+
+     // Write the separator between entries rather than stripping a trailing one:
+     // with an empty broker list the strip used to delete the opening quote and
+     // left the output with unbalanced quotes.
+     boolean first = true;
+     for (BrokerDetails service : _brokers)
+     {
+         if (!first)
+         {
+             sb.append(';');
+         }
+         sb.append(service.toString());
+         first = false;
+     }
+
+     sb.append("'");
+
+     if (_failoverMethod != null)
+     {
+         sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+         sb.append(OPTIONS_FAILOVER + "='");
+         sb.append(_failoverMethod);
+         sb.append(URLHelper.printOptions(_failoverOptions));
+         sb.append("'");
+     }
+
+     return sb.toString();
+ }
+
+
+ /**
+  * Ad-hoc entry point that prints a sample connection URL; the parsing calls are
+  * commented out, so nothing is parsed.
+  */
+ public static void main(String[] args) throws URLSyntaxException
+ {
+     // Alternative sample:
+     //"amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''";
+     final String sampleUrl = "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'";
+
+     //ConnectionURL connectionurl2 = new AMQConnectionURL(sampleUrl);
+
+     System.out.println(sampleUrl);
+     //System.out.println(connectionurl2);
+ }
+
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+/**
+ * Describes one broker endpoint of a connection URL: transport, host, port,
+ * SSL flag, connect timeout and free-form options.
+ */
+public interface BrokerDetails
+{
+
+    /*
+     * Known URL Options
+     * @see ConnectionURL
+     */
+    String OPTIONS_RETRY = "retries";
+    String OPTIONS_SSL = ConnectionURL.OPTIONS_SSL;
+    String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
+
+    /** Port shown as the default in {@link #URL_FORMAT_EXAMPLE}. */
+    int DEFAULT_PORT = 5672;
+
+    /* Transport names. */
+    String TCP = "tcp";
+    String VM = "vm";
+
+    String DEFAULT_TRANSPORT = TCP;
+
+    /** Human-readable description of the broker URL syntax. */
+    String URL_FORMAT_EXAMPLE =
+            "<transport>://<hostname>[:<port Default=\"" + DEFAULT_PORT + "\">][?<option>='<value>'[,<option>='<value>']]";
+
+    /** Default connect timeout; presumably milliseconds - TODO confirm against implementations. */
+    long DEFAULT_CONNECT_TIMEOUT = 30000L;
+    boolean USE_SSL_DEFAULT = false;
+
+    String getHost();
+
+    void setHost(String host);
+
+    int getPort();
+
+    void setPort(int port);
+
+    String getTransport();
+
+    void setTransport(String transport);
+
+    /** Reports whether SSL is to be used for this broker. */
+    boolean useSSL();
+
+    /** Switches SSL use for this broker on or off. */
+    void useSSL(boolean ssl);
+
+    String getOption(String key);
+
+    void setOption(String key, String value);
+
+    long getTimeout();
+
+    void setTimeout(long timeout);
+
+    String toString();
+
+    boolean equals(Object o);
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import java.util.List;
+
+/**
+ * Describes an AMQP connection URL and gives access to its parts.
+ *
+ * <p>Connection URL format:
+ * <pre>
+ * amqp://[user:pass@][clientid]/virtualhost?brokerlist='BROKER[;BROKER]'[&amp;failover='method?option=\'value\'&amp;option=\'value\'']
+ * </pre>
+ *
+ * <p>Broker entry examples:
+ * <pre>
+ * For TCP:      tcp://host:port?option=\'value\'&amp;option=\'value\'
+ * For VMBroker: vm://:3/virtualpath?option=\'value\'
+ * </pre>
+ *
+ * <p>Options are optional, except that the broker list must name at least one broker.
+ * The option separator is defined to be either '&amp;' or ','.
+ */
+public interface ConnectionURL
+{
+    /** URL scheme. */
+    String AMQ_PROTOCOL = "amqp";
+
+    /* Option keys. */
+    String OPTIONS_BROKERLIST = "brokerlist";
+    String OPTIONS_FAILOVER = "failover";
+    String OPTIONS_FAILOVER_CYCLE = "cyclecount";
+    String OPTIONS_SSL = "ssl";
+
+    String getURL();
+
+    String getFailoverMethod();
+
+    String getFailoverOption(String key);
+
+    int getBrokerCount();
+
+    BrokerDetails getBrokerDetails(int index);
+
+    void addBrokerDetails(BrokerDetails broker);
+
+    List<BrokerDetails> getAllBrokerDetails();
+
+    String getClientName();
+
+    void setClientName(String clientName);
+
+    String getUsername();
+
+    void setUsername(String username);
+
+    String getPassword();
+
+    void setPassword(String password);
+
+    String getVirtualHost();
+
+    void setVirtualHost(String virtualHost);
+
+    String getOption(String key);
+
+    void setOption(String key, String value);
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,82 @@
+package org.apache.qpid.nclient.transport;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.DefaultPhaseContext;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.PhaseFactory;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+
+/**
+ * TCP transport connection: configures a MINA SocketConnector from the client
+ * configuration and hands it, together with the first broker of the URL, to the
+ * phase pipe created in {@link #connect()}.
+ */
+public class TCPConnection implements TransportConnection
+{
+    private static final Logger _logger = Logger.getLogger(TCPConnection.class);
+    private BrokerDetails _brokerDetails;
+    private IoConnector _ioConnector;
+    private Phase _phase;
+
+    /**
+     * Prepares the connector for the first broker listed in the URL.
+     *
+     * @param url connection URL; only the broker at index 0 is used
+     */
+    protected TCPConnection(ConnectionURL url)
+    {
+        _brokerDetails = url.getBrokerDetails(0);
+
+        ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS));
+
+        // the MINA default is currently to use the pooled allocator although this may change in future
+        // once more testing of the performance of the simple allocator has been done
+        if (!ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_POOLED_ALLOCATOR))
+        {
+            _logger.info("Using SimpleByteBufferAllocator");
+            ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+        }
+
+        // Keep the connector in the field: connect() publishes _ioConnector through the
+        // phase context, and it used to stay null because only a local variable was set.
+        _ioConnector = new SocketConnector();
+        SocketConnectorConfig cfg = (SocketConnectorConfig) _ioConnector.getDefaultConfig();
+
+        // if we do not use our own thread model we get the MINA default which is to use
+        // its own leader-follower model
+        if (ClientConfiguration.get().getBoolean(QpidConstants.USE_SHARED_READ_WRITE_POOL))
+        {
+            cfg.setThreadModel(ReadWriteThreadModel.getInstance());
+        }
+
+        SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+        scfg.setTcpNoDelay(ClientConfiguration.get().getBoolean(QpidConstants.TCP_NO_DELAY));
+        // Buffer sizes are configured in KB and converted to bytes here.
+        scfg.setSendBufferSize(ClientConfiguration.get().getInt(QpidConstants.SEND_BUFFER_SIZE_IN_KB)*1024);
+        scfg.setReceiveBufferSize(ClientConfiguration.get().getInt(QpidConstants.RECEIVE_BUFFER_SIZE_IN_KB)*1024);
+    }
+
+    /**
+     * Creates the phase pipe for this connection and starts it.
+     *
+     * @return the started phase pipe
+     * @throws AMQPException if the pipe cannot be created or started
+     */
+    public Phase connect() throws AMQPException
+    {
+        PhaseContext ctx = new DefaultPhaseContext();
+        ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
+        ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
+
+        _phase = PhaseFactory.createPhasePipe(ctx);
+        _phase.start();
+
+        return _phase;
+    }
+
+    /** Currently a no-op; nothing is released here. */
+    public void close() throws AMQPException
+    {
+
+    }
+
+    /** Returns the phase pipe created by connect(), or null before connect() has been called. */
+    public Phase getPhasePipe()
+    {
+        return _phase;
+    }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+/**
+ * A transport-level connection to a broker that exposes its phase pipe.
+ */
+public interface TransportConnection
+{
+    /** Establishes the connection and returns the resulting phase. */
+    Phase connect() throws AMQPException;
+
+    /** Closes the connection. */
+    void close() throws AMQPException;
+
+    /** Returns the phase pipe of this connection. */
+    Phase getPhasePipe();
+}
\ No newline at end of file
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,44 @@
+package org.apache.qpid.nclient.transport;
+
+import java.net.URISyntaxException;
+
+/**
+ * Creates {@link TransportConnection} instances for a connection URL and a
+ * transport type.
+ */
+public class TransportConnectionFactory
+{
+    /** Transport types this factory knows about. */
+    public enum ConnectionType
+    {
+        TCP,VM
+    }
+
+    /**
+     * Parses the URL text and creates a connection of the requested type.
+     *
+     * @param url  connection URL text
+     * @param type transport type
+     * @return the new connection
+     * @throws URISyntaxException if the URL text cannot be parsed
+     */
+    public static TransportConnection createTransportConnection(String url,ConnectionType type) throws URISyntaxException
+    {
+        return createTransportConnection(new AMQPConnectionURL(url),type);
+    }
+
+    /**
+     * Creates a connection of the requested type; any type other than VM yields a
+     * TCP connection.
+     *
+     * @param url  parsed connection URL
+     * @param type transport type
+     * @return the new connection
+     */
+    public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type)
+    {
+        switch (type)
+        {
+            case VM :
+            {
+                return createVMConnection(url);
+            }
+
+            case TCP : default:
+            {
+                return createTCPConnection(url);
+            }
+        }
+    }
+
+    private static TransportConnection createTCPConnection(ConnectionURL url)
+    {
+        return new TCPConnection(url);
+    }
+
+    // Used to return null, which handed callers that asked for a VM transport a
+    // null connection; VMConnection lives in this package and takes the URL directly.
+    private static TransportConnection createVMConnection(ConnectionURL url)
+    {
+        return new VMConnection(url);
+    }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+import org.apache.mina.transport.vmpipe.VmPipeConnector;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.AbstractPhase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.ssl.BogusSSLContextFactory;
+
+/**
+ * The Transport Phase corresponds to the Layer 1 in AMQP It works at the Frame
+ * layer
+ *
+ */
+public class TransportPhase extends AbstractPhase implements IoHandler, ProtocolVersionList
+{
+
+ private static final Logger _logger = Logger
+ .getLogger(TransportPhase.class);
+
+ // MINA session that outbound frames are written to.
+ private IoSession _ioSession;
+ // Broker this phase connects to; read from the phase context in start().
+ private BrokerDetails _brokerDetails;
+
+ // Future of the most recent write issued through writeFrame() without waiting.
+ protected WriteFuture _lastWriteFuture;
+
+ /**
+ * ------------------------------------------------
+ * Phase - methods introduced by Phase
+ * ------------------------------------------------
+ */
+
+ /**
+  * Connects to the broker described in the phase context using the connector
+  * found there, and waits up to the broker's timeout for the connection.
+  *
+  * @throws AMQPException if the context lacks broker details or a connector, or
+  *                       if the connection does not complete within the timeout
+  */
+ public void start()throws AMQPException
+ {
+     _brokerDetails = (BrokerDetails)_ctx.getProperty(QpidConstants.AMQP_BROKER_DETAILS);
+     IoConnector ioConnector = (IoConnector)_ctx.getProperty(QpidConstants.MINA_IO_CONNECTOR);
+
+     // Fail with a descriptive error instead of a NullPointerException further down.
+     if (_brokerDetails == null || ioConnector == null)
+     {
+         throw new AMQPException("Broker details and IO connector must be set in the phase context before start.");
+     }
+
+     final SocketAddress address;
+     if (ioConnector instanceof VmPipeConnector)
+     {
+         // In-VM pipes are addressed by port only.
+         address = new VmPipeAddress(_brokerDetails.getPort());
+     }
+     else
+     {
+         address = new InetSocketAddress(_brokerDetails.getHost(), _brokerDetails.getPort());
+         _logger.info("Attempting connection to " + address);
+     }
+
+     ConnectFuture future = ioConnector.connect(address,this);
+
+     // wait for connection to complete
+     if (!future.join(_brokerDetails.getTimeout()))
+     {
+         throw new AMQPException("Timeout waiting for connection.");
+     }
+
+     // getSession() raises an exception if there has been an error connecting;
+     // keep the session so that later writes have something to write to.
+     _ioSession = future.getSession();
+ }
+
+ /** Hands an inbound frame to the superclass implementation unchanged. */
+ public void messageReceived(Object frame) throws AMQPException
+ {
+ super.messageReceived(frame);
+ }
+
+ /** Writes an outbound frame to the MINA session; the write future is not kept. */
+ public void messageSent(Object frame) throws AMQPException
+ {
+
+ _ioSession.write(frame);
+ }
+
+ /**
+ * ------------------------------------------------
+ * IoHandler - methods introduced by IoHandler
+ * ------------------------------------------------
+ */
+ public void sessionIdle(IoSession session, IdleStatus status)
+ throws Exception
+ {
+ _logger.debug("Protocol Session [" + this + ":" + session + "] idle: "
+ + status);
+ if (IdleStatus.WRITER_IDLE.equals(status))
+ {
+ // write heartbeat frame:
+ _logger.debug("Sent heartbeat");
+ session.write(HeartbeatBody.FRAME);
+ // HeartbeatDiagnostics.sent();
+ } else if (IdleStatus.READER_IDLE.equals(status))
+ {
+ // failover:
+ // HeartbeatDiagnostics.timeout();
+ _logger.warn("Timed out while waiting for heartbeat from peer.");
+ session.close();
+ }
+ }
+
+ public void messageReceived(IoSession session, Object message)
+ throws Exception
+ {
+ AMQFrame frame = (AMQFrame) message;
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ if (bodyFrame instanceof HeartbeatBody)
+ {
+ _logger.debug("Received heartbeat");
+ } else
+ {
+ messageReceived(bodyFrame);
+ }
+ // _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+ }
+
+ /** IoHandler callback for a written message; only logs the frame at debug level. */
+ public void messageSent(IoSession session, Object message) throws Exception
+ {
+ _logger.debug("Sent frame " + message);
+ }
+
+ /**
+  * IoHandler callback for an exception on the session: logs the cause, then runs
+  * the same handling as a closed session.
+  */
+ public void exceptionCaught(IoSession session, Throwable cause)
+         throws Exception
+ {
+     // The cause used to be dropped silently, leaving no trace of why the session failed.
+     _logger.error("Exception caught on protocol session [" + this + ":" + session + "]", cause);
+
+     // Need to handle failover
+     sessionClosed(session);
+ }
+
+ /** IoHandler callback for a closed session; currently only logs the event. */
+ public void sessionClosed(IoSession session) throws Exception
+ {
+ // Need to handle failover
+ _logger.info("Protocol Session [" + this + "] closed");
+ }
+
+ /**
+  * IoHandler callback for a newly created session: installs the protocol codec
+  * (and the SSL filter when the broker requires it), registers the session with
+  * the read/write thread model and sends the protocol initiation.
+  */
+ public void sessionCreated(IoSession session) throws Exception
+ {
+     _logger.debug("Protocol session created for session "
+                   + System.identityHashCode(session));
+
+     // Remember the session first: the negotiation at the end of this method writes
+     // through _ioSession, which was never assigned before.
+     _ioSession = session;
+
+     final ProtocolCodecFilter pcf = new ProtocolCodecFilter(
+             new AMQCodecFactory(false));
+
+     if (ClientConfiguration.get().getBoolean(
+             QpidConstants.USE_SHARED_READ_WRITE_POOL))
+     {
+         // Assumes a filter named "AsynchronousWriteFilter" is already in the chain
+         // when the shared pool is enabled - TODO confirm.
+         session.getFilterChain().addBefore("AsynchronousWriteFilter",
+                 "protocolFilter", pcf);
+     }
+     else
+     {
+         session.getFilterChain().addLast("protocolFilter", pcf);
+     }
+     // we only add the SSL filter where we have an SSL connection
+     if (_brokerDetails.useSSL())
+     {
+         // FIXME: Bogus context cannot be used in production.
+         SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory
+                 .getInstance(false));
+         sslFilter.setUseClientMode(true);
+         session.getFilterChain().addBefore("protocolFilter", "ssl",
+                 sslFilter);
+     }
+
+     try
+     {
+         ReadWriteThreadModel threadModel = ReadWriteThreadModel
+                 .getInstance();
+         threadModel.getAsynchronousReadFilter().createNewJobForSession(
+                 session);
+         threadModel.getAsynchronousWriteFilter().createNewJobForSession(
+                 session);
+     }
+     catch (RuntimeException e)
+     {
+         // Best effort as before, but reported through the logger instead of stderr.
+         _logger.error("Failed to register session with the read/write thread model", e);
+     }
+
+     doAMQPConnectionNegotiation();
+ }
+
+ /** IoHandler callback for an opened session; only logs the session's identity hash. */
+ public void sessionOpened(IoSession session) throws Exception
+ {
+ _logger.debug("Protocol session opened for session "
+ + System.identityHashCode(session));
+ }
+
+ /**
+ * ----------------------------------------------------------
+ * Protocol related methods
+ * ----------------------------------------------------------
+ */
+ private void doAMQPConnectionNegotiation()
+ {
+ int i = pv.length - 1;
+ writeFrame(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+ }
+
+ /**
+ * ----------------------------------------------------------
+ * Write Operations
+ * ----------------------------------------------------------
+ */
+ /** Writes the frame without waiting for the write to complete. */
+ public void writeFrame(AMQDataBlock frame)
+ {
+ writeFrame(frame, false);
+ }
+
+ public void writeFrame(AMQDataBlock frame, boolean wait)
+ {
+ WriteFuture f = _ioSession.write(frame);
+ if (wait)
+ {
+ // fixme -- time out?
+ f.join();
+ } else
+ {
+ _lastWriteFuture = f;
+ }
+ }
+
+ /**
+ * ----------------------------------------------------------- Failover
+ * section -----------------------------------------------------------
+ */
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,133 @@
+package org.apache.qpid.nclient.transport;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+import org.apache.mina.transport.vmpipe.VmPipeConnector;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.DefaultPhaseContext;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.PhaseFactory;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.pool.PoolingFilter;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
+
+public class VMConnection implements TransportConnection
+{
+ private static final Logger _logger = Logger.getLogger(VMConnection.class);
+ private BrokerDetails _brokerDetails;
+ private IoConnector _ioConnector;
+ private Phase _phase;
+
+ protected VMConnection(ConnectionURL url)
+ {
+ _brokerDetails = url.getBrokerDetails(0);
+ _ioConnector = new VmPipeConnector();
+ final IoServiceConfig cfg = _ioConnector.getDefaultConfig();
+ ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
+ PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService,
+ "AsynchronousReadFilter");
+ cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
+ PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService,
+ "AsynchronousWriteFilter");
+ cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
+ }
+
+ public Phase connect() throws AMQPException
+ {
+ createVMBroker();
+
+ PhaseContext ctx = new DefaultPhaseContext();
+ ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
+ ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
+
+ _phase = PhaseFactory.createPhasePipe(ctx);
+ _phase.start();
+
+ return _phase;
+
+ }
+
+ private void createVMBroker()throws AMQPException
+ {
+ _logger.info("Creating InVM Qpid.AMQP listening on port " + _brokerDetails.getPort());
+
+ VmPipeAcceptor acceptor = new VmPipeAcceptor();
+ IoServiceConfig config = acceptor.getDefaultConfig();
+ config.setThreadModel(ReadWriteThreadModel.getInstance());
+
+ IoHandlerAdapter provider = null;
+ try
+ {
+ VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort());
+ provider = createBrokerInstance(_brokerDetails.getPort());
+ acceptor.bind(pipe, provider);
+ _logger.info("Created InVM Qpid.AMQP listening on port " + _brokerDetails.getPort());
+ }
+ catch (IOException e)
+ {
+ _logger.error(e);
+ VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort());
+ acceptor.unbind(pipe);
+
+ throw new AMQPException("Error creating VM broker",e);
+ }
+ }
+
+ private IoHandlerAdapter createBrokerInstance(int port) throws AMQPException
+ {
+ String protocolProviderClass = ClientConfiguration.get().getString(QpidConstants.QPID_VM_BROKER_CLASS);
+ _logger.info("Creating Qpid protocol provider: " + protocolProviderClass);
+
+ // can't use introspection to get Provider as it is a server class.
+ // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access.
+
+ //get correct constructor and pass in instancec ID - "port"
+ IoHandlerAdapter provider;
+ try
+ {
+ Class[] cnstr = {Integer.class};
+ Object[] params = {port};
+ provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+ //Give the broker a second to create
+ _logger.info("Created VMBroker Instance:" + port);
+ }
+ catch (Exception e)
+ {
+ _logger.info("Unable to create InVM Qpid broker on port " + port + ". due to : " + e.getCause());
+ _logger.error(e);
+ String because;
+ if (e.getCause() == null)
+ {
+ because = e.toString();
+ }
+ else
+ {
+ because = e.getCause().toString();
+ }
+
+
+ throw new AMQPException(port, because + " Stopped InVM Qpid.AMQP creation",e);
+ }
+
+ return provider;
+ }
+
+ public void close() throws AMQPException
+ {
+
+ }
+
+ public Phase getPhasePipe()
+ {
+ return _phase;
+ }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,14 @@
+package org.apache.qpid.nclient.util;
+
+import org.apache.qpid.nclient.core.AMQPException;
+
+/**
+ * Validation helpers that report failures as AMQPException.
+ */
+public class AMQPValidator
+{
+    /**
+     * Returns normally when obj is non-null; otherwise throws an AMQPException
+     * constructed with the supplied message.
+     *
+     * @param obj the reference to check
+     * @param msg the message for the exception raised when obj is null
+     * @throws AMQPException if obj is null
+     */
+    public static void throwExceptionOnNull(Object obj, String msg) throws AMQPException
+    {
+        if (null != obj)
+        {
+            return;
+        }
+
+        throw new AMQPException(msg);
+    }
+}