You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/03/05 15:57:46 UTC

svn commit: r1664366 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java: broker-core/ broker-core/src/main/java/org/apache/qpid/server/ broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ broker-core/src/main/java/org/apache/qpid...

Author: rgodfrey
Date: Thu Mar  5 14:57:46 2015
New Revision: 1664366

URL: http://svn.apache.org/r1664366
Log:
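Rewrite close(): ConfiguredObject.close(), ConsumerImpl.close(), closeChildren() and beforeClose() now return a Guava ListenableFuture<Void> instead of FutureResult/CloseFuture.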

Modified:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/pom.xml
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaTransientExcludes

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/pom.xml?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/pom.xml (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/pom.xml Thu Mar  5 14:57:46 2015
@@ -107,8 +107,14 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava-version}</version>
+    </dependency>
+
      <!-- test dependencies -->
-     <dependency>
+    <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-test-utils</artifactId>
       <version>${project.version}</version>

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java Thu Mar  5 14:57:46 2015
@@ -29,10 +29,13 @@ import java.security.PrivilegedException
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
@@ -54,7 +57,6 @@ import org.apache.qpid.server.plugin.Plu
 import org.apache.qpid.server.plugin.SystemConfigFactory;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.FutureResult;
 
 public class Broker implements BrokerShutdownProvider
 {
@@ -108,13 +110,13 @@ public class Broker implements BrokerShu
             {
                 if(_systemConfig != null)
                 {
-                    final FutureResult closeResult = _systemConfig.close();
-                    closeResult.waitForCompletion(5000l);
+                    ListenableFuture<Void> closeResult = _systemConfig.close();
+                    closeResult.get(5000l, TimeUnit.MILLISECONDS);
                 }
                 _taskExecutor.stop();
 
             }
-            catch (TimeoutException e)
+            catch (TimeoutException | InterruptedException | ExecutionException e)
             {
                 LOGGER.warn("Attempting to cleanly shutdown took too long, exiting immediately");
             }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java Thu Mar  5 14:57:46 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.configuration.updater;
 
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 
 public interface TaskExecutor
@@ -43,4 +44,7 @@ public interface TaskExecutor
 
     <T> Future<T> submit(Task<T> task) throws CancellationException;
 
+    boolean isTaskExecutorThread();
+
+    Executor getExecutor();
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java Thu Mar  5 14:57:46 2015
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -277,7 +278,13 @@ public class TaskExecutorImpl implements
         }
     }
 
-    private boolean isTaskExecutorThread()
+    @Override
+    public Executor getExecutor()
+    {
+        return _executor;
+    }
+
+    public boolean isTaskExecutorThread()
     {
         return Thread.currentThread() == _taskThread;
     }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java Thu Mar  5 14:57:46 2015
@@ -22,9 +22,10 @@ package org.apache.qpid.server.consumer;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.util.FutureResult;
 
 public interface ConsumerImpl
 {
@@ -66,7 +67,7 @@ public interface ConsumerImpl
 
     boolean seesRequeues();
 
-    FutureResult close();
+    ListenableFuture<Void> close();
 
     boolean trySendLock();
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Thu Mar  5 14:57:46 2015
@@ -44,10 +44,15 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonProcessingException;
@@ -470,18 +475,66 @@ public abstract class AbstractConfigured
         }
     }
 
-    protected FutureResult closeChildren()
+    private static class ChildCounter
     {
-        final List<FutureResult> childCloseFutures = new ArrayList<>();
+        private final AtomicInteger _count = new AtomicInteger();
+        private final Runnable _task;
+
+        private ChildCounter(final Runnable task)
+        {
+            _task = task;
+        }
+
+        public void incrementCount()
+        {
+            _count.incrementAndGet();
+        }
+
+        public void decrementCount()
+        {
+            if(_count.decrementAndGet() == 0)
+            {
+                _task.run();
+            }
+        }
+    }
+
+    protected final ListenableFuture<Void> closeChildren()
+    {
+        LOGGER.debug("KWDEBUG closing children");
+
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        final ChildCounter counter = new ChildCounter(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                returnVal.set(null);
+            }
+        });
+        counter.incrementCount();
+
+
         applyToChildren(new Action<ConfiguredObject<?>>()
         {
             @Override
             public void performAction(final ConfiguredObject<?> child)
             {
-                childCloseFutures.add(child.close());
+                counter.incrementCount();
+                ListenableFuture<Void> close = child.close();
+                close.addListener(new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        counter.decrementCount();
+                    }
+                }, MoreExecutors.sameThreadExecutor());
             }
         });
 
+        counter.decrementCount();
+
         for(Collection<ConfiguredObject<?>> childList : _children.values())
         {
             childList.clear();
@@ -497,101 +550,65 @@ public abstract class AbstractConfigured
             childNameMap.clear();
         }
 
-
-        FutureResult futureResult;
-        if(childCloseFutures.isEmpty())
-        {
-            futureResult = FutureResult.IMMEDIATE_FUTURE;
-        }
-        else
-        {
-            futureResult = new FutureResult()
-                                {
-                                    @Override
-                                    public boolean isComplete()
-                                    {
-                                        for(FutureResult childResult : childCloseFutures)
-                                        {
-                                            if(!childResult.isComplete())
-                                            {
-                                                return false;
-                                            }
-                                        }
-                                        return true;
-                                    }
-
-                                    @Override
-                                    public void waitForCompletion()
-                                    {
-                                        for(FutureResult childResult : childCloseFutures)
-                                        {
-                                            childResult.waitForCompletion();
-                                        }
-                                    }
-
-
-                                    @Override
-                                    public void waitForCompletion(long timeout) throws TimeoutException
-                                    {
-                                        long startTime = System.currentTimeMillis();
-                                        long remaining = timeout;
-                                        for(FutureResult childResult : childCloseFutures)
-                                        {
-
-                                            childResult.waitForCompletion(remaining);
-                                            remaining = startTime + timeout - System.currentTimeMillis();
-                                            if(remaining < 0)
-                                            {
-                                                throw new TimeoutException("Completion did not occur within specified timeout: " + timeout);
-                                            }
-                                        }
-                                    }
-                                };
-        }
-        return futureResult;
+        return returnVal;
     }
 
     @Override
-    public final FutureResult close()
+    public final ListenableFuture<Void> close()
     {
         if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
         {
-            final CloseResult closeResult = new CloseResult();
+            final SettableFuture<Void> returnVal = SettableFuture.create();
 
-            CloseFuture close = beforeClose();
+            final ListenableFuture<Void> beforeClose = beforeClose();
 
-            Runnable closeRunnable = new Runnable()
+            if(beforeClose != null)
             {
-                @Override
-                public void run()
+                beforeClose.addListener(new Runnable()
                 {
-                    final FutureResult result = closeChildren();
-                    closeResult.setChildFutureResult(result);
-                    onClose();
-                    unregister(false);
-
-                }
-            };
-
-            if (close == null)
-            {
-                closeRunnable.run();
+                    @Override
+                    public void run()
+                    {
+                        final ListenableFuture<Void> childCloseFuture = closeChildren();
+                        childCloseFuture.addListener(new Runnable()
+                        {
+                            @Override
+                            public void run()
+                            {
+                                onClose();
+                                unregister(false);
+                                returnVal.set(null);
+                            }
+                        }, getTaskExecutor().getExecutor());
+                    }
+                }, getTaskExecutor().getExecutor());
             }
             else
             {
-                close.runWhenComplete(closeRunnable);
+                final ListenableFuture<Void> childCloseFuture = closeChildren();
+                childCloseFuture.addListener(new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        onClose();
+                        unregister(false);
+                        returnVal.set(null);
+                    }
+                }, getTaskExecutor().getExecutor());
             }
 
-            // if future not complete, schedule the remainder to be done once complete.
-            return closeResult;
+            return returnVal;
+
+
         }
         else
         {
-            return FutureResult.IMMEDIATE_FUTURE;
+            return Futures.immediateFuture(null);
         }
     }
 
-    protected CloseFuture beforeClose()
+    protected ListenableFuture<Void> beforeClose()
     {
         return null;
     }
@@ -2013,6 +2030,7 @@ public abstract class AbstractConfigured
                 }
             }
             _childFutureResult.waitForCompletion();
+
         }
 
         @Override
@@ -2042,6 +2060,7 @@ public abstract class AbstractConfigured
                 }
             }
             _childFutureResult.waitForCompletion(remaining);
+
         }
 
         public synchronized void setChildFutureResult(final FutureResult childFutureResult)

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java Thu Mar  5 14:57:46 2015
@@ -26,9 +26,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.util.FutureResult;
 
 @ManagedObject( creatable = false, category = false )
 /**
@@ -250,7 +251,7 @@ public interface ConfiguredObject<X exte
 
     void open();
 
-    FutureResult close();
+    ListenableFuture<Void> close();
 
     TaskExecutor getTaskExecutor();
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Thu Mar  5 14:57:46 2015
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.protocol.AMQConstant;
@@ -166,25 +168,24 @@ public final class ConnectionAdapter ext
     }
 
     @Override
-    protected CloseFuture beforeClose()
+    protected ListenableFuture<Void> beforeClose()
     {
         _closing.set(true);
 
-        final ConnectionCloseFuture closeFuture = asyncClose();
+        return asyncClose();
 
-        return closeFuture;
     }
 
-    private ConnectionCloseFuture asyncClose()
+    private ListenableFuture<Void> asyncClose()
     {
-        final ConnectionCloseFuture closeFuture = new ConnectionCloseFuture();
+        final SettableFuture<Void> closeFuture = SettableFuture.create();
 
         _underlyingConnection.addDeleteTask(new Action()
         {
             @Override
             public void performAction(final Object object)
             {
-                closeFuture.connectionClosed();
+                closeFuture.set(null);
             }
         });
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Mar  5 14:57:46 2015
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
@@ -52,7 +53,6 @@ import org.apache.qpid.server.connection
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.logging.EventLogger;
@@ -97,6 +97,7 @@ import org.apache.qpid.server.util.MapVa
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.transport.TransportException;
 
 public abstract class AbstractQueue<X extends AbstractQueue<X>>
@@ -823,7 +824,7 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    protected org.apache.qpid.server.model.CloseFuture beforeClose()
+    protected ListenableFuture<Void> beforeClose()
     {
         _closing = true;
         return super.beforeClose();

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Thu Mar  5 14:57:46 2015
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -62,6 +63,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.adapter.ConnectionAdapter;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.ConnectionValidator;
@@ -805,15 +807,18 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
-    protected CloseFuture beforeClose()
+    protected ListenableFuture<Void> beforeClose()
     {
+        _logger.debug("KWDEBUG setting state to UNAVAILABLE");
         setState(State.UNAVAILABLE);
-        return null;
+
+        return super.beforeClose();
     }
 
     @Override
     protected void onClose()
     {
+        _logger.debug("KWDEBUG onClose");
         //Stop Connections
         _connectionRegistry.close();
         _dtxRegistry.close();
@@ -825,6 +830,7 @@ public abstract class AbstractVirtualHos
 
     private void closeMessageStore()
     {
+        _logger.debug("KWDEBUG closeMessageStore");
         if (getMessageStore() != null)
         {
             try
@@ -1308,6 +1314,7 @@ public abstract class AbstractVirtualHos
     @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
     protected void doStop()
     {
+        // TODO - need to deal with async close children
         closeChildren();
         shutdownHouseKeeping();
         closeMessageStore();

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java Thu Mar  5 14:57:46 2015
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -186,7 +187,7 @@ public abstract class AbstractVirtualHos
     {
         setState(State.DELETED);
         deleteVirtualHostIfExists();
-        close();
+        final ListenableFuture<Void> closeFuture = close();
         deleted();
         DurableConfigurationStore configurationStore = getConfigurationStore();
         if (configurationStore != null)
@@ -212,6 +213,7 @@ public abstract class AbstractVirtualHos
 
     protected void stopAndSetStateTo(State stoppedState)
     {
+        // TODO - deal with async close children
         closeChildren();
         closeConfigurationStoreSafely();
         setState(stoppedState);

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java Thu Mar  5 14:57:46 2015
@@ -22,11 +22,14 @@ package org.apache.qpid.server.configura
 
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.util.concurrent.MoreExecutors;
+
 public class CurrentThreadTaskExecutor implements TaskExecutor
 {
     private final AtomicReference<Thread> _thread = new AtomicReference<>();
@@ -144,4 +147,15 @@ public class CurrentThreadTaskExecutor i
         return executor;
     }
 
+    @Override
+    public boolean isTaskExecutorThread()
+    {
+        return true;
+    }
+
+    @Override
+    public Executor getExecutor()
+    {
+        return MoreExecutors.sameThreadExecutor();
+    }
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Thu Mar  5 14:57:46 2015
@@ -24,12 +24,14 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.util.FutureResult;
 import org.apache.qpid.server.util.StateChangeListener;
 
 class ManagementNodeConsumer implements ConsumerImpl
@@ -123,9 +125,9 @@ class ManagementNodeConsumer implements
     }
 
     @Override
-    public FutureResult close()
+    public ListenableFuture<Void> close()
     {
-        return FutureResult.IMMEDIATE_FUTURE;
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaTransientExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaTransientExcludes?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaTransientExcludes (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaTransientExcludes Thu Mar  5 14:57:46 2015
@@ -47,6 +47,7 @@ org.apache.qpid.server.store.VirtualHost
 org.apache.qpid.server.store.berkeleydb.*
 org.apache.qpid.server.store.berkeleydb.replication.*
 org.apache.qpid.server.store.berkeleydb.upgrade.*
+org.apache.qpid.server.virtualhostnode.berkeleydb.*
 
 org.apache.qpid.systest.management.jmx.QueueManagementTest#testAlternateExchangeSurvivesRestart
 org.apache.qpid.systest.management.jmx.QueueManagementTest#testQueueDescriptionSurvivesRestart



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org