You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/11 14:22:48 UTC

svn commit: r1684863 - in /qpid/java/trunk/client/src/main/java/org/apache/qpid/client: AMQConnection.java AMQSession_0_10.java

Author: rgodfrey
Date: Thu Jun 11 12:22:47 2015
New Revision: 1684863

URL: http://svn.apache.org/r1684863
Log:
QPID-6583 : [Java Client] AMQSession_0_10 should use connection task pool rather than a Timer

Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1684863&r1=1684862&r2=1684863&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Jun 11 12:22:47 2015
@@ -47,6 +47,8 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -170,7 +172,7 @@ public class AMQConnection extends Close
      * Thread Pool for executing connection level processes such as reporting asynchronous exceptions
      * and for 0-8..0-91 returning bounced messages.
      */
-    private final ExecutorService _taskPool = Executors.newSingleThreadExecutor(new ThreadFactory()
+    private final ScheduledExecutorService _taskPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactory()
     {
         @Override
         public Thread newThread(final Runnable r)
@@ -1658,6 +1660,11 @@ public class AMQConnection extends Close
         }
     }
 
+    ScheduledFuture<?> scheduleTask(Runnable task, long initialDelay, long period, TimeUnit timeUnit)
+    {
+        return _taskPool.scheduleAtFixedRate(task, initialDelay, period, timeUnit);
+    }
+
     public AMQSession getSession(int channelId)
     {
         return _sessions.get(channelId);

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1684863&r1=1684862&r2=1684863&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Jun 11 12:22:47 2015
@@ -25,15 +25,15 @@ import static org.apache.qpid.transport.
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Timer;
-import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -48,7 +48,6 @@ import org.apache.qpid.client.failover.F
 import org.apache.qpid.client.failover.FailoverNoopSupport;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage_0_10;
 import org.apache.qpid.client.messaging.address.AddressHelper;
 import org.apache.qpid.client.messaging.address.Link;
@@ -72,23 +71,33 @@ public class AMQSession_0_10 extends AMQ
      */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
 
-    private static Timer timer = new Timer("ack-flusher", true);
     private final String _name;
 
-    private static class Flusher extends TimerTask
+    private static class Flusher implements Runnable
     {
 
         private WeakReference<AMQSession_0_10> session;
+        private ScheduledFuture<?> _future;
+
         public Flusher(AMQSession_0_10 session)
         {
             this.session = new WeakReference<AMQSession_0_10>(session);
         }
 
-        public void run() {
+        public void setFuture(final ScheduledFuture<?> future)
+        {
+            _future = future;
+        }
+
+        public void run()
+        {
             AMQSession_0_10 ssn = session.get();
             if (ssn == null)
             {
-                cancel();
+                if(_future != null)
+                {
+                    _future.cancel(false);
+                }
             }
             else
             {
@@ -120,7 +129,7 @@ public class AMQSession_0_10 extends AMQ
     private org.apache.qpid.transport.Connection _qpidConnection;
 
     private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
-    private TimerTask flushTask = null;
+    private ScheduledFuture<?> _flushTaskFuture = null;
     private RangeSet unacked = RangeSetFactory.createRangeSet();
     private int unackedCount = 0;
 
@@ -156,8 +165,9 @@ public class AMQSession_0_10 extends AMQ
 
         if (maxAckDelay > 0)
         {
-            flushTask = new Flusher(this);
-            timer.schedule(flushTask, new Date(), maxAckDelay);
+            Flusher flusher = new Flusher(this);
+            _flushTaskFuture = con.scheduleTask(flusher, 0, maxAckDelay, TimeUnit.MILLISECONDS);
+            flusher.setFuture(_flushTaskFuture);
         }
     }
 
@@ -911,10 +921,10 @@ public class AMQSession_0_10 extends AMQ
         try
         {
             super.closed(null);
-            if (flushTask != null)
+            if (_flushTaskFuture != null)
             {
-                flushTask.cancel();
-                flushTask = null;
+                _flushTaskFuture.cancel(false);
+                _flushTaskFuture = null;
             }
         } catch (Exception e)
         {
@@ -1334,10 +1344,10 @@ public class AMQSession_0_10 extends AMQ
 
     private void cancelTimerTask()
     {
-        if (flushTask != null)
+        if (_flushTaskFuture != null)
         {
-            flushTask.cancel();
-            flushTask = null;
+            _flushTaskFuture.cancel(false);
+            _flushTaskFuture = null;
         }
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org