You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/06/18 19:23:50 UTC

svn commit: r1494214 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ common/src/main/java/org/apache/qpid/transport/

Author: rgodfrey
Date: Tue Jun 18 17:23:50 2013
New Revision: 1494214

URL: http://svn.apache.org/r1494214
Log:
QPID-4934 : [Java XA] Stop redundant session creation for XA Sessions, improve logging for XA

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Struct.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1494214&r1=1494213&r2=1494214&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Jun 18 17:23:50 2013
@@ -24,7 +24,6 @@ import static org.apache.qpid.transport.
 
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -95,6 +94,7 @@ public class AMQSession_0_10 extends AMQ
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
 
     private static Timer timer = new Timer("ack-flusher", true);
+    private final String _name;
 
     private static class Flusher extends TimerTask
     {
@@ -153,6 +153,7 @@ public class AMQSession_0_10 extends AMQ
     private boolean _isHardError = Boolean.getBoolean("qpid.session.legacy_exception_behaviour");
     //--- constructors
 
+
     /**
      * Creates a new session on a connection.
      *
@@ -173,28 +174,38 @@ public class AMQSession_0_10 extends AMQ
         super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
               defaultPrefetchLowMark);
         _qpidConnection = qpidConnection;
-        if (name == null)
+        _name = name;
+        _qpidSession = createSession();
+
+        if (maxAckDelay > 0)
+        {
+            flushTask = new Flusher(this);
+            timer.schedule(flushTask, new Date(), maxAckDelay);
+        }
+    }
+
+    protected Session createSession()
+    {
+        Session qpidSession;
+        if (_name == null)
         {
-            _qpidSession = _qpidConnection.createSession(1);
+            qpidSession = _qpidConnection.createSession(1);
         }
         else
         {
-            _qpidSession = _qpidConnection.createSession(name,1);
+            qpidSession = _qpidConnection.createSession(_name,1);
         }
-        _qpidSession.setSessionListener(this);
         if (isTransacted())
         {
-            _qpidSession.txSelect();
-            _qpidSession.setTransacted(true);
+            qpidSession.txSelect();
+            qpidSession.setTransacted(true);
         }
+        qpidSession.setSessionListener(this);
 
-        if (maxAckDelay > 0)
-        {
-            flushTask = new Flusher(this);
-            timer.schedule(flushTask, new Date(), maxAckDelay);
-        }
+        return qpidSession;
     }
 
+
     /**
      * Creates a new session on a connection with the default 0-10 message factory.
      *

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=1494214&r1=1494213&r2=1494214&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Tue Jun 18 17:23:50 2013
@@ -88,7 +88,7 @@ public class XAResourceImpl implements A
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("commit tx branch with xid:  ", xid);
+            _logger.debug("commit tx branch with xid: {} ", xid);
         }
         Future<XaResult> future =
                 _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE);
@@ -132,7 +132,7 @@ public class XAResourceImpl implements A
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("end tx branch with xid: ", xid);
+            _logger.debug("end tx branch with xid: {}", xid);
         }
         switch (flag)
         {
@@ -191,7 +191,7 @@ public class XAResourceImpl implements A
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("forget tx branch with xid: ", xid);
+            _logger.debug("forget tx branch with xid: {}", xid);
         }
         _xaSession.getQpidSession().dtxForget(convertXid(xid));
         try
@@ -281,7 +281,7 @@ public class XAResourceImpl implements A
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("prepare ", xid);
+            _logger.debug("prepare {}", xid);
         }
         Future<XaResult> future = _xaSession.getQpidSession().dtxPrepare(convertXid(xid));
         XaResult result = null;
@@ -361,7 +361,7 @@ public class XAResourceImpl implements A
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("rollback tx branch with xid: ", xid);
+            _logger.debug("rollback tx branch with xid: {}", xid);
         }
 
         Future<XaResult> future = _xaSession.getQpidSession().dtxRollback(convertXid(xid));
@@ -428,7 +428,7 @@ public class XAResourceImpl implements A
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("start tx branch with xid: ", xid);
+            _logger.debug("start tx branch with xid: {}", xid);
         }
         switch (flag)
         {
@@ -524,7 +524,7 @@ public class XAResourceImpl implements A
                 // this should not happen
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("got unexpected status value: ", status);
+                    _logger.debug("got unexpected status value: {}", status);
                 }
                 //A resource manager error has occurred in the transaction branch.
                 throw new XAException(XAException.XAER_RMERR);

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1494214&r1=1494213&r2=1494214&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Tue Jun 18 17:23:50 2013
@@ -75,8 +75,15 @@ public class XASessionImpl extends AMQSe
                boolean transacted, int ackMode, MessageFactoryRegistry registry, int defaultPrefetchHigh, int defaultPrefetchLow,
                String name)
      {
-        super(qpidConnection, con, channelId, transacted, ackMode, registry, defaultPrefetchHigh, defaultPrefetchLow, name);
-        createSession();
+        super(qpidConnection,
+              con,
+              channelId,
+              transacted,
+              ackMode,
+              registry,
+              defaultPrefetchHigh,
+              defaultPrefetchLow,
+              name);
         _xaResource = new XAResourceImpl(this);
      }
 
@@ -86,11 +93,13 @@ public class XASessionImpl extends AMQSe
     /**
      * Create a qpid session.
      */
-    public void createSession()
+    @Override
+    public org.apache.qpid.transport.Session createSession()
     {
         _qpidDtxSession = getQpidConnection().createSession(0,true);
-        _qpidDtxSession.setSessionListener(this);
         _qpidDtxSession.dtxSelect();
+        _qpidDtxSession.setSessionListener(this);
+        return _qpidDtxSession;
     }
 
     /**
@@ -101,11 +110,7 @@ public class XASessionImpl extends AMQSe
      */
     public Session getSession() throws JMSException
     {
-        if (_jmsSession == null)
-        {
-            _jmsSession = getAMQConnection().createSession(true, getAcknowledgeMode());
-        }
-        return _jmsSession;
+        return this;
     }
 
     /**
@@ -162,7 +167,7 @@ public class XASessionImpl extends AMQSe
      */
     public QueueSession getQueueSession() throws JMSException
     {
-        return (QueueSession) getSession();
+        return this;
     }
 
     //    interface  XATopicSession
@@ -175,7 +180,7 @@ public class XASessionImpl extends AMQSe
      */
     public TopicSession getTopicSession() throws JMSException
     {
-        return (TopicSession) getSession();
+        return this;
     }
 
     @Override

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Struct.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Struct.java?rev=1494214&r1=1494213&r2=1494214&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Struct.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Struct.java Tue Jun 18 17:23:50 2013
@@ -23,7 +23,9 @@ package org.apache.qpid.transport;
 import org.apache.qpid.transport.codec.Decoder;
 import org.apache.qpid.transport.codec.Encodable;
 import org.apache.qpid.transport.codec.Encoder;
+import org.apache.qpid.transport.util.Functions;
 
+import java.util.Arrays;
 import java.util.Map;
 
 
@@ -131,11 +133,24 @@ public abstract class Struct implements 
             }
             str.append(me.getKey());
             str.append("=");
-            str.append(me.getValue());
+            str.append(formatValue(me.getValue()));
         }
         str.append(")");
 
         return str.toString();
     }
 
+    private Object formatValue(Object value)
+    {
+        if(value instanceof byte[])
+        {
+            return Functions.str((byte[])value);
+        }
+        else if(value instanceof Object[])
+        {
+            return Arrays.asList((Object[])value);
+        }
+        return value;
+    }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org