You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/11 14:22:48 UTC
svn commit: r1684863 - in
/qpid/java/trunk/client/src/main/java/org/apache/qpid/client:
AMQConnection.java AMQSession_0_10.java
Author: rgodfrey
Date: Thu Jun 11 12:22:47 2015
New Revision: 1684863
URL: http://svn.apache.org/r1684863
Log:
QPID-6583 : [Java Client] AMQSession_0_10 should use connection task pool rather than a Timer
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1684863&r1=1684862&r2=1684863&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Jun 11 12:22:47 2015
@@ -47,6 +47,8 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -170,7 +172,7 @@ public class AMQConnection extends Close
* Thread Pool for executing connection level processes such as reporting asynchronous exceptions
* and for 0-8..0-91 returning bounced messages.
*/
- private final ExecutorService _taskPool = Executors.newSingleThreadExecutor(new ThreadFactory()
+ private final ScheduledExecutorService _taskPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactory()
{
@Override
public Thread newThread(final Runnable r)
@@ -1658,6 +1660,11 @@ public class AMQConnection extends Close
}
}
+ ScheduledFuture<?> scheduleTask(Runnable task, long initialDelay, long period, TimeUnit timeUnit)
+ {
+ return _taskPool.scheduleAtFixedRate(task, initialDelay, period, timeUnit);
+ }
+
public AMQSession getSession(int channelId)
{
return _sessions.get(channelId);
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1684863&r1=1684862&r2=1684863&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Jun 11 12:22:47 2015
@@ -25,15 +25,15 @@ import static org.apache.qpid.transport.
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
-import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -48,7 +48,6 @@ import org.apache.qpid.client.failover.F
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.messaging.address.AddressHelper;
import org.apache.qpid.client.messaging.address.Link;
@@ -72,23 +71,33 @@ public class AMQSession_0_10 extends AMQ
*/
private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
- private static Timer timer = new Timer("ack-flusher", true);
private final String _name;
- private static class Flusher extends TimerTask
+ private static class Flusher implements Runnable
{
private WeakReference<AMQSession_0_10> session;
+ private ScheduledFuture<?> _future;
+
public Flusher(AMQSession_0_10 session)
{
this.session = new WeakReference<AMQSession_0_10>(session);
}
- public void run() {
+ public void setFuture(final ScheduledFuture<?> future)
+ {
+ _future = future;
+ }
+
+ public void run()
+ {
AMQSession_0_10 ssn = session.get();
if (ssn == null)
{
- cancel();
+ if(_future != null)
+ {
+ _future.cancel(false);
+ }
}
else
{
@@ -120,7 +129,7 @@ public class AMQSession_0_10 extends AMQ
private org.apache.qpid.transport.Connection _qpidConnection;
private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
- private TimerTask flushTask = null;
+ private ScheduledFuture<?> _flushTaskFuture = null;
private RangeSet unacked = RangeSetFactory.createRangeSet();
private int unackedCount = 0;
@@ -156,8 +165,9 @@ public class AMQSession_0_10 extends AMQ
if (maxAckDelay > 0)
{
- flushTask = new Flusher(this);
- timer.schedule(flushTask, new Date(), maxAckDelay);
+ Flusher flusher = new Flusher(this);
+ _flushTaskFuture = con.scheduleTask(flusher, 0, maxAckDelay, TimeUnit.MILLISECONDS);
+ flusher.setFuture(_flushTaskFuture);
}
}
@@ -911,10 +921,10 @@ public class AMQSession_0_10 extends AMQ
try
{
super.closed(null);
- if (flushTask != null)
+ if (_flushTaskFuture != null)
{
- flushTask.cancel();
- flushTask = null;
+ _flushTaskFuture.cancel(false);
+ _flushTaskFuture = null;
}
} catch (Exception e)
{
@@ -1334,10 +1344,10 @@ public class AMQSession_0_10 extends AMQ
private void cancelTimerTask()
{
- if (flushTask != null)
+ if (_flushTaskFuture != null)
{
- flushTask.cancel();
- flushTask = null;
+ _flushTaskFuture.cancel(false);
+ _flushTaskFuture = null;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org