You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/09/18 12:12:31 UTC

svn commit: r1387079 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/thread/ test/java/org/apache/activemq/

Author: tabish
Date: Tue Sep 18 10:12:30 2012
New Revision: 1387079

URL: http://svn.apache.org/viewvc?rev=1387079&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3664

Added new option to connection and consumer to allow for configuration of a time interval upon which all the outstanding acks are delivered when optimized acknowledge is used, so that a long-running consumer that doesn't receive any more messages will eventually ack the last few unacked messages.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OptimizedAckTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1387079&r1=1387078&r2=1387079&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Tue Sep 18 10:12:30 2012
@@ -146,6 +146,7 @@ public class ActiveMQConnection implemen
     private boolean useAsyncSend;
     private boolean optimizeAcknowledge;
     private long optimizeAcknowledgeTimeOut = 0;
+    private long optimizedAckScheduledAckInterval = 0;
     private boolean nestedMapAndListEnabled = true;
     private boolean useRetroactiveConsumer;
     private boolean exclusiveConsumer;
@@ -484,8 +485,7 @@ public class ActiveMQConnection implemen
      *
      * @return the listener or <code>null</code> if no listener is registered with the connection.
      */
-    public ClientInternalExceptionListener getClientInternalExceptionListener()
-    {
+    public ClientInternalExceptionListener getClientInternalExceptionListener() {
         return clientInternalExceptionListener;
     }
 
@@ -498,8 +498,7 @@ public class ActiveMQConnection implemen
      *
      * @param listener the exception listener
      */
-    public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
-    {
+    public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
         this.clientInternalExceptionListener = listener;
     }
 
@@ -1775,7 +1774,6 @@ public class ActiveMQConnection implemen
         this.sendAcksAsync = sendAcksAsync;
     }
 
-
     /**
      * Returns the time this connection was created
      */
@@ -1901,8 +1899,8 @@ public class ActiveMQConnection implemen
             } catch (Exception e) {
                 onClientInternalException(e);
             }
-
         }
+
         for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
             TransportListener listener = iter.next();
             listener.onCommand(command);
@@ -1937,6 +1935,7 @@ public class ActiveMQConnection implemen
             }
         }
     }
+
     /**
      * Used for handling async exceptions
      *
@@ -1976,8 +1975,7 @@ public class ActiveMQConnection implemen
                     } catch (JMSException e) {
                         LOG.warn("Exception during connection cleanup, " + e, e);
                     }
-                    for (Iterator<TransportListener> iter = transportListeners
-                            .iterator(); iter.hasNext();) {
+                    for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
                         TransportListener listener = iter.next();
                         listener.onException(error);
                     }
@@ -2051,9 +2049,8 @@ public class ActiveMQConnection implemen
 
         checkClosedOrFailed();
 
-        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
-            ActiveMQSession s = i.next();
-            if (s.isInUse(destination)) {
+        for (ActiveMQSession session : this.sessions) {
+            if (session.isInUse(destination)) {
                 throw new JMSException("A consumer is consuming from the temporary destination");
             }
         }
@@ -2109,7 +2106,6 @@ public class ActiveMQConnection implemen
         info.setDestination(destination);
         info.setTimeout(0);
         syncSendPacket(info);
-
     }
 
     public boolean isDispatchAsync() {
@@ -2160,8 +2156,6 @@ public class ActiveMQConnection implemen
         return createInputStream(dest, messageSelector, noLocal,  -1);
     }
 
-
-
     public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
         return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
     }
@@ -2276,12 +2270,9 @@ public class ActiveMQConnection implemen
 
         msg.setJMSExpiration(expiration);
         msg.setJMSPriority(priority);
-
         msg.setJMSRedelivered(false);
         msg.setMessageId(messageId);
-
         msg.onSend();
-
         msg.setProducerId(msg.getMessageId().getProducerId());
 
         if (LOG.isDebugEnabled()) {
@@ -2293,7 +2284,6 @@ public class ActiveMQConnection implemen
         } else {
             syncSendPacket(msg);
         }
-
     }
 
     public void addOutputStream(ActiveMQOutputStream stream) {
@@ -2319,13 +2309,15 @@ public class ActiveMQConnection implemen
                 LOG.info("JVM told to shutdown");
                 System.exit(0);
             }
-            if (false && "close".equals(text)){
-                LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
-                try {
-                    close();
-                } catch (JMSException e) {
-                }
-            }
+
+            // TODO Should we handle the "close" case?
+            // if (false && "close".equals(text)){
+            //     LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
+            //     try {
+            //         close();
+            //     } catch (JMSException e) {
+            //     }
+            // }
         }
     }
 
@@ -2341,14 +2333,12 @@ public class ActiveMQConnection implemen
 
     protected void onConsumerControl(ConsumerControl command) {
         if (command.isClose()) {
-            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
-                ActiveMQSession s = i.next();
-                s.close(command.getConsumerId());
+            for (ActiveMQSession session : this.sessions) {
+                session.close(command.getConsumerId());
             }
         } else {
-            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
-                ActiveMQSession s = i.next();
-                s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
+            for (ActiveMQSession session : this.sessions) {
+                session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
             }
         }
     }
@@ -2517,7 +2507,6 @@ public class ActiveMQConnection implemen
         this.checkForDuplicates = checkForDuplicates;
     }
 
-
     public boolean isTransactedIndividualAck() {
         return transactedIndividualAck;
     }
@@ -2607,4 +2596,25 @@ public class ActiveMQConnection implemen
     public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
         this.rejectedTaskHandler = rejectedTaskHandler;
     }
+
+    /**
+     * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
+     * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
+     * will not do any background Message acknowledgment.
+     *
+     * @return the optimizedAckScheduledAckInterval; zero (the default) means no scheduled acks are sent
+     */
+    public long getOptimizedAckScheduledAckInterval() {
+        return optimizedAckScheduledAckInterval;
+    }
+
+    /**
+     * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
+     * have been configured with optimizeAcknowledge enabled.
+     *
+     * @param optimizedAckScheduledAckInterval the optimizedAckScheduledAckInterval to set
+     */
+    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
+        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=1387079&r1=1387078&r2=1387079&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Tue Sep 18 10:12:30 2012
@@ -92,6 +92,7 @@ public class ActiveMQConnectionFactory e
     private boolean disableTimeStampsByDefault;
     private boolean optimizedMessageDispatch = true;
     private long optimizeAcknowledgeTimeOut = 300;
+    private long optimizedAckScheduledAckInterval = 0;
     private boolean copyMessageOnSend = true;
     private boolean useCompression;
     private boolean objectMessageSerializationDefered;
@@ -312,6 +313,7 @@ public class ActiveMQConnectionFactory e
         connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
         connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
         connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
+        connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval());
         connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
         connection.setExclusiveConsumer(isExclusiveConsumer());
         connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
@@ -1117,4 +1119,25 @@ public class ActiveMQConnectionFactory e
     public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
         this.rejectedTaskHandler = rejectedTaskHandler;
     }
+
+    /**
+     * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
+     * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
+     * will not do any background Message acknowledgment.
+     *
+     * @return the optimizedAckScheduledAckInterval; zero (the default) means no scheduled acks are sent
+     */
+    public long getOptimizedAckScheduledAckInterval() {
+        return optimizedAckScheduledAckInterval;
+    }
+
+    /**
+     * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
+     * have been configured with optimizeAcknowledge enabled.
+     *
+     * @param optimizedAckScheduledAckInterval the optimizedAckScheduledAckInterval to set
+     */
+    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
+        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1387079&r1=1387078&r2=1387079&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Tue Sep 18 10:12:30 2012
@@ -152,6 +152,8 @@ public class ActiveMQMessageConsumer imp
 
     private long optimizeAckTimestamp = System.currentTimeMillis();
     private long optimizeAcknowledgeTimeOut = 0;
+    private long optimizedAckScheduledAckInterval = 0;
+    private Runnable optimizedAckTask;
     private long failoverRedeliveryWaitPeriod = 0;
     private boolean transactedIndividualAck = false;
     private boolean nonBlockingRedelivery = false;
@@ -189,13 +191,11 @@ public class ActiveMQMessageConsumer imp
             String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
 
             if (physicalName.indexOf(connectionID) < 0) {
-                throw new InvalidDestinationException(
-                                                      "Cannot use a Temporary destination from another Connection");
+                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
             }
 
             if (session.connection.isDeleted(dest)) {
-                throw new InvalidDestinationException(
-                                                      "Cannot use a Temporary destination that has been deleted");
+                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
             }
             if (prefetch < 0) {
                 throw new JMSException("Cannot have a prefetch size less than zero");
@@ -258,7 +258,9 @@ public class ActiveMQMessageConsumer imp
                                    && !info.isBrowser();
         if (this.optimizeAcknowledge) {
             this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
+            setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
         }
+
         this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
         this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
         this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
@@ -415,8 +417,7 @@ public class ActiveMQMessageConsumer imp
     public void setMessageListener(MessageListener listener) throws JMSException {
         checkClosed();
         if (info.getPrefetchSize() == 0) {
-            throw new JMSException(
-                                   "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
+            throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
         }
         if (listener != null) {
             boolean wasRunning = session.isRunning();
@@ -551,7 +552,7 @@ public class ActiveMQMessageConsumer imp
                     session.acknowledge();
                 }
             });
-        }else if (session.isIndividualAcknowledge()) {
+        } else if (session.isIndividualAcknowledge()) {
             m.setAcknowledgeCallback(new Callback() {
                 public void execute() throws Exception {
                     session.checkClosed();
@@ -683,7 +684,8 @@ public class ActiveMQMessageConsumer imp
         this.session.asyncSendPacket(removeCommand);
         if (interrupted) {
             Thread.currentThread().interrupt();
-        }    }
+        }
+    }
 
     void inProgressClearRequired() {
         inProgressClearRequiredFlag.incrementAndGet();
@@ -772,6 +774,10 @@ public class ActiveMQMessageConsumer imp
                 ThreadPoolUtils.shutdownGraceful(executorService, 60000L);
                 executorService = null;
             }
+            if (optimizedAckTask != null) {
+                this.session.connection.getScheduler().cancel(optimizedAckTask);
+                optimizedAckTask = null;
+            }
 
             if (session.isClientAcknowledge()) {
                 if (!this.info.isBrowser()) {
@@ -888,8 +894,8 @@ public class ActiveMQMessageConsumer imp
                         if (!deliveredMessages.isEmpty()) {
                             if (optimizeAcknowledge) {
                                 ackCounter++;
-                                
-                                // AMQ-3956 evaluate both expired and normal msgs as 
+
+                                // AMQ-3956 evaluate both expired and normal msgs as
                                 // otherwise consumer may get stalled
                                 if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
                                     MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
@@ -899,16 +905,16 @@ public class ActiveMQMessageConsumer imp
                                         session.sendAck(ack);
                                         optimizeAckTimestamp = System.currentTimeMillis();
                                     }
-                                    // AMQ-3956 - as further optimization send 
+                                    // AMQ-3956 - as further optimization send
                                     // ack for expired msgs when there are any.
                                     // This resets the deliveredCounter to 0 so that
                                     // we won't sent standard acks with every msg just
-                                    // because the deliveredCounter just below 
+                                    // because the deliveredCounter just below
                                     // 0.5 * prefetch as used in ackLater()
                                     if (pendingAck != null && deliveredCounter > 0) {
-                                    	session.sendAck(pendingAck);
-                                    	pendingAck = null;
-                                    	deliveredCounter = 0;
+                                        session.sendAck(pendingAck);
+                                        pendingAck = null;
+                                        deliveredCounter = 0;
                                     }
                                 }
                             } else {
@@ -989,7 +995,7 @@ public class ActiveMQMessageConsumer imp
                 }
             }
         }
-        // AMQ-3956 evaluate both expired and normal msgs as 
+        // AMQ-3956 evaluate both expired and normal msgs as
         // otherwise consumer may get stalled
         if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
             session.sendAck(pendingAck);
@@ -1471,4 +1477,55 @@ public class ActiveMQMessageConsumer imp
     public void setFailureError(IOException failureError) {
         this.failureError = failureError;
     }
+
+    /**
+     * @return the interval used for the periodic task that delivers outstanding optimized acks; no task is scheduled unless it is greater than zero
+     */
+    public long getOptimizedAckScheduledAckInterval() {
+        return optimizedAckScheduledAckInterval;
+    }
+
+    /**
+     * @param optimizedAckScheduledAckInterval the optimizedAckScheduledAckInterval to set; any previously scheduled ack task is cancelled first, and a new periodic task is scheduled only when optimizeAcknowledge is enabled and the value is greater than zero
+     */
+    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) throws JMSException {
+        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
+
+        if (this.optimizedAckTask != null) {
+            try {
+                this.session.connection.getScheduler().cancel(optimizedAckTask);
+            } catch (JMSException e) {
+                LOG.debug("Caught exception while cancelling old optimized ack task", e);
+                throw e;
+            }
+            this.optimizedAckTask = null;
+        }
+
+        // Should we periodically send out all outstanding acks.
+        if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval > 0) {
+            this.optimizedAckTask = new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        if (optimizeAcknowledge && !unconsumedMessages.isClosed()) {
+                            if (LOG.isInfoEnabled()) {
+                                LOG.info("Consumer:{} is performing scheduled delivery of outstanding optimized Acks", info.getConsumerId());
+                            }
+                            deliverAcks();
+                        }
+                    } catch (Exception e) {
+                        LOG.debug("Optimized Ack Task caught exception during ack", e);
+                    }
+                }
+            };
+
+            try {
+                this.session.connection.getScheduler().executePeriodically(optimizedAckTask, optimizedAckScheduledAckInterval);
+            } catch (JMSException e) {
+                LOG.debug("Caught exception while scheduling new optimized ack task", e);
+                throw e;
+            }
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=1387079&r1=1387078&r2=1387079&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Tue Sep 18 10:12:30 2012
@@ -19,6 +19,7 @@ package org.apache.activemq.thread;
 import java.util.HashMap;
 import java.util.Timer;
 import java.util.TimerTask;
+
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
 
@@ -54,7 +55,7 @@ public final class Scheduler extends Ser
         TimerTask ticket = timerTasks.remove(task);
         if (ticket != null) {
             ticket.cancel();
-            timer.purge();//remove cancelled TimerTasks
+            timer.purge(); // remove cancelled TimerTasks
         }
     }
 
@@ -70,7 +71,6 @@ public final class Scheduler extends Ser
     @Override
     protected synchronized void doStart() throws Exception {
         this.timer = new Timer(name, true);
-
     }
 
     @Override
@@ -78,7 +78,6 @@ public final class Scheduler extends Ser
        if (this.timer != null) {
            this.timer.cancel();
        }
-
     }
 
     public String getName() {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OptimizedAckTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OptimizedAckTest.java?rev=1387079&r1=1387078&r2=1387079&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OptimizedAckTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OptimizedAckTest.java Tue Sep 18 10:12:30 2012
@@ -16,10 +16,13 @@
  */
 package org.apache.activemq;
 
+import java.util.concurrent.TimeUnit;
+
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+
 import org.apache.activemq.broker.BrokerRegistry;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.util.Wait;
@@ -82,7 +85,6 @@ public class OptimizedAckTest extends Te
          }
      }
 
-
      public void testVerySlowReceivedMessageStillInflight() throws Exception {
          connection.start();
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -119,4 +121,50 @@ public class OptimizedAckTest extends Te
              }
          }
      }
+
+     public void testReceivedMessageNotInFlightAfterScheduledAckFires() throws Exception {
+         connection.setOptimizedAckScheduledAckInterval(TimeUnit.SECONDS.toMillis(10));
+         connection.start();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue("test");
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < 10; i++) {
+             producer.send(session.createTextMessage("Hello" + i));
+         }
+
+         final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() {
+             @Override
+             public boolean isSatisified() throws Exception {
+                 LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
+                 return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
+             }
+         }));
+
+         for (int i=0; i<10; i++) {
+            javax.jms.Message msg = consumer.receive(4000);
+            assertNotNull(msg);
+             if (i<7) {
+                 assertEquals("all prefetch is still in flight", 10, regionBroker.getDestinationStatistics().getInflight().getCount());
+             } else {
+                 assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition(){
+                     @Override
+                     public boolean isSatisified() throws Exception {
+                         LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
+                         return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
+                     }
+                 }));
+             }
+         }
+
+         assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition(){
+             @Override
+             public boolean isSatisified() throws Exception {
+                 LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
+                 return 0 == regionBroker.getDestinationStatistics().getInflight().getCount();
+             }
+         }));
+     }
 }