You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/18 16:37:10 UTC

svn commit: r576933 - /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/

Author: arnaudsimon
Date: Tue Sep 18 07:37:09 2007
New Revision: 576933

URL: http://svn.apache.org/viewvc?rev=576933&view=rev
Log:
Added XA support (for 0_10 only)

Added:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java   (with props)
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=576933&r1=576932&r2=576933&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Sep 18 07:37:09 2007
@@ -153,7 +153,7 @@
     protected final ExecutorService _taskPool = Executors.newCachedThreadPool();
     protected static final long DEFAULT_TIMEOUT = 1000 * 30;
 
-    private AMQConnectionDelegate _delegate;
+    protected AMQConnectionDelegate _delegate;
 
     /**
      * @param broker      brokerdetails

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=576933&r1=576932&r2=576933&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Tue Sep 18 07:37:09 2007
@@ -23,17 +23,21 @@
 import java.io.IOException;
 
 import javax.jms.JMSException;
+import javax.jms.XASession;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.Session;
 
 public interface AMQConnectionDelegate
 {
     public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
 
-    public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
+    public Session createSession(final boolean transacted, final int acknowledgeMode,
             final int prefetchHigh, final int prefetchLow) throws JMSException;
+
+    public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException;
 
     public void resubscribeSessions() throws JMSException, AMQException, FailoverException;
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=576933&r1=576932&r2=576933&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Tue Sep 18 07:37:09 2007
@@ -3,6 +3,7 @@
 import java.io.IOException;
 
 import javax.jms.JMSException;
+import javax.jms.XASession;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
@@ -11,8 +12,6 @@
 import org.apache.qpid.jms.Session;
 import org.apache.qpidity.client.Client;
 import org.apache.qpidity.QpidException;
-import org.apache.qpidity.jms.SessionImpl;
-import org.apache.qpidity.jms.ExceptionHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +63,31 @@
         }
         return session;
     }
+
+    /**
+     * create an XA Session and start it if required.
+     */
+    public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
+    {
+        _conn.checkNotClosed();
+        int channelId = _conn._idFactory.incrementAndGet();
+        XASessionImpl session;
+        try
+        {
+            session = new XASessionImpl(_qpidConnection, _conn, channelId, prefetchHigh, prefetchLow);
+            _conn.registerSession(channelId, session);
+            if (_conn._started)
+            {
+                session.start();
+            }
+        }
+        catch (Exception e)
+        {
+            throw new JMSAMQException("cannot create session", e);
+        }
+        return session;
+    }
+
 
     /**
      * Make a connection with the broker

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java?rev=576933&r1=576932&r2=576933&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java Tue Sep 18 07:37:09 2007
@@ -28,6 +28,7 @@
 import java.util.Iterator;
 
 import javax.jms.JMSException;
+import javax.jms.XASession;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
@@ -99,6 +100,11 @@
             throws JMSException
     {
         return createSession(transacted, acknowledgeMode, prefetch, prefetch);
+    }
+
+    public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
+    {
+        throw new UnsupportedOperationException("0_8 version does not provide XA support");
     }
 
     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java?rev=576933&r1=576932&r2=576933&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java Tue Sep 18 07:37:09 2007
@@ -24,13 +24,7 @@
 import java.net.UnknownHostException;
 import java.util.Hashtable;
 
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
+import javax.jms.*;
 import javax.naming.Context;
 import javax.naming.Name;
 import javax.naming.NamingException;
@@ -45,7 +39,9 @@
 import org.apache.qpid.url.URLSyntaxException;
 
 
-public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, ObjectFactory, Referenceable
+public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
+                                             ObjectFactory, Referenceable, XATopicConnectionFactory,
+                                             XAQueueConnectionFactory, XAConnectionFactory
 {
     private String _host;
     private int _port;
@@ -77,18 +73,17 @@
         _connectionDetails = url;
     }
 
-     /**
+    /**
      * This constructor is never used!
      */
-    public AMQConnectionFactory(String broker, String username, String password,
-                                String clientName, String virtualHost) throws URLSyntaxException
+    public AMQConnectionFactory(String broker, String username, String password, String clientName, String virtualHost)
+            throws URLSyntaxException
     {
-        this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
-                                  username + ":" + password + "@" + clientName + "/" +
-                                  virtualHost + "?brokerlist='" + broker + "'"));
+        this(new AMQConnectionURL(
+                ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + clientName + "/" + virtualHost + "?brokerlist='" + broker + "'"));
     }
 
-     /**
+    /**
      * This constructor is never used!
      */
     public AMQConnectionFactory(String host, int port, String virtualPath)
@@ -96,7 +91,7 @@
         this(host, port, "guest", "guest", virtualPath);
     }
 
-     /**
+    /**
      * This constructor is never used!
      */
     public AMQConnectionFactory(String host, int port, String defaultUsername, String defaultPassword,
@@ -144,17 +139,21 @@
 
     /**
      * Getter for SSLConfiguration
+     *
      * @return SSLConfiguration if set, otherwise null
      */
-    public final SSLConfiguration getSSLConfiguration() {
+    public final SSLConfiguration getSSLConfiguration()
+    {
         return _sslConfig;
     }
 
     /**
      * Setter for SSLConfiguration
+     *
      * @param sslConfig config to store
      */
-    public final void setSSLConfiguration(SSLConfiguration sslConfig) {
+    public final void setSSLConfiguration(SSLConfiguration sslConfig)
+    {
         _sslConfig = sslConfig;
     }
 
@@ -355,8 +354,7 @@
      * @return AMQConnection,AMQTopic,AMQQueue, or AMQConnectionFactory.
      * @throws Exception
      */
-    public Object getObjectInstance(Object obj, Name name, Context ctx,
-                                    Hashtable env) throws Exception
+    public Object getObjectInstance(Object obj, Name name, Context ctx, Hashtable env) throws Exception
     {
         if (obj instanceof Reference)
         {
@@ -409,10 +407,140 @@
 
     public Reference getReference() throws NamingException
     {
-        return new Reference(
-                AMQConnectionFactory.class.getName(),
-                new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()),
-                AMQConnectionFactory.class.getName(),
-                null);          // factory location
+        return new Reference(AMQConnectionFactory.class.getName(),
+                             new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()),
+                             AMQConnectionFactory.class.getName(), null);          // factory location
+    }
+
+    // ---------------------------------------------------------------------------------------------------
+    // the following methods are provided for XA compatibility
+    // Those methods are only supported by 0_10 and above 
+    // ---------------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a XAConnection with the default user identity.
+     * <p> The XAConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @return A newly created XAConnection
+     * @throws JMSException                  If no connection URL has been configured, or if creating the
+     *                                       XAConnection fails due to some internal error.
+     * @throws JMSSecurityException          If client authentication fails due to an invalid user name or password.
+     * @throws UnsupportedOperationException If the connection URL designates the 0_8 protocol version.
+     */
+    public XAConnection createXAConnection() throws JMSException
+    {
+        // Same precondition (and message) as createXAConnection(String, String): without a URL
+        // there is nothing to connect to, so fail with a JMSException rather than a NullPointerException.
+        if (_connectionDetails == null)
+        {
+            throw new JMSException("A URL must be specified to access XA connections");
+        }
+        if (_connectionDetails.getURLVersion() == ConnectionURL.URL_0_8)
+        {
+            throw new UnsupportedOperationException("This version does not support XA operations");
+        }
+        try
+        {
+            return new XAConnectionImpl(_connectionDetails, _sslConfig);
+        }
+        catch (Exception e)
+        {
+            JMSException jmse = new JMSException("Error creating connection: " + e.getMessage());
+            // keep both the JMS linked exception and the standard cause chain
+            jmse.setLinkedException(e);
+            jmse.initCause(e);
+            throw jmse;
+        }
+    }
+
+    /**
+     * Creates a XAConnection on behalf of the given user.
+     * <p> The supplied credentials are stored on the factory's connection URL, a unique
+     * client ID is generated when the URL carries none, and the connection is then
+     * created through {@link #createXAConnection()}. The XAConnection is created in
+     * stopped mode: no messages are delivered until <code>Connection.start</code> is called.
+     *
+     * @param username the caller's user name
+     * @param password the caller's password
+     * @return A newly created XAConnection.
+     * @throws JMSException         If no connection URL is configured or creating the XAConnection fails.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XAConnection createXAConnection(String username, String password) throws JMSException
+    {
+        // guard first: nothing can be configured without a connection URL
+        if (_connectionDetails == null)
+        {
+            throw new JMSException("A URL must be specified to access XA connections");
+        }
+
+        _connectionDetails.setUsername(username);
+        _connectionDetails.setPassword(password);
+
+        if (_connectionDetails.getClientName() == null || _connectionDetails.getClientName().equals(""))
+        {
+            _connectionDetails.setClientName(getUniqueClientID());
+        }
+
+        return createXAConnection();
+    }
+
+
+    /**
+     * Creates a XATopicConnection with the default user identity.
+     * <p> The XATopicConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @return A newly created XATopicConnection
+     * @throws JMSException         If creating the XATopicConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XATopicConnection createXATopicConnection() throws JMSException
+    {
+        return (XATopicConnection) createXAConnection();
+    }
+
+    /**
+     * Creates a XATopicConnection with the specified user identity.
+     * <p> The XATopicConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @param username the caller's user name
+     * @param password the caller's password
+     * @return A newly created XATopicConnection.
+     * @throws JMSException         If creating the XATopicConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XATopicConnection createXATopicConnection(String username, String password) throws JMSException
+    {
+         return (XATopicConnection) createXAConnection(username, password);
+    }
+
+    /**
+     * Creates a XAQueueConnection with the default user identity.
+     * <p> The XAQueueConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @return A newly created XAQueueConnection
+     * @throws JMSException         If creating the XAQueueConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XAQueueConnection createXAQueueConnection() throws JMSException
+    {
+       return (XAQueueConnection) createXAConnection();
+    }
+
+    /**
+     * Creates a XAQueueConnection with the specified user identity.
+     * <p> The XAQueueConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @param username the caller's user name
+     * @param password the caller's password
+     * @return A newly created XAQueueConnection.
+     * @throws JMSException         If creating the XAQueueConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XAQueueConnection createXAQueueConnection(String username, String password) throws JMSException
+    {
+        return (XAQueueConnection) createXAConnection(username, password);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=576933&r1=576932&r2=576933&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Sep 18 07:37:09 2007
@@ -80,15 +80,15 @@
      * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
      * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
      */
-    AMQSession_0_10( org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
-                    MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark,
-                    int defaultPrefetchLowMark)
+    AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId,
+                    boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
+                    int defaultPrefetchHighMark, int defaultPrefetchLowMark)
     {
 
         super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
               defaultPrefetchLowMark);
         // create the qpid session with an expiry  <= 0 so that the session does not expire
-        _qpidSession =  qpidConnection.createSession(0);
+        _qpidSession = qpidConnection.createSession(0);
         // set the exception listnere for this session
         _qpidSession.setExceptionListener(new QpidSessionExceptionListener());
         // set transacted if required
@@ -108,8 +108,8 @@
      * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
      * @param defaultPrefetchLow  The number of prefetched messages at which to resume the session.
      */
-    AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
-                    int defaultPrefetchLow)
+    AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId,
+                    boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
     {
 
         this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=576933&r1=576932&r2=576933&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Sep 18 07:37:09 2007
@@ -79,7 +79,7 @@
         }
         Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
         // if there is a replyto destination then we need to request the exchange info
-        if (message.getMessageProperties().getReplyTo() != null)
+        if (! message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
         {
             Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
                     .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java?rev=576933&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java Tue Sep 18 07:37:09 2007
@@ -0,0 +1,78 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.AMQException;
+
+import javax.jms.*;
+
+/**
+ * An AMQConnection that also implements the JMS XAConnection, XAQueueConnection and XATopicConnection interfaces.
+ */
+public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQueueConnection, XATopicConnection
+{
+    //-- constructor
+    /**
+     * Create a XAConnection from a connection URL and an SSL configuration, both handed to the AMQConnection constructor.
+     */
+    public XAConnectionImpl(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
+    {
+        super(connectionURL, sslConfig);
+    }
+
+    //-- interface XAConnection
+    /**
+     * Creates an XASession through the connection delegate, after calling checkNotClosed().
+     *
+     * @return A newly created XASession.
+     * @throws JMSException If the XAConnection fails to create an XASession due to
+     *                      some internal error.
+     */
+    public synchronized XASession createXASession() throws JMSException
+    {
+        checkNotClosed();
+        // NOTE(review): the high mark is passed as both the prefetchHigh and prefetchLow argument - confirm this is intended
+        return _delegate.createXASession(AMQSession.DEFAULT_PREFETCH_HIGH_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
+    }
+
+    //-- Interface  XAQueueConnection
+    /**
+     * Creates an XAQueueSession by casting the session returned by createXASession().
+     *
+     * @return A newly created XAQueueSession.
+     * @throws JMSException If the XAConnectionImpl fails to create an XASession due to
+     *                      some internal error.
+     */
+    public XAQueueSession createXAQueueSession() throws JMSException
+    {
+        return (XAQueueSession) createXASession();
+    }
+
+    //-- Interface  XATopicConnection
+    /**
+     * Creates an XATopicSession by casting the session returned by createXASession().
+     *
+     * @return A newly created XATopicSession.
+     * @throws JMSException If the XAConnectionImpl fails to create an XASession due to
+     *                      some internal error.
+     */
+    public XATopicSession createXATopicSession() throws JMSException
+    {
+        return (XATopicSession) createXASession();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=576933&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Tue Sep 18 07:37:09 2007
@@ -0,0 +1,507 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.dtx.XidImpl;
+import org.apache.qpidity.transport.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an implementation of javax.transaction.xa.XAResource.
+ */
+public class XAResourceImpl implements XAResource
+{
+    /**
+     * this XAResourceImpl's logger
+     */
+    private static final Logger _logger = LoggerFactory.getLogger(XAResourceImpl.class);
+
+    /**
+     * Reference to the associated XASession
+     */
+    private XASessionImpl _xaSession = null;
+
+    /**
+     * The XID of this resource
+     */
+    private Xid _xid;
+
+    //--- constructor
+
+    /**
+     * Create an XAResource associated with a XASession
+     *
+     * @param xaSession The XASession this XAResource is associated with
+     */
+    protected XAResourceImpl(XASessionImpl xaSession)
+    {
+        _xaSession = xaSession;
+    }
+
+    //--- The XAResource
+    /**
+     * Commits the global transaction specified by xid.
+     *
+     * @param xid A global transaction identifier
+     * @param b   If true, use a one-phase commit protocol to commit the work done on behalf of xid.
+     * @throws XAException An error has occurred. Possible XAExceptions are XA_HEURHAZ, XA_HEURCOM, XA_HEURRB,
+     *                     XA_HEURMIX, XA_RBROLLBACK, XA_RBTIMEOUT, or XAER_PROTO (null or unconvertible xid, unknown status).
+     */
+    public void commit(Xid xid, boolean b) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // the {} placeholder is required, otherwise SLF4J drops the argument
+            _logger.debug("commit {}", xid);
+        }
+        if (xid == null)
+        {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        Future<DtxCoordinationCommitResult> future;
+        try
+        {
+            future = _xaSession.getQpidSession()
+                    .dtxCoordinationCommit(XidImpl.convertToString(xid), b ? Option.ONE_PHASE : Option.NO_OPTION);
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot convert Xid into String format ", e);
+            }
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        // now wait on the future for the result
+        DtxCoordinationCommitResult result = future.get();
+        int status = result.getStatus();
+        // map the returned status onto the matching XAException error code
+        switch (status)
+        {
+            case Constant.XA_OK:
+                // do nothing this ok
+                break;
+            case Constant.XA_HEURHAZ:
+                throw new XAException(XAException.XA_HEURHAZ);
+            case Constant.XA_HEURCOM:
+                throw new XAException(XAException.XA_HEURCOM);
+            case Constant.XA_HEURRB:
+                throw new XAException(XAException.XA_HEURRB);
+            case Constant.XA_HEURMIX:
+                throw new XAException(XAException.XA_HEURMIX);
+            case Constant.XA_RBROLLBACK:
+                throw new XAException(XAException.XA_RBROLLBACK);
+            case Constant.XA_RBTIMEOUT:
+                throw new XAException(XAException.XA_RBTIMEOUT);
+            default:
+                // this should not happen
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("got unexpected status value: {}", status);
+                }
+                throw new XAException(XAException.XAER_PROTO);
+        }
+    }
+
+    /**
+     * Ends the work performed on behalf of a transaction branch.
+     * The resource manager disassociates the XA resource from the transaction branch specified
+     * and lets the transaction complete.
+     * <ul>
+     * <li> If TMSUSPEND is specified in the flags, the transaction branch is temporarily suspended in an incomplete state.
+     * The transaction context is in a suspended state and must be resumed via the start method with TMRESUME specified.
+     * <li> If TMFAIL is specified, the portion of work has failed. The resource manager may mark the transaction as rollback-only
+     * <li> If TMSUCCESS is specified, the portion of work has completed successfully.
+     * </ul>
+     *
+     * @param xid  A global transaction identifier that is the same as the identifier used previously in the start method
+     * @param flag One of TMSUCCESS, TMFAIL, or TMSUSPEND.
+     * @throws XAException An error has occurred. Possible XAException values are XAER_RMERR,
+     *                     XAER_RMFAIL, XAER_NOTA, XAER_INVAL, XAER_PROTO, or XA_RB*.
+     */
+    public void end(Xid xid, int flag) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // the {} placeholder is required, otherwise SLF4J drops the argument
+            _logger.debug("end {}", xid);
+        }
+        if (xid == null)
+        {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        Future<DtxDemarcationEndResult> future;
+        try
+        {
+            // TMFAIL and TMSUSPEND map onto session options; any other flag sends no option
+            future = _xaSession.getQpidSession()
+                    .dtxDemarcationEnd(XidImpl.convertToString(xid),
+                                       flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
+                                       flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot convert Xid into String format ", e);
+            }
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        // now wait on the future for the result
+        DtxDemarcationEndResult result = future.get();
+        int status = result.getStatus();
+        switch (status)
+        {
+            case Constant.XA_OK:
+                // do nothing this ok
+                break;
+            case Constant.XA_RBROLLBACK:
+                throw new XAException(XAException.XA_RBROLLBACK);
+            case Constant.XA_RBTIMEOUT:
+                throw new XAException(XAException.XA_RBTIMEOUT);
+            default:
+                // this should not happen
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("got unexpected status value: {}", status);
+                }
+                throw new XAException(XAException.XAER_PROTO);
+        }
+    }
+
+    /**
+     * Tells the resource manager to forget about a heuristically completed transaction branch.
+     *
+     * @param xid A global transaction identifier.
+     * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL,
+     *                     XAER_NOTA, XAER_INVAL, or XAER_PROTO.
+     */
+    public void forget(Xid xid) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // a {} placeholder is needed for the argument to appear in the logged message
+            _logger.debug("forget {}", xid);
+        }
+        if (xid == null)
+        {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        try
+        {
+            // identify the branch with the same string form that start, end, prepare and rollback send,
+            // instead of the raw global transaction id bytes decoded with the platform charset
+            _xaSession.getQpidSession().dtxCoordinationForget(XidImpl.convertToString(xid));
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot convert Xid into String format ", e);
+            }
+            throw new XAException(XAException.XAER_PROTO);
+        }
+    }
+
+    /**
+     * Obtains the current transaction timeout value set for this XAResource instance.
+     * If XAResource.setTransactionTimeout was not used prior to invoking this method,
+     * the return value is the default timeout i.e. 0;
+     * otherwise, the value used in the previous setTransactionTimeout call is returned.
+     *
+     * @return The transaction timeout value in seconds.
+     * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
+     */
+    public int getTransactionTimeout() throws XAException
+    {
+        // no xid has been recorded by start yet: report the default timeout
+        if (_xid == null)
+        {
+            return 0;
+        }
+        try
+        {
+            // query the timeout of the recorded xid and wait for the answer
+            Future<DtxCoordinationGetTimeoutResult> pendingTimeout =
+                    _xaSession.getQpidSession().dtxCoordinationGetTimeout(XidImpl.convertToString(_xid));
+            return (int) pendingTimeout.get().getTimeout();
+        }
+        catch (QpidException conversionFailure)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot convert Xid into String format ", conversionFailure);
+            }
+            throw new XAException(XAException.XAER_PROTO);
+        }
+    }
+
+    /**
+     * This method is called to determine if the resource manager instance represented
+     * by the target object is the same as the resource manager instance represented by
+     * the parameter xaResource.
+     *
+     * @param xaResource An XAResource object whose resource manager instance is to
+     *                   be compared with the resource manager instance of the target object
+     * @return <code>true</code> if it's the same RM instance; otherwise <code>false</code>.
+     * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
+     */
+    public boolean isSameRM(XAResource xaResource) throws XAException
+    {
+        // TODO : get the server identity of xaResource and compare it with our own one
+        // until that comparison exists, no resource is ever reported as the same resource manager
+        final boolean sameResourceManager = false;
+        return sameResourceManager;
+    }
+
+    /**
+     * Prepare for a transaction commit of the transaction specified in <code>Xid</code>.
+     *
+     * @param xid A global transaction identifier.
+     * @return A value indicating the resource manager's vote on the outcome of the transaction.
+     *         The possible values are: XA_RDONLY or XA_OK.
+     * @throws XAException An error has occurred. Possible exception values are: XAER_RMERR or XAER_NOTA
+     */
+    public int prepare(Xid xid) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // a {} placeholder is needed for the argument to appear in the logged message
+            _logger.debug("prepare {}", xid);
+        }
+        if (xid == null)
+        {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        Future<DtxCoordinationPrepareResult> future;
+        try
+        {
+            future = _xaSession.getQpidSession()
+                    .dtxCoordinationPrepare(XidImpl.convertToString(xid));
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot convert Xid into String format ", e);
+            }
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        // now wait on the future for the result
+        DtxCoordinationPrepareResult result = future.get();
+        int status = result.getStatus();
+        // translate the returned status into the XAResource vote or the matching XAException
+        switch (status)
+        {
+            case Constant.XA_OK:
+                return XAResource.XA_OK;
+            case Constant.XA_RDONLY:
+                return XAResource.XA_RDONLY;
+            case Constant.XA_RBROLLBACK:
+                throw new XAException(XAException.XA_RBROLLBACK);
+            case Constant.XA_RBTIMEOUT:
+                throw new XAException(XAException.XA_RBTIMEOUT);
+            default:
+                // this should not happen: report the offending status and fail with a protocol error
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("got unexpected status value: {}", status);
+                }
+                throw new XAException(XAException.XAER_PROTO);
+        }
+    }
+
+    /**
+     * Obtains a list of prepared transaction branches.
+     * <p/>
+     * The transaction manager calls this method during recovery to obtain the list of transaction branches
+     * that are currently in prepared or heuristically completed states.
+     *
+     * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS.
+     *             TMNOFLAGS must be used when no other flags are set in the parameter.
+     * @return zero or more XIDs of the transaction branches that are currently in a prepared or heuristically
+     *         completed state.
+     * @throws XAException An error has occurred. Possible value is XAER_INVAL.
+     */
+    public Xid[] recover(int flag) throws XAException
+    {
+        // the flag is ignored: the recover request is always sent and its answer awaited
+        DtxCoordinationRecoverResult recovered = _xaSession.getQpidSession().dtxCoordinationRecover().get();
+        // todo make sure that the keys of the returned map are the xids
+        Xid[] inDoubtBranches = new Xid[recovered.getInDoubt().size()];
+        try
+        {
+            int next = 0;
+            // rebuild one Xid from every key of the in-doubt map
+            for (String branchId : recovered.getInDoubt().keySet())
+            {
+                inDoubtBranches[next++] = new XidImpl(branchId);
+            }
+        }
+        catch (QpidException conversionFailure)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot convert string into Xid ", conversionFailure);
+            }
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        return inDoubtBranches;
+    }
+
+    /**
+     * Informs the resource manager to roll back work done on behalf of a transaction branch
+     *
+     * @param xid A global transaction identifier.
+     * @throws XAException An error has occurred.
+     */
+    public void rollback(Xid xid) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // entry trace, matching the other XAResource operations of this class
+            _logger.debug("rollback {}", xid);
+        }
+        if (xid == null)
+        {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        Future<DtxCoordinationRollbackResult> future;
+        try
+        {
+            future = _xaSession.getQpidSession()
+                    .dtxCoordinationRollback(XidImpl.convertToString(xid));
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot convert Xid into String format ", e);
+            }
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        // now wait on the future for the result
+        DtxCoordinationRollbackResult result = future.get();
+        int status = result.getStatus();
+        // every non-OK status is surfaced to the caller as the XAException of the same name
+        switch (status)
+        {
+            case Constant.XA_OK:
+                // do nothing this ok
+                break;
+            case Constant.XA_HEURHAZ:
+                throw new XAException(XAException.XA_HEURHAZ);
+            case Constant.XA_HEURCOM:
+                throw new XAException(XAException.XA_HEURCOM);
+            case Constant.XA_HEURRB:
+                throw new XAException(XAException.XA_HEURRB);
+            case Constant.XA_HEURMIX:
+                throw new XAException(XAException.XA_HEURMIX);
+            case Constant.XA_RBROLLBACK:
+                throw new XAException(XAException.XA_RBROLLBACK);
+            case Constant.XA_RBTIMEOUT:
+                throw new XAException(XAException.XA_RBTIMEOUT);
+            default:
+                // this should not happen: report the offending status and fail with a protocol error
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("got unexpected status value: {}", status);
+                }
+                throw new XAException(XAException.XAER_PROTO);
+        }
+    }
+
+    /**
+     * Sets the current transaction timeout value for this XAResource instance.
+     * Once set, this timeout value is effective until setTransactionTimeout is
+     * invoked again with a different value.
+     * To reset the timeout value to the default value used by the resource manager, set the value to zero.
+     *
+     * @param timeout The transaction timeout value in seconds.
+     * @return true if transaction timeout value is set successfully; otherwise false.
+     * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL, or XAER_INVAL.
+     */
+    public boolean setTransactionTimeout(int timeout) throws XAException
+    {
+        // without a recorded xid there is nothing to apply the timeout to
+        if (_xid == null)
+        {
+            return false;
+        }
+        try
+        {
+            _xaSession.getQpidSession()
+                    .dtxCoordinationSetTimeout(XidImpl.convertToString(_xid), timeout);
+        }
+        catch (QpidException conversionFailure)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot convert Xid into String format ", conversionFailure);
+            }
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        // the request was issued for the recorded xid
+        return true;
+    }
+
+    /**
+     * Starts work on behalf of a transaction branch specified in xid.
+     * <ul>
+     * <li> If TMJOIN is specified, an exception is thrown as it is not supported
+     * <li> If TMRESUME is specified, the start applies to resuming a suspended transaction specified in the parameter xid.
+     * <li> If neither TMJOIN nor TMRESUME is specified and the transaction specified by xid has previously been seen by the
+     * resource manager, the resource manager throws the XAException exception with XAER_DUPID error code.
+     * </ul>
+     *
+     * @param xid  A global transaction identifier to be associated with the resource
+     * @param flag One of TMNOFLAGS, TMJOIN, or TMRESUME
+     * @throws XAException An error has occurred. Possible exceptions
+     *                     are XA_RB*, XAER_RMERR, XAER_RMFAIL, XAER_DUPID, XAER_OUTSIDE, XAER_NOTA, XAER_INVAL, or XAER_PROTO.
+     */
+    public void start(Xid xid, int flag) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // a {} placeholder is needed for the argument to appear in the logged message
+            _logger.debug("start {}", xid);
+        }
+        if (xid == null)
+        {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        // recorded before the start request is answered; getTransactionTimeout and
+        // setTransactionTimeout operate on this xid
+        _xid = xid;
+        Future<DtxDemarcationStartResult> future;
+        try
+        {
+            // TMJOIN and TMRESUME are mapped onto the JOIN and RESUME options;
+            // any other flag value (e.g. TMNOFLAGS) sends no option at all
+            future = _xaSession.getQpidSession()
+                    .dtxDemarcationStart(XidImpl.convertToString(xid),
+                                         flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
+                                         flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+        }
+        catch (QpidException e)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Cannot convert Xid into String format ", e);
+            }
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        // now wait on the future for the result
+        DtxDemarcationStartResult result = future.get();
+        int status = result.getStatus();
+        switch (status)
+        {
+            case Constant.XA_OK:
+                // do nothing this ok
+                break;
+            case Constant.XA_RBROLLBACK:
+                throw new XAException(XAException.XA_RBROLLBACK);
+            case Constant.XA_RBTIMEOUT:
+                throw new XAException(XAException.XA_RBTIMEOUT);
+            default:
+                // this should not happen: report the offending status and fail with a protocol error
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("got unexpected status value: {}", status);
+                }
+                throw new XAException(XAException.XAER_PROTO);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=576933&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Tue Sep 18 07:37:09 2007
@@ -0,0 +1,147 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpidity.client.DtxSession;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+
+import javax.jms.*;
+import javax.transaction.xa.XAResource;
+
+/**
+ * This is an implementation of the javax.jms.XASession interface.
+ */
+public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopicSession, XAQueueSession
+{
+    /**
+     * XAResource associated with this XASession
+     */
+    private final XAResourceImpl _xaResource;
+
+    /**
+     * The Qpid DtxSession of this XASession; created once in the constructor.
+     */
+    private final DtxSession _qpidDtxSession;
+
+    /**
+     * The standard JMS session returned by {@link #getSession()}; created lazily on first request.
+     */
+    private Session _jmsSession;
+
+
+    //-- Constructors
+    /**
+     * Create a JMS XASession
+     *
+     * @param qpidConnection      The Qpid connection; passed to the parent session and used to create the DtxSession.
+     * @param con                 The AMQConnection passed to the parent session.
+     * @param channelId           The channel id passed to the parent session.
+     * @param defaultPrefetchHigh The high prefetch value passed to the parent session.
+     * @param defaultPrefetchLow  The low prefetch value passed to the parent session.
+     */
+    public XASessionImpl(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId,
+                         int defaultPrefetchHigh, int defaultPrefetchLow)
+    {
+        super(qpidConnection, con, channelId, false,  // this is not a transacted session
+              Session.AUTO_ACKNOWLEDGE, // the parent session is created with the AUTO_ACKNOWLEDGE ack mode
+              MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
+        // NOTE(review): the meaning of the 0 argument is not visible here - confirm against createDTXSession
+        _qpidDtxSession = qpidConnection.createDTXSession(0);
+        // the resource is handed this session so it can reach the DtxSession created above
+        _xaResource = new XAResourceImpl(this);
+    }
+
+    //--- javax.jms.XASession API
+
+    /**
+     * Gets the session associated with this XASession.
+     * <p/>
+     * The session is created on the first call, as a transacted session of the owning connection using
+     * this session's acknowledge mode, and the same instance is returned by later calls.
+     * No synchronization is performed around the lazy creation.
+     *
+     * @return The session object.
+     * @throws JMSException if an internal error occurs.
+     */
+    public Session getSession() throws JMSException
+    {
+        if (_jmsSession == null)
+        {
+            _jmsSession = getAMQConnection().createSession(true, getAcknowledgeMode());
+        }
+        return _jmsSession;
+    }
+
+    /**
+     * Returns an XA resource.
+     *
+     * @return An XA resource.
+     */
+    public XAResource getXAResource()
+    {
+        return _xaResource;
+    }
+
+    //-- overridden methods
+    /**
+     * Throws a {@link TransactionInProgressException}, since it should
+     * not be called for an XASession object.
+     *
+     * @throws TransactionInProgressException always.
+     */
+    public void commit() throws JMSException
+    {
+        throw new TransactionInProgressException(
+                "XASession:  A direct invocation of the commit operation is probibited!");
+    }
+
+    /**
+     * Throws a {@link TransactionInProgressException}, since it should
+     * not be called for an XASession object.
+     *
+     * @throws TransactionInProgressException always.
+     */
+    public void rollback() throws JMSException
+    {
+        throw new TransactionInProgressException(
+                "XASession: A direct invocation of the rollback operation is probibited!");
+    }
+
+    /**
+     * Access to the underlying Qpid DtxSession
+     *
+     * @return The associated Qpid DtxSession.
+     */
+    protected org.apache.qpidity.client.DtxSession getQpidSession()
+    {
+        return _qpidDtxSession;
+    }
+
+    //--- interface  XAQueueSession
+    /**
+     * Gets the queue session associated with this <CODE>XAQueueSession</CODE>.
+     * <p/>
+     * This is the session returned by {@link #getSession()}, cast to a QueueSession.
+     *
+     * @return the queue session object
+     * @throws JMSException If an internal error occurs.
+     */
+    public QueueSession getQueueSession() throws JMSException
+    {
+        return (QueueSession) getSession();
+    }
+
+    //--- interface  XATopicSession
+
+    /**
+     * Gets the topic session associated with this <CODE>XATopicSession</CODE>.
+     * <p/>
+     * This is the session returned by {@link #getSession()}, cast to a TopicSession.
+     *
+     * @return the topic session object
+     * @throws JMSException If an internal error occurs.
+     */
+    public TopicSession getTopicSession() throws JMSException
+    {
+        return (TopicSession) getSession();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native