You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/08/15 13:26:57 UTC
svn commit: r1157780 [8/13] - in
/qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./
broker-plugins/access-control/
broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/
broker-plugins/access-control/src/main/java/org/...
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps Mon Aug 15 11:26:46 2011
@@ -17,8 +17,6 @@
# under the License.
#
-backport-util-concurrent=lib/backport-util-concurrent-2.2.jar
-
commons-beanutils-core=lib/commons-beanutils-core-1.8.0.jar
commons-cli=lib/commons-cli-1.0.jar
commons-codec=lib/commons-codec-1.3.jar
@@ -27,89 +25,44 @@ commons-configuration=lib/commons-config
commons-digester=lib/commons-digester-1.8.1.jar
commons-lang=lib/commons-lang-2.2.jar
commons-logging=lib/commons-logging-1.0.4.jar
-commons-pool=lib/commons-pool-1.4.jar
derby-db=lib/derby-10.6.1.0.jar
geronimo-jms=lib/geronimo-jms_1.1_spec-1.0.jar
junit=lib/junit-3.8.1.jar
-junit4=lib/junit-4.4.jar
-
-jline=lib/jline-0.9.94.jar
log4j=lib/log4j-1.2.12.jar
-mina-core=lib/mina-core-1.0.1.jar
-mina-filter-ssl=lib/mina-filter-ssl-1.0.1.jar
+mina-core=lib/mina-core-1.1.7.jar
+mina-filter-ssl=lib/mina-filter-ssl-1.1.7.jar
slf4j-api=lib/slf4j-api-1.6.1.jar
slf4j-log4j=lib/slf4j-log4j12-1.6.1.jar
xalan=lib/xalan-2.7.0.jar
-muse-core=lib/muse-core-2.2.0.jar
-muse-platform-mini=lib/muse-platform-mini-2.2.0.jar
-muse-util=lib/muse-util-2.2.0.jar
-muse-util-qname=lib/muse-util-qname-2.2.0.jar
-muse-util-xml=lib/muse-util-xml-2.2.0.jar
-muse-wsa-soap=lib/muse-wsa-soap-2.2.0.jar
-muse-wsdm-muws-adv-api=lib/muse-wsdm-muws-adv-api-2.2.0.jar
-muse-wsdm-muws-adv-impl=lib/muse-wsdm-muws-adv-impl-2.2.0.jar
-muse-wsdm-muws-api=lib/muse-wsdm-muws-api-2.2.0.jar
-muse-wsdm-muws-impl=lib/muse-wsdm-muws-impl-2.2.0.jar
-muse-wsdm-wef-api=lib/muse-wsdm-wef-api-2.2.0.jar
-muse-wsdm-wef-impl=lib/muse-wsdm-wef-impl-2.2.0.jar
-muse-wsn-api=lib/muse-wsn-api-2.2.0.jar
-muse-wsn-impl=lib/muse-wsn-impl-2.2.0.jar
-muse-wsrf-api=lib/muse-wsrf-api-2.2.0.jar
-muse-wsrf-impl=lib/muse-wsrf-impl-2.2.0.jar
-muse-wsrf-rmd=lib/muse-wsrf-rmd-2.2.0.jar
-muse-wsx-api=lib/muse-wsx-api-2.2.0.jar
-muse-wsx-impl=lib/muse-wsx-impl-2.2.0.jar
-wsdl4j=lib/wsdl4j-1.6.1.jar
-xercesImpl=lib/xercesImpl-2.8.1.jar
-xml-apis=lib/xml-apis-1.3.03.jar
-javassist=lib/javassist.jar
jetty=lib/jetty-6.1.14.jar
jetty-util=lib/jetty-util-6.1.14.jar
jetty-servlet-tester=lib/jetty-servlet-tester-6.1.14.jar
-jetty-bootstrap=lib/start.jar
-jsp-api=lib/jsp-api-2.1.jar
-jsp-impl=lib/jsp-2.1.jar
-core-lib=lib/core-3.1.1.jar
servlet-api=lib/servlet-api.jar
-muse.libs = ${muse-core} ${muse-platform-mini} ${muse-util} ${muse-util-qname} \
-${muse-util-xml} ${muse-wsa-soap} ${muse-wsdm-muws-adv-api} ${muse-wsdm-muws-adv-impl} \
-${muse-wsdm-muws-api} ${muse-wsdm-muws-impl} ${muse-wsdm-wef-api} ${muse-wsdm-wef-impl} \
-${muse-wsn-api} ${muse-wsn-impl} ${muse-wsrf-api} ${muse-wsrf-impl} ${muse-wsrf-rmd} \
-${muse-wsx-api} ${muse-wsx-impl} ${wsdl4j} ${xercesImpl} ${xml-apis} ${jetty} ${jetty-util} ${jetty-bootstrap}
-
-jsp.libs = ${jsp-api} ${jsp-impl} ${core-lib}
-
osgi-core=lib/org.osgi.core-1.0.0.jar
felix-framework=lib/org.apache.felix.framework-2.0.5.jar
-geronimo-servlet=lib/geronimo-servlet_2.5_spec-1.2.jar
felix.libs=${osgi-core} ${felix-framework}
commons-configuration.libs = ${commons-beanutils-core} ${commons-digester} \
${commons-codec} ${commons-lang} ${commons-collections} ${commons-configuration}
-common.libs=${slf4j-api} ${backport-util-concurrent} ${mina-core} ${mina-filter-ssl}
+common.libs=${slf4j-api} ${mina-core} ${mina-filter-ssl}
client.libs=${geronimo-jms}
tools.libs=${commons-configuration.libs} ${log4j}
broker.libs=${commons-cli} ${commons-logging} ${log4j} ${slf4j-log4j} \
${xalan} ${felix.libs} ${derby-db} ${commons-configuration.libs}
amqp-1-0-client.libs=${geronimo-jms} ${commons-cli}
-broker-plugins.libs=${felix.libs} ${log4j} ${commons-configuration.libs}
-management-client.libs=${jsp.libs} ${log4j} ${slf4j-log4j} ${slf4j-api} \
- ${commons-pool} ${geronimo-servlet} ${muse.libs} ${javassist} ${xalan}
-
-management-agent.libs=${commons-logging}
-management-console.libs=${commons-logging}
+broker-plugins.libs=${felix.libs} ${log4j} ${commons-configuration.libs}
junit-toolkit.libs=${log4j} ${junit} ${slf4j-api}
test.libs=${slf4j-log4j} ${junit-toolkit.libs}
@@ -151,34 +104,28 @@ ecl-equinox-launcher-solaris-gtk-sparc=l
management-common.libs=
+management-eclipse-plugin.core-libs=${ibm-icu} ${ecl-core-jface} ${ecl-core-jface-databinding} \
+ ${ecl-core-commands} ${ecl-core-contenttype} ${ecl-core-databinding} ${ecl-core-expressions} \
+ ${ecl-core-jobs} ${ecl-core-runtime} ${ecl-equinox-app} ${ecl-equinox-common} ${ecl-equinox-launcher} \
+ ${ecl-equinox-prefs} ${ecl-equinox-registry} ${ecl-help} ${ecl-osgi} ${ecl-swt} ${ecl-ui} ${ecl-ui-forms} \
+ ${ecl-ui-workbench} ${apache-commons-codec}
+
+management-eclipse-plugin.swt-libs=${ecl-swt-win32-win32-x86} ${ecl-swt-linux-gtk-x86} ${ecl-swt-macosx-carbon} \
+ ${ecl-swt-linux-gtk-x86_64} ${ecl-swt-solaris-gtk-sparc}
+
+management-eclipse-plugin.libs=${management-eclipse-plugin.core-libs} ${management-eclipse-plugin.swt-libs}
+
management-eclipse-plugin-win32-win32-x86.libs=${management-eclipse-plugin.core-libs} \
- ${ecl-swt-win32-win32-x86} ${ecl-equinox-launcher-win32-win32-x86}
+ ${ecl-swt-win32-win32-x86} ${ecl-equinox-launcher-win32-win32-x86} ${ecl-core-runtime-compat-registry}
management-eclipse-plugin-linux-gtk-x86.libs=${management-eclipse-plugin.core-libs} \
- ${ecl-swt-linux-gtk-x86} ${ecl-equinox-launcher-linux-gtk-x86}
+ ${ecl-swt-linux-gtk-x86} ${ecl-equinox-launcher-linux-gtk-x86} ${ecl-core-runtime-compat-registry}
management-eclipse-plugin-linux-gtk-x86_64.libs=${management-eclipse-plugin.core-libs} \
- ${ecl-swt-linux-gtk-x86_64} ${ecl-equinox-launcher-linux-gtk-x86_64}
+ ${ecl-swt-linux-gtk-x86_64} ${ecl-equinox-launcher-linux-gtk-x86_64} ${ecl-core-runtime-compat-registry}
management-eclipse-plugin-macosx.libs=${management-eclipse-plugin.core-libs} \
- ${ecl-swt-macosx-carbon} ${ecl-equinox-launcher-macosx-carbon}
+ ${ecl-swt-macosx-carbon} ${ecl-equinox-launcher-macosx-carbon} ${ecl-core-runtime-compat-registry}
management-eclipse-plugin-solaris-gtk-sparc.libs=${management-eclipse-plugin.core-libs} \
- ${ecl-swt-solaris-gtk-sparc} ${ecl-equinox-launcher-solaris-gtk-sparc}
-
-management-eclipse-plugin.core-libs=${ibm-icu} ${ecl-core-jface} ${ecl-core-jface-databinding} \
- ${ecl-core-commands} ${ecl-core-contenttype} ${ecl-core-databinding} ${ecl-core-expressions} \
- ${ecl-core-jobs} ${ecl-core-runtime} ${ecl-core-runtime-compat-registry} ${ecl-equinox-app} \
- ${ecl-equinox-common} ${ecl-equinox-launcher} ${ecl-equinox-prefs} ${ecl-equinox-registry} \
- ${ecl-help} ${ecl-osgi} ${ecl-swt} ${ecl-ui} ${ecl-ui-forms} ${ecl-ui-workbench} ${apache-commons-codec}
+ ${ecl-swt-solaris-gtk-sparc} ${ecl-equinox-launcher-solaris-gtk-sparc} ${ecl-core-runtime-compat-registry}
-management-eclipse-plugin.platform-libs=${ecl-equinox-launcher-win32-win32-x86} \
- ${ecl-equinox-launcher-linux-gtk-x86} ${ecl-equinox-launcher-macosx-carbon} \
- ${ecl-swt-win32-win32-x86} ${ecl-swt-linux-gtk-x86} ${ecl-swt-macosx-carbon} \
- ${ecl-swt-linux-gtk-x86_64} ${ecl-equinox-launcher-linux-gtk-x86_64} \
- ${ecl-swt-solaris-gtk-sparc} ${ecl-equinox-launcher-solaris-gtk-sparc}
-
-management-eclipse-plugin.libs=${management-eclipse-plugin.core-libs} ${management-eclipse-plugin.platform-libs}
-
-
-management-tools-qpid-cli.libs=${jline} ${commons-configuration.libs}
-
common.test.libs=${test.libs}
broker.test.libs=${test.libs}
client.test.libs=${test.libs}
@@ -190,9 +137,5 @@ systests.libs=${test.libs}
broker-plugins.test.libs=${test.libs}
broker-plugins-experimental-info.test.libs=${test.libs} ${servlet-api} ${jetty} ${jetty-util} ${jetty-servlet-tester}
-management-client.test.libs=${muse.libs} ${test.libs} ${log4j} ${javassist} ${geronimo-servlet} ${commons-pool}
-management-console.test.libs=${junit4} ${slf4j-log4j} ${log4j}
-management-agent.test.libs=${junit}
-management-eclipse-plugin.test.libs=${systests.libs}
-management-tools-qpid-cli.test.libs=${junit4} ${slf4j-log4j} ${log4j}
+management-eclipse-plugin.test.libs=${test.libs}
management-common.test.libs=${test.libs}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.xml?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.xml Mon Aug 15 11:26:46 2011
@@ -22,18 +22,17 @@
<import file="common.xml"/>
-
<findSubProjects name="broker-plugins" dir="broker-plugins"/>
- <findSubProjects name="management" dir="management" excludes="common,example,tools/qpid-cli"/>
+ <findSubProjects name="client-plugins" dir="client-plugins" erroronmissingdir="false"/>
+ <findSubProjects name="management" dir="management" excludes="common,example"/>
<property name="modules.core" value="junit-toolkit common management/common amqp-1-0-common broker client amqp-1-0-client tools"/>
<property name="modules.examples" value="client/example management/example"/>
<property name="modules.tests" value="systests perftests integrationtests testkit"/>
<property name="modules.management" value="${management}"/>
- <property name="modules.plugin" value="${broker-plugins}"/>
- <property name="modules.management.tools" value="management/tools/qpid-cli"/>
+ <property name="modules.plugin" value="${broker-plugins} ${client-plugins}"/>
<property name="modules" value="${modules.core} ${modules.examples}
- ${modules.management} ${modules.management.tools} ${modules.tests} ${modules.plugin}"/>
+ ${modules.management} ${modules.tests} ${modules.plugin}"/>
<property name="qpid.jar" location="${build.lib}/qpid-all.jar"/>
<basename property="qpid.jar.name" file="${qpid.jar}"/>
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/README.txt
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/README.txt?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/README.txt (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/README.txt Mon Aug 15 11:26:46 2011
@@ -24,7 +24,7 @@ run more easily.
E.g, in order to run the Hello example, you would add the client+example library
files to the java classpath and launch the example like follows:
-java -cp "lib/qpid-all.jar:example/lib/qpid-client-examples-<version>.jar" \
+java -cp "lib/qpid-all.jar:example/lib/qpid-client-example-<version>.jar" \
org.apache.qpid.example.Hello
NOTE: The client uses the SL4FJ API for its logging. You must supply a logging
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java Mon Aug 15 11:26:46 2011
@@ -33,7 +33,6 @@ import java.util.Properties;
* It is equivalent to a PropertyFile of value:
*
* connectionfactory.local=amqp://guest:guest@clientid/test?brokerlist='localhost'
- * connectionfactory.vm=amqp://guest:guest@clientid/test?brokerlist='vm://:1'
*
* queue.queue=example.MyQueue
* topic.topic=example.hierarical.topic
@@ -61,7 +60,6 @@ public class ConnectionSetup
Properties properties = new Properties();
properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME);
- properties.put("connectionfactory." + "vm", "amqp://guest:guest@clientid/test?brokerlist='vm://:1'");
properties.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME);
properties.put("topic." + TOPIC_JNDI_NAME, TOPIC_NAME);
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/client.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/client.bnd?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/client.bnd (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/client.bnd Mon Aug 15 11:26:46 2011
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.11.0
+ver: 0.13.0
Bundle-SymbolicName: qpid-client
Bundle-Version: ${ver}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java Mon Aug 15 11:26:46 2011
@@ -72,6 +72,17 @@ public class AMQAnyDestination extends A
public String getTopicName() throws JMSException
{
- return super.getRoutingKey().toString();
+ if (getRoutingKey() != null)
+ {
+ return getRoutingKey().asString();
+ }
+ else if (getSubject() != null)
+ {
+ return getSubject();
+ }
+ else
+ {
+ return null;
+ }
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java Mon Aug 15 11:26:46 2011
@@ -56,9 +56,7 @@ public class AMQBrokerDetails implements
if (transport != null)
{
//todo this list of valid transports should be enumerated somewhere
- if ((!(transport.equalsIgnoreCase(BrokerDetails.VM) ||
- transport.equalsIgnoreCase(BrokerDetails.TCP) ||
- transport.equalsIgnoreCase(BrokerDetails.SOCKET))))
+ if (!(transport.equalsIgnoreCase(BrokerDetails.TCP)))
{
if (transport.equalsIgnoreCase("localhost"))
{
@@ -182,10 +180,7 @@ public class AMQBrokerDetails implements
}
else
{
- if (!_transport.equalsIgnoreCase(SOCKET))
- {
- setPort(port);
- }
+ setPort(port);
}
String queryString = connection.getQuery();
@@ -301,17 +296,9 @@ public class AMQBrokerDetails implements
sb.append(_transport);
sb.append("://");
-
- if (!(_transport.equalsIgnoreCase(VM)))
- {
- sb.append(_host);
- }
-
- if (!(_transport.equalsIgnoreCase(SOCKET)))
- {
- sb.append(':');
- sb.append(_port);
- }
+ sb.append(_host);
+ sb.append(':');
+ sb.append(_port);
sb.append(printOptionsURL());
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Aug 15 11:26:46 2011
@@ -111,7 +111,7 @@ public class AMQConnection extends Close
/** Maps from session id (Integer) to AMQSession instance */
private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
- private String _clientName;
+ private final String _clientName;
/** The user name to use for authentication */
private String _username;
@@ -126,7 +126,7 @@ public class AMQConnection extends Close
private ConnectionListener _connectionListener;
- private ConnectionURL _connectionURL;
+ private final ConnectionURL _connectionURL;
/**
* Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
@@ -173,8 +173,8 @@ public class AMQConnection extends Close
//Indicates the sync publish options (persistent|all)
//By default it's async publish
private String _syncPublish = "";
-
- // Indicates whether to use the old map message format or the
+
+ // Indicates whether to use the old map message format or the
// new amqp-0-10 encoded format.
private boolean _useLegacyMapMessageFormat;
@@ -257,6 +257,11 @@ public class AMQConnection extends Close
*/
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
+ if (connectionURL == null)
+ {
+ throw new IllegalArgumentException("Connection must be specified");
+ }
+
// set this connection maxPrefetch
if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
{
@@ -264,7 +269,7 @@ public class AMQConnection extends Close
}
else
{
- // use the defaul value set for all connections
+ // use the default value set for all connections
_maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
ClientProperties.MAX_PREFETCH_DEFAULT));
}
@@ -278,7 +283,7 @@ public class AMQConnection extends Close
}
else
{
- // use the defaul value set for all connections
+ // use the default value set for all connections
_syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
if (_syncPersistence)
{
@@ -293,7 +298,7 @@ public class AMQConnection extends Close
}
else
{
- // use the defaul value set for all connections
+ // use the default value set for all connections
_syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME);
}
@@ -306,7 +311,7 @@ public class AMQConnection extends Close
// use the default value set for all connections
_syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish);
}
-
+
if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null)
{
_useLegacyMapMessageFormat = Boolean.parseBoolean(
@@ -317,16 +322,16 @@ public class AMQConnection extends Close
// use the default value set for all connections
_useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT);
}
-
+
String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
_logger.debug("AMQP version " + amqpVersion);
-
+
_failoverPolicy = new FailoverPolicy(connectionURL, this);
BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
- if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion))
+ if ("0-8".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_8_0(this);
- }
+ }
else if ("0-9".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_0_9(this);
@@ -346,11 +351,6 @@ public class AMQConnection extends Close
}
_sslConfiguration = sslConfig;
- if (connectionURL == null)
- {
- throw new IllegalArgumentException("Connection must be specified");
- }
-
_connectionURL = connectionURL;
_clientName = connectionURL.getClientName();
@@ -418,6 +418,7 @@ public class AMQConnection extends Close
brokerDetails = _failoverPolicy.getNextBrokerDetails();
}
}
+ verifyClientID();
if (_logger.isDebugEnabled())
{
@@ -504,7 +505,7 @@ public class AMQConnection extends Close
Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
_delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
- //Update our session to use this new protocol version
+ //Update our session to use this new protocol version
_protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
}
@@ -535,14 +536,6 @@ public class AMQConnection extends Close
}
}
- protected AMQConnection(String username, String password, String clientName, String virtualHost)
- {
- _clientName = clientName;
- _username = username;
- _password = password;
- setVirtualHost(virtualHost);
- }
-
private void setVirtualHost(String virtualHost)
{
if (virtualHost != null && virtualHost.startsWith("/"))
@@ -696,20 +689,6 @@ public class AMQConnection extends Close
}
}
- private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
- throws AMQException, FailoverException
- {
- try
- {
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
- }
- catch (AMQException e)
- {
- deregisterSession(channelId);
- throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
- }
- }
-
public void setFailoverPolicy(FailoverPolicy policy)
{
_failoverPolicy = policy;
@@ -1096,7 +1075,7 @@ public class AMQConnection extends Close
{
_username = id;
}
-
+
public String getPassword()
{
return _password;
@@ -1272,7 +1251,7 @@ public class AMQConnection extends Close
{
je.setLinkedException((Exception) cause);
}
-
+
je.initCause(cause);
}
@@ -1305,7 +1284,7 @@ public class AMQConnection extends Close
{
_logger.info("Not a hard-error connection not closing: " + cause);
}
-
+
// deliver the exception if there is a listener
if (_exceptionListener != null)
{
@@ -1315,7 +1294,7 @@ public class AMQConnection extends Close
{
_logger.error("Throwable Received but no listener set: " + cause);
}
-
+
// if we are closing the connection, close sessions first
if (closer)
{
@@ -1372,6 +1351,20 @@ public class AMQConnection extends Close
return buf.toString();
}
+ /**
+ * Returns connection url.
+ * @return connection url
+ */
+ public ConnectionURL getConnectionURL()
+ {
+ return _connectionURL;
+ }
+
+ /**
+ * Returns stringified connection url. This url is suitable only for display
+ * as {@link AMQConnectionURL#toString()} converts any password to asterisks.
+ * @return connection url
+ */
public String toURL()
{
return _connectionURL.toString();
@@ -1442,7 +1435,18 @@ public class AMQConnection extends Close
{
return _delegate.getProtocolVersion();
}
-
+
+ public String getBrokerUUID()
+ {
+ if(getProtocolVersion().equals(ProtocolVersion.v0_10))
+ {
+ return ((AMQConnectionDelegate_0_10)_delegate).getUUID();
+ }
+ else
+ {
+ return null;
+ }
+ }
public boolean isFailingOver()
{
return (_protocolHandler.getFailoverLatch() != null);
@@ -1485,9 +1489,24 @@ public class AMQConnection extends Close
{
return _sessions.getNextChannelId();
}
-
+
public boolean isUseLegacyMapMessageFormat()
{
return _useLegacyMapMessageFormat;
}
+
+ private void verifyClientID() throws AMQException
+ {
+ if (Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID))
+ {
+ try
+ {
+ _delegate.verifyClientID();
+ }
+ catch(JMSException e)
+ {
+ throw new AMQException(AMQConstant.ALREADY_EXISTS,"ClientID must be unique",e);
+ }
+ }
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Mon Aug 15 11:26:46 2011
@@ -57,10 +57,12 @@ public interface AMQConnectionDelegate
void closeConnection(long timeout) throws JMSException, AMQException;
<T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
-
+
int getMaxChannelID();
int getMinChannelID();
ProtocolVersion getProtocolVersion();
+
+ void verifyClientID() throws JMSException;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Mon Aug 15 11:26:46 2011
@@ -47,6 +47,7 @@ import org.apache.qpid.transport.Connect
import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.ProtocolVersionException;
+import org.apache.qpid.transport.SessionDetachCode;
import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +58,11 @@ public class AMQConnectionDelegate_0_10
* This class logger.
*/
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class);
-
+
+ /**
+ * The name of the UUID property
+ */
+ private static final String UUID_NAME = "qpid.federation_tag";
/**
* The AMQ Connection.
*/
@@ -86,7 +91,14 @@ public class AMQConnectionDelegate_0_10
/**
* create a Session and start it if required.
*/
+
public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow)
+ throws JMSException
+ {
+ return createSession(transacted,acknowledgeMode,prefetchHigh,prefetchLow,null);
+ }
+
+ public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow, String name)
throws JMSException
{
_conn.checkNotClosed();
@@ -101,7 +113,7 @@ public class AMQConnectionDelegate_0_10
try
{
session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh,
- prefetchLow);
+ prefetchLow,name);
_conn.registerSession(channelId, session);
if (_conn._started)
{
@@ -335,6 +347,11 @@ public class AMQConnectionDelegate_0_10
return ProtocolVersion.v0_10;
}
+ public String getUUID()
+ {
+ return (String)_qpidConnection.getServerProperties().get(UUID_NAME);
+ }
+
private void retriveConnectionSettings(ConnectionSettings conSettings, BrokerDetails brokerDetail)
{
@@ -449,12 +466,31 @@ public class AMQConnectionDelegate_0_10
else
{
heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT);
- }
+ }
return heartbeat;
}
-
+
protected org.apache.qpid.transport.Connection getQpidConnection()
{
return _qpidConnection;
}
+
+ public void verifyClientID() throws JMSException
+ {
+ int prefetch = (int)_conn.getMaxPrefetch();
+ AMQSession_0_10 ssn = (AMQSession_0_10)createSession(false, 1,prefetch,prefetch,_conn.getClientID());
+ org.apache.qpid.transport.Session ssn_0_10 = ssn.getQpidSession();
+ try
+ {
+ ssn_0_10.awaitOpen();
+ }
+ catch(Exception e)
+ {
+ if (ssn_0_10.getDetachCode() != null &&
+ ssn_0_10.getDetachCode() == SessionDetachCode.SESSION_BUSY)
+ {
+ throw new JMSException("ClientID must be unique");
+ }
+ }
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Mon Aug 15 11:26:46 2011
@@ -39,7 +39,6 @@ import org.apache.qpid.client.failover.F
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.StateWaiter;
-import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
@@ -49,6 +48,11 @@ import org.apache.qpid.framing.TxSelectB
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,15 +93,21 @@ public class AMQConnectionDelegate_8_0 i
StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
- // TODO: use system property thingy for this
- if (System.getProperty("UseTransportIo", "false").equals("false"))
+ ConnectionSettings settings = new ConnectionSettings();
+ settings.setHost(brokerDetail.getHost());
+ settings.setPort(brokerDetail.getPort());
+ settings.setProtocol(brokerDetail.getTransport());
+
+ SSLConfiguration sslConfig = _conn.getSSLConfiguration();
+ SSLContextFactory sslFactory = null;
+ if (sslConfig != null)
{
- TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
- }
- else
- {
- _conn.getProtocolHandler().createIoTransportSession(brokerDetail);
+ sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
}
+
+ OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
+ NetworkConnection network = transport.connect(settings, _conn._protocolHandler, sslFactory);
+ _conn._protocolHandler.setNetworkConnection(network);
_conn._protocolHandler.getProtocolSession().init();
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
@@ -322,4 +332,9 @@ public class AMQConnectionDelegate_8_0 i
{
return ProtocolVersion.v8_0;
}
+
+ public void verifyClientID() throws JMSException
+ {
+ // NOOP
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java Mon Aug 15 11:26:46 2011
@@ -27,18 +27,14 @@ import java.util.Map;
import org.apache.qpid.client.url.URLParser;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.url.URLHelper;
import org.apache.qpid.url.URLSyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class AMQConnectionURL implements ConnectionURL
{
- private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionURL.class);
-
+
private String _url;
private String _failoverMethod;
private Map<String, String> _failoverOptions;
@@ -295,17 +291,4 @@ public class AMQConnectionURL implements
return sb.toString();
}
-
- public static void main(String[] args) throws URLSyntaxException
- {
- String url2 =
- "amqp://ritchiem:bob@temp/testHost?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'";
- // "amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''";
-
- ConnectionURL connectionurl2 = new AMQConnectionURL(url2);
-
- System.out.println(url2);
- System.out.println(connectionurl2);
-
- }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Mon Aug 15 11:26:46 2011
@@ -21,8 +21,6 @@
package org.apache.qpid.client;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import javax.jms.Destination;
@@ -34,8 +32,6 @@ import javax.naming.StringRefAddr;
import org.apache.qpid.client.messaging.address.AddressHelper;
import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Node;
-import org.apache.qpid.client.messaging.address.QpidExchangeOptions;
-import org.apache.qpid.client.messaging.address.QpidQueueOptions;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -78,11 +74,6 @@ public abstract class AMQDestination imp
private boolean _exchangeExistsChecked;
- private byte[] _byteEncoding;
- private static final int IS_DURABLE_MASK = 0x1;
- private static final int IS_EXCLUSIVE_MASK = 0x2;
- private static final int IS_AUTODELETE_MASK = 0x4;
-
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
@@ -323,7 +314,11 @@ public abstract class AMQDestination imp
{
if(_urlAsShortString == null)
{
- toURL();
+ if (_url == null)
+ {
+ toURL();
+ }
+ _urlAsShortString = new AMQShortString(_url);
}
return _urlAsShortString;
}
@@ -370,7 +365,6 @@ public abstract class AMQDestination imp
// calculated URL now out of date
_url = null;
_urlAsShortString = null;
- _byteEncoding = null;
}
public AMQShortString getRoutingKey()
@@ -508,59 +502,10 @@ public abstract class AMQDestination imp
sb.deleteCharAt(sb.length() - 1);
url = sb.toString();
_url = url;
- _urlAsShortString = new AMQShortString(url);
}
return url;
}
- public byte[] toByteEncoding()
- {
- byte[] encoding = _byteEncoding;
- if(encoding == null)
- {
- int size = _exchangeClass.length() + 1 +
- _exchangeName.length() + 1 +
- 0 + // in place of the destination name
- (_queueName == null ? 0 : _queueName.length()) + 1 +
- 1;
- encoding = new byte[size];
- int pos = 0;
-
- pos = _exchangeClass.writeToByteArray(encoding, pos);
- pos = _exchangeName.writeToByteArray(encoding, pos);
-
- encoding[pos++] = (byte)0;
-
- if(_queueName == null)
- {
- encoding[pos++] = (byte)0;
- }
- else
- {
- pos = _queueName.writeToByteArray(encoding,pos);
- }
- byte options = 0;
- if(_isDurable)
- {
- options |= IS_DURABLE_MASK;
- }
- if(_isExclusive)
- {
- options |= IS_EXCLUSIVE_MASK;
- }
- if(_isAutoDelete)
- {
- options |= IS_AUTODELETE_MASK;
- }
- encoding[pos] = options;
-
-
- _byteEncoding = encoding;
-
- }
- return encoding;
- }
-
public boolean equals(Object o)
{
if (this == o)
@@ -614,53 +559,6 @@ public abstract class AMQDestination imp
null); // factory location
}
-
- public static Destination createDestination(byte[] byteEncodedDestination)
- {
- AMQShortString exchangeClass;
- AMQShortString exchangeName;
- AMQShortString routingKey;
- AMQShortString queueName;
- boolean isDurable;
- boolean isExclusive;
- boolean isAutoDelete;
-
- int pos = 0;
- exchangeClass = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
- pos+= exchangeClass.length() + 1;
- exchangeName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
- pos+= exchangeName.length() + 1;
- routingKey = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
- pos+= (routingKey == null ? 0 : routingKey.length()) + 1;
- queueName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
- pos+= (queueName == null ? 0 : queueName.length()) + 1;
- int options = byteEncodedDestination[pos];
- isDurable = (options & IS_DURABLE_MASK) != 0;
- isExclusive = (options & IS_EXCLUSIVE_MASK) != 0;
- isAutoDelete = (options & IS_AUTODELETE_MASK) != 0;
-
- if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
- {
- return new AMQQueue(exchangeName,routingKey,queueName,isExclusive,isAutoDelete,isDurable);
- }
- else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
- {
- return new AMQTopic(exchangeName,routingKey,isAutoDelete,queueName,isDurable);
- }
- else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
- {
- return new AMQHeadersExchange(routingKey);
- }
- else
- {
- return new AMQAnyDestination(exchangeName,exchangeClass,
- routingKey,isExclusive,
- isAutoDelete,queueName,
- isDurable, new AMQShortString[0]);
- }
-
- }
-
public static Destination createDestination(BindingURL binding)
{
AMQShortString type = binding.getExchangeClass();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Aug 15 11:26:46 2011
@@ -567,6 +567,8 @@ public abstract class AMQSession<C exten
close(-1);
}
+ public abstract AMQException getLastException();
+
public void checkNotClosed() throws JMSException
{
try
@@ -575,16 +577,20 @@ public abstract class AMQSession<C exten
}
catch (IllegalStateException ise)
{
- // if the Connection has closed then we should throw any exception that has occurred that we were not waiting for
- AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
+ AMQException ex = getLastException();
+ if (ex != null)
+ {
+ IllegalStateException ssnClosed = new IllegalStateException(
+ "Session has been closed", ex.getErrorCode().toString());
- if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null)
+ ssnClosed.setLinkedException(ex);
+ ssnClosed.initCause(ex);
+ throw ssnClosed;
+ }
+ else
{
- ise.setLinkedException(manager.getLastException());
- ise.initCause(ise.getLinkedException());
+ throw ise;
}
-
- throw ise;
}
}
@@ -1044,7 +1050,28 @@ public abstract class AMQSession<C exten
{
checkNotClosed();
Topic origTopic = checkValidTopic(topic, true);
+
AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
+ if (dest.getDestSyntax() == DestSyntax.ADDR &&
+ !dest.isAddressResolved())
+ {
+ try
+ {
+ handleAddressBasedDestination(dest,false,true);
+ if (dest.getAddressType() != AMQDestination.TOPIC_TYPE)
+ {
+ throw new JMSException("Durable subscribers can only be created for Topics");
+ }
+ dest.getSourceNode().setDurable(true);
+ }
+ catch(AMQException e)
+ {
+ JMSException ex = new JMSException("Error when verifying destination");
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
+ }
String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
@@ -1056,15 +1083,9 @@ public abstract class AMQSession<C exten
// Not subscribed to this name in the current session
if (subscriber == null)
{
- AMQShortString topicName;
- if (topic instanceof AMQTopic)
- {
- topicName = ((AMQTopic) topic).getRoutingKey();
- } else
- {
- topicName = new AMQShortString(topic.getTopicName());
- }
-
+ // After the address is resolved routing key will not be null.
+ AMQShortString topicName = dest.getRoutingKey();
+
if (_strictAMQP)
{
if (_strictAMQPFATAL)
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Aug 15 11:26:46 2011
@@ -159,13 +159,20 @@ public class AMQSession_0_10 extends AMQ
*/
AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
- int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ int defaultPrefetchHighMark, int defaultPrefetchLowMark,String name)
{
super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
defaultPrefetchLowMark);
_qpidConnection = qpidConnection;
- _qpidSession = _qpidConnection.createSession(1);
+ if (name == null)
+ {
+ _qpidSession = _qpidConnection.createSession(1);
+ }
+ else
+ {
+ _qpidSession = _qpidConnection.createSession(name,1);
+ }
_qpidSession.setSessionListener(this);
if (_transacted)
{
@@ -192,11 +199,12 @@ public class AMQSession_0_10 extends AMQ
* @param qpidConnection The connection
*/
AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
- boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
+ boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow,
+ String name)
{
this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
- defaultPrefetchHigh, defaultPrefetchLow);
+ defaultPrefetchHigh, defaultPrefetchLow,name);
}
private void addUnacked(int id)
@@ -776,7 +784,7 @@ public class AMQSession_0_10 extends AMQ
else
{
QueueNode node = (QueueNode)amqd.getSourceNode();
- getQpidSession().queueDeclare(queueName.toString(), "" ,
+ getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() ,
node.getDeclareArgs(),
node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
node.isDurable() ? Option.DURABLE : Option.NONE,
@@ -913,7 +921,26 @@ public class AMQSession_0_10 extends AMQ
setCurrentException(exc);
}
- public void closed(Session ssn) {}
+ public void closed(Session ssn)
+ {
+ try
+ {
+ super.closed(null);
+ if (flushTask != null)
+ {
+ flushTask.cancel();
+ flushTask = null;
+ }
+ } catch (Exception e)
+ {
+ _logger.error("Error closing JMS session", e);
+ }
+ }
+
+ public AMQException getLastException()
+ {
+ return getCurrentException();
+ }
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
final boolean noLocal, final boolean nowait)
@@ -1029,11 +1056,9 @@ public class AMQSession_0_10 extends AMQ
code = ee.getErrorCode().getValue();
}
AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
-
- _connection.exceptionReceived(amqe);
-
_currentException = amqe;
}
+ _connection.exceptionReceived(_currentException);
}
public AMQMessageDelegateFactory getMessageDelegateFactory()
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Mon Aug 15 11:26:46 2011
@@ -38,6 +38,7 @@ import org.apache.qpid.client.message.Re
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQFrame;
@@ -584,4 +585,35 @@ public final class AMQSession_0_8 extend
queueName == null ? null : new AMQShortString(queueName),
bindingKey == null ? null : new AMQShortString(bindingKey));
}
+
+
+ public AMQException getLastException()
+ {
+ // if the Connection has closed then we should throw any exception that
+ // has occurred that we were not waiting for
+ AMQStateManager manager = _connection.getProtocolHandler()
+ .getStateManager();
+
+ Exception e = manager.getLastException();
+ if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
+ && e != null)
+ {
+ if (e instanceof AMQException)
+ {
+ return (AMQException) e;
+ }
+ else
+ {
+ AMQException amqe = new AMQException(AMQConstant
+ .getConstant(AMQConstant.INTERNAL_ERROR.getCode()),
+ e.getMessage(), e.getCause());
+ return amqe;
+ }
+ }
+ else
+ {
+ return null;
+ }
+ }
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Mon Aug 15 11:26:46 2011
@@ -147,13 +147,17 @@ public class AMQTopic extends AMQDestina
public String getTopicName() throws JMSException
{
- if (super.getRoutingKey() == null && super.getSubject() != null)
+ if (getRoutingKey() != null)
{
- return super.getSubject();
+ return getRoutingKey().asString();
+ }
+ else if (getSubject() != null)
+ {
+ return getSubject();
}
else
{
- return super.getRoutingKey().toString();
+ return null;
}
}
@@ -172,12 +176,18 @@ public class AMQTopic extends AMQDestina
public AMQShortString getRoutingKey()
{
- if (super.getRoutingKey() == null && super.getSubject() != null)
+ if (super.getRoutingKey() != null)
+ {
+ return super.getRoutingKey();
+ }
+ else if (getSubject() != null)
{
- return new AMQShortString(super.getSubject());
+ return new AMQShortString(getSubject());
}
else
{
+ setRoutingKey(new AMQShortString(""));
+ setSubject("");
return super.getRoutingKey();
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Mon Aug 15 11:26:46 2011
@@ -35,6 +35,7 @@ import org.apache.qpid.client.AMQDestina
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -178,7 +179,7 @@ public class BasicMessageProducer_0_10 e
if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
(destination.getSubject() != null ||
- (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get("qpid.subject") != null))
+ (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null))
)
{
Map<String,Object> appProps = messageProps.getApplicationHeaders();
@@ -188,16 +189,16 @@ public class BasicMessageProducer_0_10 e
messageProps.setApplicationHeaders(appProps);
}
- if (appProps.get("qpid.subject") == null)
+ if (appProps.get(QpidMessageProperties.QPID_SUBJECT) == null)
{
// use default subject in address string
- appProps.put("qpid.subject",destination.getSubject());
+ appProps.put(QpidMessageProperties.QPID_SUBJECT,destination.getSubject());
}
- if (destination.getTargetNode().getType() == AMQDestination.TOPIC_TYPE)
+ if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
{
deliveryProp.setRoutingKey((String)
- messageProps.getApplicationHeaders().get("qpid.subject"));
+ messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT));
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java Mon Aug 15 11:26:46 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.client;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
+import java.util.List;
import org.apache.qpid.framing.AMQShortString;
@@ -34,6 +35,18 @@ public enum CustomJMSXProperty
JMSXGroupSeq,
JMSXUserID;
+ private static List<String> _names;
+
+ static
+ {
+ CustomJMSXProperty[] properties = values();
+ _names = new ArrayList<String>(properties.length);
+ for(CustomJMSXProperty property : properties)
+ {
+ _names.add(property.toString());
+ }
+
+ }
private final AMQShortString _nameAsShortString;
@@ -47,20 +60,8 @@ public enum CustomJMSXProperty
return _nameAsShortString;
}
- private static Enumeration _names;
-
- public static synchronized Enumeration asEnumeration()
+ public static Enumeration asEnumeration()
{
- if(_names == null)
- {
- CustomJMSXProperty[] properties = values();
- ArrayList<String> nameList = new ArrayList<String>(properties.length);
- for(CustomJMSXProperty property : properties)
- {
- nameList.add(property.toString());
- }
- _names = Collections.enumeration(nameList);
- }
- return _names;
+ return Collections.enumeration(_names);
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java Mon Aug 15 11:26:46 2011
@@ -30,9 +30,11 @@ import org.apache.qpid.common.QpidProper
public class QpidConnectionMetaData implements ConnectionMetaData
{
+ private AMQConnection con;
QpidConnectionMetaData(AMQConnection conn)
{
+ this.con = conn;
}
public int getJMSMajorVersion() throws JMSException
@@ -62,12 +64,12 @@ public class QpidConnectionMetaData impl
public int getProviderMajorVersion() throws JMSException
{
- return 0;
+ return con.getProtocolVersion().getMajorVersion();
}
public int getProviderMinorVersion() throws JMSException
{
- return 8;
+ return con.getProtocolVersion().getMinorVersion();
}
public String getProviderVersion() throws JMSException
@@ -78,8 +80,7 @@ public class QpidConnectionMetaData impl
private String getProtocolVersion()
{
- // TODO - Implement based on connection negotiated protocol
- return "0.8";
+ return con.getProtocolVersion().toString();
}
public String getBrokerVersion()
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Mon Aug 15 11:26:46 2011
@@ -21,10 +21,14 @@ import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.dtx.XidImpl;
-import org.apache.qpid.transport.*;
-
+import org.apache.qpid.transport.DtxXaStatus;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.Future;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.RecoverResult;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.XaResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -211,9 +215,28 @@ public class XAResourceImpl implements X
* @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
*/
public boolean isSameRM(XAResource xaResource) throws XAException
- {
- // TODO : get the server identity of xaResource and compare it with our own one
- return false;
+ {
+ if(this == xaResource)
+ {
+ return true;
+ }
+ if(!(xaResource instanceof XAResourceImpl))
+ {
+ return false;
+ }
+
+ XAResourceImpl other = (XAResourceImpl)xaResource;
+
+ String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
+ String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID();
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID);
+ }
+
+ return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
+
}
/**
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Mon Aug 15 11:26:46 2011
@@ -52,7 +52,7 @@ public class XASessionImpl extends AMQSe
{
super(qpidConnection, con, channelId, false, // this is not a transacted session
Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted
- MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
+ MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow,null);
createSession();
_xaResource = new XAResourceImpl(this);
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Mon Aug 15 11:26:46 2011
@@ -226,7 +226,7 @@ public class ConnectionStartMethodHandle
{
Object instance = mechanismClass.newInstance();
AMQCallbackHandler cbh = (AMQCallbackHandler) instance;
- cbh.initialise(protocolSession);
+ cbh.initialise(protocolSession.getAMQConnection().getConnectionURL());
return cbh;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Mon Aug 15 11:26:46 2011
@@ -634,6 +634,16 @@ public class AMQMessageDelegate_0_10 ext
{
return new String(_messageProps.getUserId());
}
+ else if (QpidMessageProperties.AMQP_0_10_APP_ID.equals(propertyName) &&
+ _messageProps.getAppId() != null)
+ {
+ return new String(_messageProps.getAppId());
+ }
+ else if (QpidMessageProperties.AMQP_0_10_ROUTING_KEY.equals(propertyName) &&
+ _deliveryProps.getRoutingKey() != null)
+ {
+ return _deliveryProps.getRoutingKey();
+ }
else
{
checkPropertyName(propertyName);
@@ -740,7 +750,14 @@ public class AMQMessageDelegate_0_10 ext
{
checkPropertyName(propertyName);
checkWritableProperties();
- setApplicationHeader(propertyName, value);
+ if (QpidMessageProperties.AMQP_0_10_APP_ID.equals(propertyName))
+ {
+ _messageProps.setAppId(value.getBytes());
+ }
+ else
+ {
+ setApplicationHeader(propertyName, value);
+ }
}
private static final Set<Class> ALLOWED = new HashSet();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java Mon Aug 15 11:26:46 2011
@@ -110,7 +110,7 @@ public class AMQPEncodedMapMessage exten
}
// for testing
- Map<String,Object> getMap()
+ public Map<String,Object> getMap()
{
return _map;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Mon Aug 15 11:26:46 2011
@@ -55,7 +55,7 @@ public class AddressHelper
public static final String EXCLUSIVE = "exclusive";
public static final String AUTO_DELETE = "auto-delete";
public static final String TYPE = "type";
- public static final String ALT_EXCHANGE = "alt-exchange";
+ public static final String ALT_EXCHANGE = "alternate-exchange";
public static final String BINDINGS = "bindings";
public static final String BROWSE = "browse";
public static final String MODE = "mode";
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Aug 15 11:26:46 2011
@@ -57,7 +57,6 @@ import org.apache.qpid.framing.Heartbeat
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.Job;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
@@ -65,8 +64,9 @@ import org.apache.qpid.protocol.AMQMetho
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.NetworkTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -172,11 +172,13 @@ public class AMQProtocolHandler implemen
private Job _readJob;
private Job _writeJob;
private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
- private NetworkDriver _networkDriver;
private ProtocolVersion _suggestedProtocolVersion;
private long _writtenBytes;
private long _readBytes;
+ private NetworkTransport _transport;
+ private NetworkConnection _network;
+ private Sender<ByteBuffer> _sender;
/**
* Creates a new protocol handler, associated with the specified client connection instance.
@@ -211,21 +213,6 @@ public class AMQProtocolHandler implemen
}
/**
- * Called when we want to create a new IoTransport session
- * @param brokerDetail
- */
- public void createIoTransportSession(BrokerDetails brokerDetail)
- {
- _protocolSession = new AMQProtocolSession(this, _connection);
- _stateManager.setProtocolSession(_protocolSession);
- IoTransport.connect_0_9(getProtocolSession(),
- brokerDetail.getHost(),
- brokerDetail.getPort(),
- brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL));
- _protocolSession.init();
- }
-
- /**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
* that the connection be closed, in which case nothing is done, or because the connection died. In the case
* where the connection died, an attempt to failover automatically to a new connection may be started. The failover
@@ -315,7 +302,7 @@ public class AMQProtocolHandler implemen
// failover:
HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
- _networkDriver.close();
+ _network.close();
}
public void writerIdle()
@@ -337,7 +324,7 @@ public class AMQProtocolHandler implemen
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
// this will attempt failover
- _networkDriver.close();
+ _network.close();
closed();
}
else
@@ -589,7 +576,7 @@ public class AMQProtocolHandler implemen
{
public void run()
{
- _networkDriver.send(buf);
+ _sender.send(buf);
}
});
if (PROTOCOL_DEBUG)
@@ -610,7 +597,7 @@ public class AMQProtocolHandler implemen
if (wait)
{
- _networkDriver.flush();
+ _sender.flush();
}
}
@@ -724,7 +711,7 @@ public class AMQProtocolHandler implemen
try
{
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _networkDriver.close();
+ _network.close();
closed();
}
catch (AMQTimeoutException e)
@@ -844,17 +831,18 @@ public class AMQProtocolHandler implemen
public SocketAddress getRemoteAddress()
{
- return _networkDriver.getRemoteAddress();
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _networkDriver.getLocalAddress();
+ return _network.getLocalAddress();
}
- public void setNetworkDriver(NetworkDriver driver)
+ public void setNetworkConnection(NetworkConnection network)
{
- _networkDriver = driver;
+ _network = network;
+ _sender = network.getSender();
}
/** @param delay delay in seconds (not ms) */
@@ -862,15 +850,15 @@ public class AMQProtocolHandler implemen
{
if (delay > 0)
{
- getNetworkDriver().setMaxWriteIdle(delay);
- getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+ _network.setMaxWriteIdle(delay);
+ _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
- public NetworkDriver getNetworkDriver()
+ public NetworkConnection getNetworkConnection()
{
- return _networkDriver;
+ return _network;
}
public ProtocolVersion getSuggestedProtocolVersion()
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Aug 15 11:26:46 2011
@@ -148,16 +148,6 @@ public class AMQProtocolSession implemen
return getAMQConnection().getVirtualHost();
}
- public String getUsername()
- {
- return getAMQConnection().getUsername();
- }
-
- public String getPassword()
- {
- return getAMQConnection().getPassword();
- }
-
public SaslClient getSaslClient()
{
return _saslClient;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java Mon Aug 15 11:26:46 2011
@@ -22,9 +22,9 @@ package org.apache.qpid.client.security;
import javax.security.auth.callback.CallbackHandler;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.jms.ConnectionURL;
public interface AMQCallbackHandler extends CallbackHandler
{
- void initialise(AMQProtocolSession protocolSession);
+ void initialise(ConnectionURL connectionURL);
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java Mon Aug 15 11:26:46 2011
@@ -20,30 +20,29 @@
*/
package org.apache.qpid.client.security;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
+import org.apache.qpid.jms.ConnectionURL;
public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler
{
- private static final Logger _logger = LoggerFactory.getLogger(UsernameHashedPasswordCallbackHandler.class);
+ private ConnectionURL _connectionURL;
- private AMQProtocolSession _protocolSession;
-
- public void initialise(AMQProtocolSession protocolSession)
+ /**
+ * @see org.apache.qpid.client.security.AMQCallbackHandler#initialise(org.apache.qpid.jms.ConnectionURL)
+ */
+ @Override
+ public void initialise(ConnectionURL connectionURL)
{
- _protocolSession = protocolSession;
+ _connectionURL = connectionURL;
}
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
@@ -53,13 +52,13 @@ public class UsernameHashedPasswordCallb
Callback cb = callbacks[i];
if (cb instanceof NameCallback)
{
- ((NameCallback) cb).setName(_protocolSession.getUsername());
+ ((NameCallback) cb).setName(_connectionURL.getUsername());
}
else if (cb instanceof PasswordCallback)
{
try
{
- ((PasswordCallback) cb).setPassword(getHash(_protocolSession.getPassword()));
+ ((PasswordCallback) cb).setPassword(getHash(_connectionURL.getPassword()));
}
catch (NoSuchAlgorithmException e)
{
@@ -99,4 +98,5 @@ public class UsernameHashedPasswordCallb
return hash;
}
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java Mon Aug 15 11:26:46 2011
@@ -27,15 +27,19 @@ import javax.security.auth.callback.Name
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.jms.ConnectionURL;
public class UsernamePasswordCallbackHandler implements AMQCallbackHandler
{
- private AMQProtocolSession _protocolSession;
+ private ConnectionURL _connectionURL;
- public void initialise(AMQProtocolSession protocolSession)
+ /**
+ * @see org.apache.qpid.client.security.AMQCallbackHandler#initialise(org.apache.qpid.jms.ConnectionURL)
+ */
+ @Override
+ public void initialise(final ConnectionURL connectionURL)
{
- _protocolSession = protocolSession;
+ _connectionURL = connectionURL;
}
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
@@ -45,11 +49,11 @@ public class UsernamePasswordCallbackHan
Callback cb = callbacks[i];
if (cb instanceof NameCallback)
{
- ((NameCallback)cb).setName(_protocolSession.getUsername());
+ ((NameCallback)cb).setName(_connectionURL.getUsername());
}
else if (cb instanceof PasswordCallback)
{
- ((PasswordCallback)cb).setPassword(_protocolSession.getPassword().toCharArray());
+ ((PasswordCallback)cb).setPassword(_connectionURL.getPassword().toCharArray());
}
else
{
@@ -57,4 +61,5 @@ public class UsernamePasswordCallbackHan
}
}
}
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java Mon Aug 15 11:26:46 2011
@@ -45,7 +45,7 @@ public class URLParser
private void parseURL(String fullURL) throws URLSyntaxException
{
// Connection URL format
- // amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';vm://:3/virtualpath?option=\'value\'',failover='method?option=\'value\',option='value''"
+ // amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';tcp://host:port?option=\'value\'',failover='method?option=\'value\',option='value''"
// Options are of course optional except for requiring a single broker in the broker list.
try
{
@@ -195,7 +195,7 @@ public class URLParser
{
String brokerlist = _url.getOptions().get(AMQConnectionURL.OPTIONS_BROKERLIST);
- // brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'
+ // brokerlist tcp://host:port?option='value',option='value';tcp://host:port/virtualpath?option='value'
StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR);
while (st.hasMoreTokens())
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java Mon Aug 15 11:26:46 2011
@@ -19,6 +19,7 @@ package org.apache.qpid.filter;
import java.util.HashMap;
+import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import org.apache.qpid.AMQInternalException;
@@ -32,7 +33,7 @@ import org.slf4j.LoggerFactory;
public class PropertyExpression implements Expression
{
// Constants - defined the same as JMS
- private static final int NON_PERSISTENT = 1;
+ private static enum JMSDeliveryMode { NON_PERSISTENT, PERSISTENT }
private static final int DEFAULT_PRIORITY = 4;
private static final Logger _logger = LoggerFactory.getLogger(PropertyExpression.class);
@@ -79,22 +80,24 @@ public class PropertyExpression implemen
{
public Object evaluate(AbstractJMSMessage message)
{
+
+ JMSDeliveryMode mode = JMSDeliveryMode.NON_PERSISTENT;
try
{
- int mode = message.getJMSDeliveryMode();
+ mode = message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ?
+ JMSDeliveryMode.PERSISTENT : JMSDeliveryMode.NON_PERSISTENT;
+
if (_logger.isDebugEnabled())
{
_logger.debug("JMSDeliveryMode is :" + mode);
}
-
- return mode;
}
catch (JMSException e)
{
_logger.warn("Error evaluating property",e);
}
- return NON_PERSISTENT;
+ return mode.toString();
}
});
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org