You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2011/12/12 17:04:27 UTC
svn commit: r1213290 - in
/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
AMQConnectionDelegate.java AMQConnectionDelegate_0_10.java
AMQConnectionDelegate_8_0.java XAConnectionImpl.java XASessionImpl.java
Author: rajith
Date: Mon Dec 12 16:04:27 2011
New Revision: 1213290
URL: http://svn.apache.org/viewvc?rev=1213290&view=rev
Log:
QPID-3625 Committing a patch from Weston Price.
(cherry picked from commit e7b630fd35055c9b80e3799c6939c7f3687b0c02)
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1213290&r1=1213289&r2=1213290&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Mon Dec 12 16:04:27 2011
@@ -50,6 +50,8 @@ public interface AMQConnectionDelegate
XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException;
+ XASession createXASession(int ackMode) throws JMSException;
+
void failoverPrep();
void resubscribeSessions() throws JMSException, AMQException, FailoverException;
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1213290&r1=1213289&r2=1213290&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Mon Dec 12 16:04:27 2011
@@ -164,6 +164,35 @@ public class AMQConnectionDelegate_0_10
return session;
}
+ @Override
+ public XASession createXASession(int ackMode)
+ throws JMSException
+ {
+
+ _conn.checkNotClosed();
+
+ if (_conn.channelLimitReached())
+ {
+ throw new ChannelLimitReachedException(_conn.getMaximumChannelCount());
+ }
+
+ int channelId = _conn.getNextChannelID();
+ XASessionImpl session;
+ try
+ {
+ session = new XASessionImpl(_qpidConnection, _conn, channelId, ackMode, (int)_conn.getMaxPrefetch(), (int)_conn.getMaxPrefetch() / 2);
+ _conn.registerSession(channelId, session);
+ if (_conn._started)
+ {
+ session.start();
+ }
+ }
+ catch (Exception e)
+ {
+ throw new JMSAMQException("cannot create session", e);
+ }
+ return session;
+ }
/**
* Make a connection with the broker
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1213290&r1=1213289&r2=1213290&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Mon Dec 12 16:04:27 2011
@@ -161,8 +161,8 @@ public class AMQConnectionDelegate_8_0 i
_conn._failoverPolicy.attainedConnection();
_conn._connected = true;
return null;
- }
- else
+ }
+ else
{
return _conn._protocolHandler.getSuggestedProtocolVersion();
}
@@ -175,11 +175,16 @@ public class AMQConnectionDelegate_8_0 i
return createSession(transacted, acknowledgeMode, prefetch, prefetch);
}
+
public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
{
throw new UnsupportedOperationException("0_8 version does not provide XA support");
}
+ public XASession createXASession(int ackMode) throws JMSException
+ {
+ throw new UnsupportedOperationException("0_8 version does not provide XA support");
+ }
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException
{
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java?rev=1213290&r1=1213289&r2=1213290&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java Mon Dec 12 16:04:27 2011
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -75,4 +75,11 @@ public class XAConnectionImpl extends AM
{
return (XATopicSession) createXASession();
}
+
+ //Specialized call for JCA
+ public XASession createXASession(int ackMode) throws JMSException
+ {
+ checkNotClosed();
+ return _delegate.createXASession(ackMode);
+ }
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1213290&r1=1213289&r2=1213290&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Mon Dec 12 16:04:27 2011
@@ -6,7 +6,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE 2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -43,21 +43,36 @@ public class XASessionImpl extends AMQSe
private Session _jmsSession;
- //-- Constructors
+ // Constructors
/**
* Create a JMS XASession
*/
public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
int defaultPrefetchHigh, int defaultPrefetchLow)
{
- super(qpidConnection, con, channelId, false, // this is not a transacted session
- Session.AUTO_ACKNOWLEDGE,
- MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow,null);
+ this(qpidConnection, con, channelId, false, Session.AUTO_ACKNOWLEDGE,
+ MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow, null);
+ }
+
+ public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
+ int ackMode, int defaultPrefetchHigh, int defaultPrefetchLow)
+ {
+ this(qpidConnection, con, channelId, false, ackMode, MessageFactoryRegistry.newDefaultRegistry(),
+ defaultPrefetchHigh, defaultPrefetchLow, null);
+
+ }
+
+ public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
+ boolean transacted, int ackMode, MessageFactoryRegistry registry, int defaultPrefetchHigh, int defaultPrefetchLow,
+ String name)
+ {
+ super(qpidConnection, con, channelId, transacted, ackMode, registry, defaultPrefetchHigh, defaultPrefetchLow, name);
createSession();
_xaResource = new XAResourceImpl(this);
- }
+ }
+
- //-- public methods
+ // public methods
/**
* Create a qpid session.
@@ -70,7 +85,7 @@ public class XASessionImpl extends AMQSe
}
- //--- javax.njms.XASEssion API
+ // javax.njms.XASEssion API
/**
* Gets the session associated with this XASession.
@@ -97,7 +112,7 @@ public class XASessionImpl extends AMQSe
return _xaResource;
}
- //-- overwritten mehtods
+ // overwritten mehtods
/**
* Throws a {@link TransactionInProgressException}, since it should
* not be called for an XASession object.
@@ -132,7 +147,7 @@ public class XASessionImpl extends AMQSe
return _qpidDtxSession;
}
- //--- interface XAQueueSession
+ // interface XAQueueSession
/**
* Gets the topic session associated with this <CODE>XATopicSession</CODE>.
*
@@ -144,7 +159,7 @@ public class XASessionImpl extends AMQSe
return (QueueSession) getSession();
}
- //--- interface XATopicSession
+ // interface XATopicSession
/**
* Gets the topic session associated with this <CODE>XATopicSession</CODE>.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org