You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/18 16:37:10 UTC
svn commit: r576933 -
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/
Author: arnaudsimon
Date: Tue Sep 18 07:37:09 2007
New Revision: 576933
URL: http://svn.apache.org/viewvc?rev=576933&view=rev
Log:
Added XA support (for 0_10 only)
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (with props)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=576933&r1=576932&r2=576933&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Sep 18 07:37:09 2007
@@ -153,7 +153,7 @@
protected final ExecutorService _taskPool = Executors.newCachedThreadPool();
protected static final long DEFAULT_TIMEOUT = 1000 * 30;
- private AMQConnectionDelegate _delegate;
+ protected AMQConnectionDelegate _delegate;
/**
* @param broker brokerdetails
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=576933&r1=576932&r2=576933&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Tue Sep 18 07:37:09 2007
@@ -23,17 +23,21 @@
import java.io.IOException;
import javax.jms.JMSException;
+import javax.jms.XASession;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.Session;
public interface AMQConnectionDelegate
{
public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
- public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
+ public Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException;
+
+ public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException;
public void resubscribeSessions() throws JMSException, AMQException, FailoverException;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=576933&r1=576932&r2=576933&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Tue Sep 18 07:37:09 2007
@@ -3,6 +3,7 @@
import java.io.IOException;
import javax.jms.JMSException;
+import javax.jms.XASession;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -11,8 +12,6 @@
import org.apache.qpid.jms.Session;
import org.apache.qpidity.client.Client;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.jms.SessionImpl;
-import org.apache.qpidity.jms.ExceptionHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,6 +63,31 @@
}
return session;
}
+
+    /**
+     * Creates an XA session on a new channel of this connection, registers it with the
+     * connection and starts it when the connection is already started.
+     *
+     * @param prefetchHigh prefetch high mark handed to the new session
+     * @param prefetchLow  prefetch low mark handed to the new session
+     * @return the newly created XA session
+     * @throws JMSException any exception raised while creating, registering or starting the
+     *                      session is rethrown as a JMSAMQException
+     */
+    public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
+    {
+        _conn.checkNotClosed();
+        final int id = _conn._idFactory.incrementAndGet();
+        try
+        {
+            final XASessionImpl xaSession = new XASessionImpl(_qpidConnection, _conn, id, prefetchHigh, prefetchLow);
+            _conn.registerSession(id, xaSession);
+            // only start the session when the owning connection has already been started
+            if (_conn._started)
+            {
+                xaSession.start();
+            }
+            return xaSession;
+        }
+        catch (Exception e)
+        {
+            throw new JMSAMQException("cannot create session", e);
+        }
+    }
+
/**
* Make a connection with the broker
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java?rev=576933&r1=576932&r2=576933&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java Tue Sep 18 07:37:09 2007
@@ -28,6 +28,7 @@
import java.util.Iterator;
import javax.jms.JMSException;
+import javax.jms.XASession;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
@@ -99,6 +100,11 @@
throws JMSException
{
return createSession(transacted, acknowledgeMode, prefetch, prefetch);
+ }
+
+    /**
+     * XA sessions are not available through the 0_8 delegate.
+     *
+     * @param prefetchHigh not used
+     * @param prefetchLow  not used
+     * @return never returns normally
+     * @throws UnsupportedOperationException always
+     */
+    public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
+    {
+        final String reason = "0_8 version does not provide XA support";
+        throw new UnsupportedOperationException(reason);
     }
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java?rev=576933&r1=576932&r2=576933&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java Tue Sep 18 07:37:09 2007
@@ -24,13 +24,7 @@
import java.net.UnknownHostException;
import java.util.Hashtable;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
+import javax.jms.*;
import javax.naming.Context;
import javax.naming.Name;
import javax.naming.NamingException;
@@ -45,7 +39,9 @@
import org.apache.qpid.url.URLSyntaxException;
-public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, ObjectFactory, Referenceable
+public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
+ ObjectFactory, Referenceable, XATopicConnectionFactory,
+ XAQueueConnectionFactory, XAConnectionFactory
{
private String _host;
private int _port;
@@ -77,18 +73,17 @@
_connectionDetails = url;
}
- /**
+ /**
* This constructor is never used!
*/
- public AMQConnectionFactory(String broker, String username, String password,
- String clientName, String virtualHost) throws URLSyntaxException
+ public AMQConnectionFactory(String broker, String username, String password, String clientName, String virtualHost)
+ throws URLSyntaxException
{
- this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" + clientName + "/" +
- virtualHost + "?brokerlist='" + broker + "'"));
+ this(new AMQConnectionURL(
+ ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + clientName + "/" + virtualHost + "?brokerlist='" + broker + "'"));
}
- /**
+ /**
* This constructor is never used!
*/
public AMQConnectionFactory(String host, int port, String virtualPath)
@@ -96,7 +91,7 @@
this(host, port, "guest", "guest", virtualPath);
}
- /**
+ /**
* This constructor is never used!
*/
public AMQConnectionFactory(String host, int port, String defaultUsername, String defaultPassword,
@@ -144,17 +139,21 @@
/**
* Getter for SSLConfiguration
+ *
* @return SSLConfiguration if set, otherwise null
*/
- public final SSLConfiguration getSSLConfiguration() {
+ public final SSLConfiguration getSSLConfiguration()
+ {
return _sslConfig;
}
/**
* Setter for SSLConfiguration
+ *
* @param sslConfig config to store
*/
- public final void setSSLConfiguration(SSLConfiguration sslConfig) {
+ public final void setSSLConfiguration(SSLConfiguration sslConfig)
+ {
_sslConfig = sslConfig;
}
@@ -355,8 +354,7 @@
* @return AMQConnection,AMQTopic,AMQQueue, or AMQConnectionFactory.
* @throws Exception
*/
- public Object getObjectInstance(Object obj, Name name, Context ctx,
- Hashtable env) throws Exception
+ public Object getObjectInstance(Object obj, Name name, Context ctx, Hashtable env) throws Exception
{
if (obj instanceof Reference)
{
@@ -409,10 +407,140 @@
public Reference getReference() throws NamingException
{
- return new Reference(
- AMQConnectionFactory.class.getName(),
- new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()),
- AMQConnectionFactory.class.getName(),
- null); // factory location
+ return new Reference(AMQConnectionFactory.class.getName(),
+ new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()),
+ AMQConnectionFactory.class.getName(), null); // factory location
+ }
+
+ // ---------------------------------------------------------------------------------------------------
+ // the following methods are provided for XA compatibility
+ // Those methods are only supported by 0_10 and above
+ // ---------------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a XAConnection with the default user identity.
+     * <p> The XAConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @return A newly created XAConnection
+     * @throws JMSException If no connection URL has been configured, or if creating the
+     *                      XAConnection fails due to some internal error (the original
+     *                      exception is available as the linked exception).
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     * @throws UnsupportedOperationException If the configured connection URL is a 0_8 URL.
+     */
+    public XAConnection createXAConnection() throws JMSException
+    {
+        if (_connectionDetails == null)
+        {
+            // Report a missing URL the same way createXAConnection(String, String) does,
+            // rather than failing with a NullPointerException on the version check below.
+            throw new JMSException("A URL must be specified to access XA connections");
+        }
+        if (_connectionDetails.getURLVersion() == ConnectionURL.URL_0_8)
+        {
+            throw new UnsupportedOperationException("This version does not support XA operations");
+        }
+        try
+        {
+            return new XAConnectionImpl(_connectionDetails, _sslConfig);
+        }
+        catch (Exception e)
+        {
+            JMSException jmse = new JMSException("Error creating connection: " + e.getMessage());
+            jmse.setLinkedException(e);
+            throw jmse;
+        }
+    }
+
+ /**
+ * Creates a XAConnection with the specified user identity.
+ * <p> The XAConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @param username the caller's user name
+ * @param password the caller's password
+ * @return A newly created XAConnection.
+ * @throws JMSException If creating the XAConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public XAConnection createXAConnection(String username, String password) throws JMSException
+ {
+ if (_connectionDetails != null)
+ {
+ _connectionDetails.setUsername(username);
+ _connectionDetails.setPassword(password);
+
+ if (_connectionDetails.getClientName() == null || _connectionDetails.getClientName().equals(""))
+ {
+ _connectionDetails.setClientName(getUniqueClientID());
+ }
+ }
+ else
+ {
+ throw new JMSException("A URL must be specified to access XA connections");
+ }
+ return createXAConnection();
+ }
+
+
+    /**
+     * Creates a XATopicConnection with the default user identity by delegating to
+     * {@link #createXAConnection()} and casting the result.
+     * <p> The XATopicConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @return A newly created XATopicConnection
+     * @throws JMSException If creating the XATopicConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XATopicConnection createXATopicConnection() throws JMSException
+    {
+        final XAConnection connection = createXAConnection();
+        return (XATopicConnection) connection;
+    }
+
+    /**
+     * Creates a XATopicConnection with the specified user identity by delegating to
+     * {@link #createXAConnection(String, String)} and casting the result.
+     * <p> The XATopicConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @param username the caller's user name
+     * @param password the caller's password
+     * @return A newly created XATopicConnection.
+     * @throws JMSException If creating the XATopicConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XATopicConnection createXATopicConnection(String username, String password) throws JMSException
+    {
+        final XAConnection connection = createXAConnection(username, password);
+        return (XATopicConnection) connection;
+    }
+
+    /**
+     * Creates a XAQueueConnection with the default user identity by delegating to
+     * {@link #createXAConnection()} and casting the result.
+     * <p> The XAQueueConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @return A newly created XAQueueConnection
+     * @throws JMSException If creating the XAQueueConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XAQueueConnection createXAQueueConnection() throws JMSException
+    {
+        final XAConnection connection = createXAConnection();
+        return (XAQueueConnection) connection;
+    }
+
+    /**
+     * Creates a XAQueueConnection with the specified user identity by delegating to
+     * {@link #createXAConnection(String, String)} and casting the result.
+     * <p> The XAQueueConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @param username the caller's user name
+     * @param password the caller's password
+     * @return A newly created XAQueueConnection.
+     * @throws JMSException If creating the XAQueueConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XAQueueConnection createXAQueueConnection(String username, String password) throws JMSException
+    {
+        final XAConnection connection = createXAConnection(username, password);
+        return (XAQueueConnection) connection;
     }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=576933&r1=576932&r2=576933&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Sep 18 07:37:09 2007
@@ -80,15 +80,15 @@
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
*/
- AMQSession_0_10( org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark,
- int defaultPrefetchLowMark)
+ AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId,
+ boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
+ int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
defaultPrefetchLowMark);
// create the qpid session with an expiry <= 0 so that the session does not expire
- _qpidSession = qpidConnection.createSession(0);
+ _qpidSession = qpidConnection.createSession(0);
// set the exception listener for this session
_qpidSession.setExceptionListener(new QpidSessionExceptionListener());
// set transacted if required
@@ -108,8 +108,8 @@
* @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
*/
- AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
- int defaultPrefetchLow)
+ AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId,
+ boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
{
this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=576933&r1=576932&r2=576933&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Sep 18 07:37:09 2007
@@ -79,7 +79,7 @@
}
Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
// if there is a replyto destination then we need to request the exchange info
- if (message.getMessageProperties().getReplyTo() != null)
+ if (! message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
{
Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
.exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java?rev=576933&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java Tue Sep 18 07:37:09 2007
@@ -0,0 +1,78 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.AMQException;
+
+import javax.jms.*;
+
+/**
+ * An AMQConnection that also implements the javax.jms.XAConnection, XAQueueConnection and
+ * XATopicConnection interfaces. XA sessions are created through the connection delegate.
+ */
+public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQueueConnection, XATopicConnection
+{
+    //-- constructor
+    /**
+     * Creates an XA connection from a connection URL.
+     *
+     * @param connectionURL the URL describing the connection to establish
+     * @param sslConfig     the SSL configuration passed on to the parent connection
+     * @throws AMQException propagated from the AMQConnection constructor
+     */
+    public XAConnectionImpl(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
+    {
+        super(connectionURL, sslConfig);
+    }
+
+    //-- interface XAConnection
+    /**
+     * Creates an XASession using the default prefetch settings.
+     *
+     * @return A newly created XASession.
+     * @throws JMSException If the XAConnection fails to create an XASession due to
+     *                      some internal error.
+     */
+    public synchronized XASession createXASession() throws JMSException
+    {
+        checkNotClosed();
+        // NOTE(review): the high mark is passed for both the high and the low prefetch
+        // argument of the delegate - confirm that this is intended.
+        final int prefetch = AMQSession.DEFAULT_PREFETCH_HIGH_MARK;
+        return _delegate.createXASession(prefetch, prefetch);
+    }
+
+    //-- Interface XAQueueConnection
+    /**
+     * Creates an XAQueueSession by casting the session returned by {@link #createXASession()}.
+     *
+     * @return A newly created XAQueueSession.
+     * @throws JMSException If the XAConnection fails to create an XASession due to
+     *                      some internal error.
+     */
+    public XAQueueSession createXAQueueSession() throws JMSException
+    {
+        final XASession session = createXASession();
+        return (XAQueueSession) session;
+    }
+
+    //-- Interface XATopicConnection
+    /**
+     * Creates an XATopicSession by casting the session returned by {@link #createXASession()}.
+     *
+     * @return A newly created XATopicSession.
+     * @throws JMSException If the XAConnection fails to create an XASession due to
+     *                      some internal error.
+     */
+    public XATopicSession createXATopicSession() throws JMSException
+    {
+        final XASession session = createXASession();
+        return (XATopicSession) session;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=576933&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Tue Sep 18 07:37:09 2007
@@ -0,0 +1,507 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.dtx.XidImpl;
+import org.apache.qpidity.transport.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an implementation of javax.transaction.xa.XAResource.
+ */
+public class XAResourceImpl implements XAResource
+{
+ /**
+ * this XAResourceImpl's logger
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(XAResourceImpl.class);
+
+ /**
+ * Reference to the associated XASession
+ */
+ private XASessionImpl _xaSession = null;
+
+ /**
+ * The XID of this resource
+ */
+ private Xid _xid;
+
+ //--- constructor
+
+    /**
+     * Creates an XAResource bound to an XA session.
+     *
+     * @param xaSession the XA session whose Qpid session this resource sends its commands through
+     */
+    protected XAResourceImpl(XASessionImpl xaSession)
+    {
+        this._xaSession = xaSession;
+    }
+
+ //--- The XAResource
+    /**
+     * Commits the global transaction specified by xid.
+     * <p> The commit command is sent on the Qpid session of the associated XA session and this
+     * method then waits for the result; any status other than XA_OK is raised as an XAException.
+     *
+     * @param xid A global transaction identifier
+     * @param b   If true, use a one-phase commit protocol to commit the work done on behalf of xid.
+     * @throws XAException An error has occurred. The error codes raised here are XA_HEURHAZ,
+     *                     XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XA_RBROLLBACK, XA_RBTIMEOUT and
+     *                     XAER_PROTO (null xid, xid that cannot be converted, or unexpected status).
+     */
+    public void commit(Xid xid, boolean b) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // SLF4J only renders arguments that have a matching "{}" placeholder
+            _logger.debug("commit {}", xid);
+        }
+        if (xid == null)
+        {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        Future<DtxCoordinationCommitResult> future;
+        try
+        {
+            future = _xaSession.getQpidSession()
+                    .dtxCoordinationCommit(XidImpl.convertToString(xid), b ? Option.ONE_PHASE : Option.NO_OPTION);
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot convert Xid into String format ", e);
+            }
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        // now wait on the future for the result
+        DtxCoordinationCommitResult result = future.get();
+        int status = result.getStatus();
+        switch (status)
+        {
+            case Constant.XA_OK:
+                // nothing to do, the commit succeeded
+                break;
+            case Constant.XA_HEURHAZ:
+                throw new XAException(XAException.XA_HEURHAZ);
+            case Constant.XA_HEURCOM:
+                throw new XAException(XAException.XA_HEURCOM);
+            case Constant.XA_HEURRB:
+                throw new XAException(XAException.XA_HEURRB);
+            case Constant.XA_HEURMIX:
+                throw new XAException(XAException.XA_HEURMIX);
+            case Constant.XA_RBROLLBACK:
+                throw new XAException(XAException.XA_RBROLLBACK);
+            case Constant.XA_RBTIMEOUT:
+                throw new XAException(XAException.XA_RBTIMEOUT);
+            default:
+                // this should not happen
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("got unexpected status value: {}", status);
+                }
+                throw new XAException(XAException.XAER_PROTO);
+        }
+    }
+
+    /**
+     * Ends the work performed on behalf of a transaction branch.
+     * The resource manager disassociates the XA resource from the transaction branch specified
+     * and lets the transaction complete.
+     * <ul>
+     * <li> If TMSUSPEND is specified in the flags, the transaction branch is temporarily suspended in an incomplete state.
+     * The transaction context is in a suspended state and must be resumed via the start method with TMRESUME specified.
+     * <li> If TMFAIL is specified, the portion of work has failed. The resource manager may mark the transaction as rollback-only
+     * <li> If TMSUCCESS is specified, the portion of work has completed successfully.
+     * </ul>
+     * <p> TMFAIL is sent as Option.FAIL and TMSUSPEND as Option.SUSPEND; any other flag value
+     * sends no option.
+     *
+     * @param xid  A global transaction identifier that is the same as the identifier used previously in the start method
+     * @param flag One of TMSUCCESS, TMFAIL, or TMSUSPEND.
+     * @throws XAException An error has occurred. The error codes raised here are XA_RBROLLBACK,
+     *                     XA_RBTIMEOUT and XAER_PROTO (null xid, xid that cannot be converted,
+     *                     or unexpected status).
+     */
+    public void end(Xid xid, int flag) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // SLF4J only renders arguments that have a matching "{}" placeholder
+            _logger.debug("end {}", xid);
+        }
+        if (xid == null)
+        {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        Future<DtxDemarcationEndResult> future;
+        try
+        {
+            future = _xaSession.getQpidSession()
+                    .dtxDemarcationEnd(XidImpl.convertToString(xid),
+                                       flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
+                                       flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot convert Xid into String format ", e);
+            }
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        // now wait on the future for the result
+        DtxDemarcationEndResult result = future.get();
+        int status = result.getStatus();
+        switch (status)
+        {
+            case Constant.XA_OK:
+                // nothing to do, the branch was ended
+                break;
+            case Constant.XA_RBROLLBACK:
+                throw new XAException(XAException.XA_RBROLLBACK);
+            case Constant.XA_RBTIMEOUT:
+                throw new XAException(XAException.XA_RBTIMEOUT);
+            default:
+                // this should not happen
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("got unexpected status value: {}", status);
+                }
+                throw new XAException(XAException.XAER_PROTO);
+        }
+    }
+
+    /**
+     * Tells the resource manager to forget about a heuristically completed transaction branch.
+     * <p> The xid is converted with XidImpl.convertToString, the same conversion the other
+     * commands of this resource use, so that the identifier sent here matches theirs.
+     *
+     * @param xid A global transaction identifier
+     * @throws XAException An error has occurred. XAER_PROTO is raised when xid is null or
+     *                     cannot be converted into its String format.
+     */
+    public void forget(Xid xid) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // SLF4J only renders arguments that have a matching "{}" placeholder
+            _logger.debug("forget {}", xid);
+        }
+        if (xid == null)
+        {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        try
+        {
+            _xaSession.getQpidSession().dtxCoordinationForget(XidImpl.convertToString(xid));
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot convert Xid into String format ", e);
+            }
+            throw new XAException(XAException.XAER_PROTO);
+        }
+    }
+
+    /**
+     * Obtains the current transaction timeout value for this XAResource instance.
+     * <p> When no xid is held by this resource the default timeout, i.e. 0, is returned;
+     * otherwise the timeout is queried through the Qpid session for that xid.
+     *
+     * @return The transaction timeout value in seconds.
+     * @throws XAException An error has occurred. XAER_PROTO is raised when the xid cannot be
+     *                     converted into its String format.
+     */
+    public int getTransactionTimeout() throws XAException
+    {
+        if (_xid == null)
+        {
+            // no transaction branch known to this resource: report the default
+            return 0;
+        }
+        try
+        {
+            Future<DtxCoordinationGetTimeoutResult> pending =
+                    _xaSession.getQpidSession().dtxCoordinationGetTimeout(XidImpl.convertToString(_xid));
+            return (int) pending.get().getTimeout();
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot convert Xid into String format ", e);
+            }
+            throw new XAException(XAException.XAER_PROTO);
+        }
+    }
+
+    /**
+     * Determines whether the resource manager instance represented by this object is the
+     * same as the resource manager instance represented by the parameter xaResource.
+     * <p> The comparison is not implemented yet: this method currently always answers
+     * <code>false</code>.
+     *
+     * @param xaResource An XAResource object whose resource manager instance is to
+     *                   be compared with the resource manager instance of the target object
+     * @return <code>true</code> if it's the same RM instance; otherwise <code>false</code>.
+     * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
+     */
+    public boolean isSameRM(XAResource xaResource) throws XAException
+    {
+        // TODO : compare the server identity of xaResource with our own one
+        return false;
+    }
+
+    /**
+     * Prepare for a transaction commit of the transaction specified in <code>Xid</code>.
+     * <p> The prepare command is sent on the Qpid session of the associated XA session and
+     * this method then waits for the result.
+     *
+     * @param xid A global transaction identifier.
+     * @return A value indicating the resource manager's vote on the outcome of the transaction.
+     *         The possible values are: XA_RDONLY or XA_OK.
+     * @throws XAException An error has occurred. The error codes raised here are XA_RBROLLBACK,
+     *                     XA_RBTIMEOUT and XAER_PROTO (null xid, xid that cannot be converted,
+     *                     or unexpected status).
+     */
+    public int prepare(Xid xid) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // SLF4J only renders arguments that have a matching "{}" placeholder
+            _logger.debug("prepare {}", xid);
+        }
+        if (xid == null)
+        {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        Future<DtxCoordinationPrepareResult> future;
+        try
+        {
+            future = _xaSession.getQpidSession()
+                    .dtxCoordinationPrepare(XidImpl.convertToString(xid));
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot convert Xid into String format ", e);
+            }
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        // now wait on the future for the result
+        DtxCoordinationPrepareResult result = future.get();
+        int status = result.getStatus();
+        switch (status)
+        {
+            case Constant.XA_OK:
+                return XAResource.XA_OK;
+            case Constant.XA_RDONLY:
+                return XAResource.XA_RDONLY;
+            case Constant.XA_RBROLLBACK:
+                throw new XAException(XAException.XA_RBROLLBACK);
+            case Constant.XA_RBTIMEOUT:
+                throw new XAException(XAException.XA_RBTIMEOUT);
+            default:
+                // this should not happen
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("got unexpected status value: {}", status);
+                }
+                throw new XAException(XAException.XAER_PROTO);
+        }
+    }
+
+ /**
+  * Obtains a list of prepared transaction branches.
+  * <p/>
+  * The transaction manager calls this method during recovery to obtain the list of transaction branches
+  * that are currently in prepared or heuristically completed states.
+  * <p/>
+  * The list is built from the keys of the in-doubt map returned by <code>dtxCoordinationRecover</code>,
+  * each key being converted into an Xid.
+  *
+  * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS. The value is currently ignored: every
+  *             call returns the complete list.
+  * @return zero or more XIDs of the transaction branches that are currently in a prepared or heuristically
+  *         completed state.
+  * @throws XAException XAER_PROTO if a returned key cannot be converted into an Xid; the
+  *                     QpidException is attached as the cause.
+  */
+ public Xid[] recover(int flag) throws XAException
+ {
+     // the flag is ignored
+     // NOTE(review): a transaction manager that keeps scanning with TMNOFLAGS until an empty
+     // array is returned would never terminate against this implementation - confirm
+     Future<DtxCoordinationRecoverResult> future = _xaSession.getQpidSession().dtxCoordinationRecover();
+     DtxCoordinationRecoverResult res = future.get();
+     // todo make sure that the keys of the returned map are the xids
+     Xid[] result = new Xid[res.getInDoubt().size()];
+     int next = 0;
+     try
+     {
+         for (String xid : res.getInDoubt().keySet())
+         {
+             result[next++] = new XidImpl(xid);
+         }
+     }
+     catch (QpidException e)
+     {
+         if (_logger.isDebugEnabled())
+         {
+             _logger.debug("Cannot convert string into Xid ", e);
+         }
+         // XAException has no cause-taking constructor, so attach the root cause explicitly
+         XAException xaException = new XAException(XAException.XAER_PROTO);
+         xaException.initCause(e);
+         throw xaException;
+     }
+     return result;
+ }
+
+ /**
+  * Informs the resource manager to roll back work done on behalf of a transaction branch.
+  * <p/>
+  * The request is issued on the Qpid DTX session; any status other than XA_OK is reported
+  * to the caller as an XAException.
+  *
+  * @param xid A global transaction identifier.
+  * @throws XAException XAER_PROTO if xid is null, if the request fails with a QpidException
+  *                     (attached as the cause) or if an unexpected status is returned;
+  *                     XA_HEURHAZ, XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XA_RBROLLBACK or
+  *                     XA_RBTIMEOUT when the corresponding status is returned.
+  */
+ public void rollback(Xid xid) throws XAException
+ {
+     if (xid == null)
+     {
+         throw new XAException(XAException.XAER_PROTO);
+     }
+     Future<DtxCoordinationRollbackResult> future;
+     try
+     {
+         future = _xaSession.getQpidSession()
+                 .dtxCoordinationRollback(XidImpl.convertToString(xid));
+     }
+     catch (QpidException e)
+     {
+         if (_logger.isDebugEnabled())
+         {
+             _logger.debug("Cannot convert Xid into String format ", e);
+         }
+         // XAException has no cause-taking constructor, so attach the root cause explicitly
+         XAException xaException = new XAException(XAException.XAER_PROTO);
+         xaException.initCause(e);
+         throw xaException;
+     }
+     // now wait on the future for the result
+     int status = future.get().getStatus();
+     switch (status)
+     {
+         case Constant.XA_OK:
+             // do nothing this ok
+             break;
+         case Constant.XA_HEURHAZ:
+             throw new XAException(XAException.XA_HEURHAZ);
+         case Constant.XA_HEURCOM:
+             throw new XAException(XAException.XA_HEURCOM);
+         case Constant.XA_HEURRB:
+             throw new XAException(XAException.XA_HEURRB);
+         case Constant.XA_HEURMIX:
+             throw new XAException(XAException.XA_HEURMIX);
+         case Constant.XA_RBROLLBACK:
+             throw new XAException(XAException.XA_RBROLLBACK);
+         case Constant.XA_RBTIMEOUT:
+             throw new XAException(XAException.XA_RBTIMEOUT);
+         default:
+             // this should not happen
+             if (_logger.isDebugEnabled())
+             {
+                 // the placeholder is required, otherwise the status is never rendered
+                 _logger.debug("got unexpected status value: {}", status);
+             }
+             throw new XAException(XAException.XAER_PROTO);
+     }
+ }
+
+ /**
+  * Sets the transaction timeout value for the transaction branch currently recorded on this
+  * XAResource instance.
+  * <p/>
+  * The value is passed to the Qpid DTX session through <code>dtxCoordinationSetTimeout</code>.
+  * When no Xid has been recorded on this resource nothing is sent and <code>false</code> is returned.
+  *
+  * @param timeout The transaction timeout value in seconds.
+  * @return true if the timeout request has been issued; false if no Xid is recorded on this resource.
+  * @throws XAException XAER_PROTO if the request fails with a QpidException; the QpidException
+  *                     is attached as the cause.
+  */
+ public boolean setTransactionTimeout(int timeout) throws XAException
+ {
+     if (_xid == null)
+     {
+         // no transaction branch to apply the timeout to
+         return false;
+     }
+     try
+     {
+         _xaSession.getQpidSession()
+                 .dtxCoordinationSetTimeout(XidImpl.convertToString(_xid), timeout);
+     }
+     catch (QpidException e)
+     {
+         if (_logger.isDebugEnabled())
+         {
+             _logger.debug("Cannot convert Xid into String format ", e);
+         }
+         // XAException has no cause-taking constructor, so attach the root cause explicitly
+         XAException xaException = new XAException(XAException.XAER_PROTO);
+         xaException.initCause(e);
+         throw xaException;
+     }
+     return true;
+ }
+
+ /**
+  * Starts work on behalf of a transaction branch specified in xid.
+  * <ul>
+  * <li> If flag equals TMJOIN, the JOIN option is passed to the Qpid DTX session.
+  * <li> If flag equals TMRESUME, the RESUME option is passed to the Qpid DTX session.
+  * <li> For any other flag value the branch is started without options.
+  * </ul>
+  * The xid is recorded on this resource before the request is sent; it is the Xid later used by
+  * getTransactionTimeout and setTransactionTimeout.
+  *
+  * @param xid  A global transaction identifier to be associated with the resource
+  * @param flag One of TMNOFLAGS, TMJOIN, or TMRESUME
+  * @throws XAException XAER_PROTO if xid is null, if the request fails with a QpidException
+  *                     (attached as the cause) or if an unexpected status is returned;
+  *                     XA_RBROLLBACK or XA_RBTIMEOUT when the corresponding status is returned.
+  */
+ public void start(Xid xid, int flag) throws XAException
+ {
+     if (_logger.isDebugEnabled())
+     {
+         // the placeholder is required, otherwise the xid argument is never rendered
+         _logger.debug("start {}", xid);
+     }
+     if (xid == null)
+     {
+         throw new XAException(XAException.XAER_PROTO);
+     }
+     // NOTE(review): the xid stays recorded even when the start request below fails - confirm
+     // that this is intended
+     _xid = xid;
+     Future<DtxDemarcationStartResult> future;
+     try
+     {
+         future = _xaSession.getQpidSession()
+                 .dtxDemarcationStart(XidImpl.convertToString(xid),
+                                      flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
+                                      flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+     }
+     catch (QpidException e)
+     {
+         if (_logger.isDebugEnabled())
+         {
+             _logger.debug("Cannot convert Xid into String format ", e);
+         }
+         // XAException has no cause-taking constructor, so attach the root cause explicitly
+         XAException xaException = new XAException(XAException.XAER_PROTO);
+         xaException.initCause(e);
+         throw xaException;
+     }
+     // now wait on the future for the result
+     int status = future.get().getStatus();
+     switch (status)
+     {
+         case Constant.XA_OK:
+             // do nothing this ok
+             break;
+         case Constant.XA_RBROLLBACK:
+             throw new XAException(XAException.XA_RBROLLBACK);
+         case Constant.XA_RBTIMEOUT:
+             throw new XAException(XAException.XA_RBTIMEOUT);
+         default:
+             // this should not happen
+             if (_logger.isDebugEnabled())
+             {
+                 _logger.debug("got unexpected status value: {}", status);
+             }
+             throw new XAException(XAException.XAER_PROTO);
+     }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=576933&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Tue Sep 18 07:37:09 2007
@@ -0,0 +1,147 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpidity.client.DtxSession;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+
+import javax.jms.*;
+import javax.transaction.xa.XAResource;
+
+/**
+ * This is an implementation of the javax.jms.XASEssion interface.
+ */
+public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopicSession, XAQueueSession
+{
+ /**
+ * XAResource associated with this XASession
+ */
+ private final XAResourceImpl _xaResource;
+
+ /**
+ * This XASession Qpid DtxSession
+ */
+ private DtxSession _qpidDtxSession;
+
+ /**
+ * The standard session
+ */
+ private Session _jmsSession;
+
+
+ //-- Constructors
+ /**
+ * Create a JMS XASession
+ */
+ public XASessionImpl(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId,
+ int defaultPrefetchHigh, int defaultPrefetchLow)
+ {
+ super(qpidConnection, con, channelId, false, // this is not a transacted session
+ Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted
+ MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
+ _qpidDtxSession = qpidConnection.createDTXSession(0);
+ _xaResource = new XAResourceImpl(this);
+ }
+
+ //--- javax.jms.XASEssion API
+
+ /**
+ * Gets the session associated with this XASession.
+ *
+ * @return The session object.
+ * @throws JMSException if an internal error occurs.
+ */
+ public Session getSession() throws JMSException
+ {
+ if (_jmsSession == null)
+ {
+ _jmsSession = getAMQConnection().createSession(true, getAcknowledgeMode());
+ }
+ return _jmsSession;
+ }
+
+ /**
+ * Returns an XA resource.
+ *
+ * @return An XA resource.
+ */
+ public XAResource getXAResource()
+ {
+ return _xaResource;
+ }
+
+ //-- overwritten mehtods
+ /**
+ * Throws a {@link TransactionInProgressException}, since it should
+ * not be called for an XASession object.
+ *
+ * @throws TransactionInProgressException always.
+ */
+ public void commit() throws JMSException
+ {
+ throw new TransactionInProgressException(
+ "XASession: A direct invocation of the commit operation is probibited!");
+ }
+
+ /**
+ * Throws a {@link TransactionInProgressException}, since it should
+ * not be called for an XASession object.
+ *
+ * @throws TransactionInProgressException always.
+ */
+ public void rollback() throws JMSException
+ {
+ throw new TransactionInProgressException(
+ "XASession: A direct invocation of the rollback operation is probibited!");
+ }
+
+ /**
+ * Access to the underlying Qpid Session
+ *
+ * @return The associated Qpid Session.
+ */
+ protected org.apache.qpidity.client.DtxSession getQpidSession()
+ {
+ return _qpidDtxSession;
+ }
+
+ //--- interface XAQueueSession
+ /**
+ * Gets the topic session associated with this <CODE>XATopicSession</CODE>.
+ *
+ * @return the topic session object
+ * @throws JMSException If an internal error occurs.
+ */
+ public QueueSession getQueueSession() throws JMSException
+ {
+ return (QueueSession) getSession();
+ }
+
+ //--- interface XATopicSession
+
+ /**
+ * Gets the topic session associated with this <CODE>XATopicSession</CODE>.
+ *
+ * @return the topic session object
+ * @throws JMSException If an internal error occurs.
+ */
+ public TopicSession getTopicSession() throws JMSException
+ {
+ return (TopicSession) getSession();
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native