You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/03/05 15:57:46 UTC
svn commit: r1664366 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java:
broker-core/ broker-core/src/main/java/org/apache/qpid/server/
broker-core/src/main/java/org/apache/qpid/server/configuration/updater/
broker-core/src/main/java/org/apache/qpid...
Author: rgodfrey
Date: Thu Mar 5 14:57:46 2015
New Revision: 1664366
URL: http://svn.apache.org/r1664366
Log:
rewrite close
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/pom.xml
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaTransientExcludes
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/pom.xml?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/pom.xml (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/pom.xml Thu Mar 5 14:57:46 2015
@@ -107,8 +107,14 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava-version}</version>
+ </dependency>
+
<!-- test dependencies -->
- <dependency>
+ <dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-test-utils</artifactId>
<version>${project.version}</version>
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java Thu Mar 5 14:57:46 2015
@@ -29,10 +29,13 @@ import java.security.PrivilegedException
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
@@ -54,7 +57,6 @@ import org.apache.qpid.server.plugin.Plu
import org.apache.qpid.server.plugin.SystemConfigFactory;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.FutureResult;
public class Broker implements BrokerShutdownProvider
{
@@ -108,13 +110,13 @@ public class Broker implements BrokerShu
{
if(_systemConfig != null)
{
- final FutureResult closeResult = _systemConfig.close();
- closeResult.waitForCompletion(5000l);
+ ListenableFuture<Void> closeResult = _systemConfig.close();
+ closeResult.get(5000l, TimeUnit.MILLISECONDS);
}
_taskExecutor.stop();
}
- catch (TimeoutException e)
+ catch (TimeoutException | InterruptedException | ExecutionException e)
{
LOGGER.warn("Attempting to cleanly shutdown took too long, exiting immediately");
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java Thu Mar 5 14:57:46 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.server.configuration.updater;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
public interface TaskExecutor
@@ -43,4 +44,7 @@ public interface TaskExecutor
<T> Future<T> submit(Task<T> task) throws CancellationException;
+ boolean isTaskExecutorThread();
+
+ Executor getExecutor();
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java Thu Mar 5 14:57:46 2015
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -277,7 +278,13 @@ public class TaskExecutorImpl implements
}
}
- private boolean isTaskExecutorThread()
+ @Override
+ public Executor getExecutor()
+ {
+ return _executor;
+ }
+
+ public boolean isTaskExecutorThread()
{
return Thread.currentThread() == _taskThread;
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java Thu Mar 5 14:57:46 2015
@@ -22,9 +22,10 @@ package org.apache.qpid.server.consumer;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.util.FutureResult;
public interface ConsumerImpl
{
@@ -66,7 +67,7 @@ public interface ConsumerImpl
boolean seesRequeues();
- FutureResult close();
+ ListenableFuture<Void> close();
boolean trySendLock();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Thu Mar 5 14:57:46 2015
@@ -44,10 +44,15 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
@@ -470,18 +475,66 @@ public abstract class AbstractConfigured
}
}
- protected FutureResult closeChildren()
+ private static class ChildCounter
{
- final List<FutureResult> childCloseFutures = new ArrayList<>();
+ private final AtomicInteger _count = new AtomicInteger();
+ private final Runnable _task;
+
+ private ChildCounter(final Runnable task)
+ {
+ _task = task;
+ }
+
+ public void incrementCount()
+ {
+ _count.incrementAndGet();
+ }
+
+ public void decrementCount()
+ {
+ if(_count.decrementAndGet() == 0)
+ {
+ _task.run();
+ }
+ }
+ }
+
+ protected final ListenableFuture<Void> closeChildren()
+ {
+ LOGGER.debug("KWDEBUG closing children");
+
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final ChildCounter counter = new ChildCounter(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ returnVal.set(null);
+ }
+ });
+ counter.incrementCount();
+
+
applyToChildren(new Action<ConfiguredObject<?>>()
{
@Override
public void performAction(final ConfiguredObject<?> child)
{
- childCloseFutures.add(child.close());
+ counter.incrementCount();
+ ListenableFuture<Void> close = child.close();
+ close.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ counter.decrementCount();
+ }
+ }, MoreExecutors.sameThreadExecutor());
}
});
+ counter.decrementCount();
+
for(Collection<ConfiguredObject<?>> childList : _children.values())
{
childList.clear();
@@ -497,101 +550,65 @@ public abstract class AbstractConfigured
childNameMap.clear();
}
-
- FutureResult futureResult;
- if(childCloseFutures.isEmpty())
- {
- futureResult = FutureResult.IMMEDIATE_FUTURE;
- }
- else
- {
- futureResult = new FutureResult()
- {
- @Override
- public boolean isComplete()
- {
- for(FutureResult childResult : childCloseFutures)
- {
- if(!childResult.isComplete())
- {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public void waitForCompletion()
- {
- for(FutureResult childResult : childCloseFutures)
- {
- childResult.waitForCompletion();
- }
- }
-
-
- @Override
- public void waitForCompletion(long timeout) throws TimeoutException
- {
- long startTime = System.currentTimeMillis();
- long remaining = timeout;
- for(FutureResult childResult : childCloseFutures)
- {
-
- childResult.waitForCompletion(remaining);
- remaining = startTime + timeout - System.currentTimeMillis();
- if(remaining < 0)
- {
- throw new TimeoutException("Completion did not occur within specified timeout: " + timeout);
- }
- }
- }
- };
- }
- return futureResult;
+ return returnVal;
}
@Override
- public final FutureResult close()
+ public final ListenableFuture<Void> close()
{
if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
{
- final CloseResult closeResult = new CloseResult();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
- CloseFuture close = beforeClose();
+ final ListenableFuture<Void> beforeClose = beforeClose();
- Runnable closeRunnable = new Runnable()
+ if(beforeClose != null)
{
- @Override
- public void run()
+ beforeClose.addListener(new Runnable()
{
- final FutureResult result = closeChildren();
- closeResult.setChildFutureResult(result);
- onClose();
- unregister(false);
-
- }
- };
-
- if (close == null)
- {
- closeRunnable.run();
+ @Override
+ public void run()
+ {
+ final ListenableFuture<Void> childCloseFuture = closeChildren();
+ childCloseFuture.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ onClose();
+ unregister(false);
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor());
+ }
+ }, getTaskExecutor().getExecutor());
}
else
{
- close.runWhenComplete(closeRunnable);
+ final ListenableFuture<Void> childCloseFuture = closeChildren();
+ childCloseFuture.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ onClose();
+ unregister(false);
+ returnVal.set(null);
+ }
+ }, getTaskExecutor().getExecutor());
}
- // if future not complete, schedule the remainder to be done once complete.
- return closeResult;
+ return returnVal;
+
+
}
else
{
- return FutureResult.IMMEDIATE_FUTURE;
+ return Futures.immediateFuture(null);
}
}
- protected CloseFuture beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
return null;
}
@@ -2013,6 +2030,7 @@ public abstract class AbstractConfigured
}
}
_childFutureResult.waitForCompletion();
+
}
@Override
@@ -2042,6 +2060,7 @@ public abstract class AbstractConfigured
}
}
_childFutureResult.waitForCompletion(remaining);
+
}
public synchronized void setChildFutureResult(final FutureResult childFutureResult)
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java Thu Mar 5 14:57:46 2015
@@ -26,9 +26,10 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.util.FutureResult;
@ManagedObject( creatable = false, category = false )
/**
@@ -250,7 +251,7 @@ public interface ConfiguredObject<X exte
void open();
- FutureResult close();
+ ListenableFuture<Void> close();
TaskExecutor getTaskExecutor();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Thu Mar 5 14:57:46 2015
@@ -27,6 +27,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.protocol.AMQConstant;
@@ -166,25 +168,24 @@ public final class ConnectionAdapter ext
}
@Override
- protected CloseFuture beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
_closing.set(true);
- final ConnectionCloseFuture closeFuture = asyncClose();
+ return asyncClose();
- return closeFuture;
}
- private ConnectionCloseFuture asyncClose()
+ private ListenableFuture<Void> asyncClose()
{
- final ConnectionCloseFuture closeFuture = new ConnectionCloseFuture();
+ final SettableFuture<Void> closeFuture = SettableFuture.create();
_underlyingConnection.addDeleteTask(new Action()
{
@Override
public void performAction(final Object object)
{
- closeFuture.connectionClosed();
+ closeFuture.set(null);
}
});
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Mar 5 14:57:46 2015
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
@@ -52,7 +53,6 @@ import org.apache.qpid.server.connection
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.logging.EventLogger;
@@ -97,6 +97,7 @@ import org.apache.qpid.server.util.MapVa
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.TransportException;
public abstract class AbstractQueue<X extends AbstractQueue<X>>
@@ -823,7 +824,7 @@ public abstract class AbstractQueue<X ex
}
@Override
- protected org.apache.qpid.server.model.CloseFuture beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
_closing = true;
return super.beforeClose();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Thu Mar 5 14:57:46 2015
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -62,6 +63,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.adapter.ConnectionAdapter;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionValidator;
@@ -805,15 +807,18 @@ public abstract class AbstractVirtualHos
}
@Override
- protected CloseFuture beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
+ _logger.debug("KWDEBUG setting state to UNAVAILABLE");
setState(State.UNAVAILABLE);
- return null;
+
+ return super.beforeClose();
}
@Override
protected void onClose()
{
+ _logger.debug("KWDEBUG onClose");
//Stop Connections
_connectionRegistry.close();
_dtxRegistry.close();
@@ -825,6 +830,7 @@ public abstract class AbstractVirtualHos
private void closeMessageStore()
{
+ _logger.debug("KWDEBUG closeMessageStore");
if (getMessageStore() != null)
{
try
@@ -1308,6 +1314,7 @@ public abstract class AbstractVirtualHos
@StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
protected void doStop()
{
+ // TODO - need to deal with async close children
closeChildren();
shutdownHouseKeeping();
closeMessageStore();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java Thu Mar 5 14:57:46 2015
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -186,7 +187,7 @@ public abstract class AbstractVirtualHos
{
setState(State.DELETED);
deleteVirtualHostIfExists();
- close();
+ final ListenableFuture<Void> closeFuture = close();
deleted();
DurableConfigurationStore configurationStore = getConfigurationStore();
if (configurationStore != null)
@@ -212,6 +213,7 @@ public abstract class AbstractVirtualHos
protected void stopAndSetStateTo(State stoppedState)
{
+ // TODO - deal with async close children
closeChildren();
closeConfigurationStoreSafely();
setState(stoppedState);
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java Thu Mar 5 14:57:46 2015
@@ -22,11 +22,14 @@ package org.apache.qpid.server.configura
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.util.concurrent.MoreExecutors;
+
public class CurrentThreadTaskExecutor implements TaskExecutor
{
private final AtomicReference<Thread> _thread = new AtomicReference<>();
@@ -144,4 +147,15 @@ public class CurrentThreadTaskExecutor i
return executor;
}
+ @Override
+ public boolean isTaskExecutorThread()
+ {
+ return true;
+ }
+
+ @Override
+ public Executor getExecutor()
+ {
+ return MoreExecutors.sameThreadExecutor();
+ }
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Thu Mar 5 14:57:46 2015
@@ -24,12 +24,14 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.util.StateChangeListener;
class ManagementNodeConsumer implements ConsumerImpl
@@ -123,9 +125,9 @@ class ManagementNodeConsumer implements
}
@Override
- public FutureResult close()
+ public ListenableFuture<Void> close()
{
- return FutureResult.IMMEDIATE_FUTURE;
+ return Futures.immediateFuture(null);
}
@Override
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaTransientExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaTransientExcludes?rev=1664366&r1=1664365&r2=1664366&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaTransientExcludes (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaTransientExcludes Thu Mar 5 14:57:46 2015
@@ -47,6 +47,7 @@ org.apache.qpid.server.store.VirtualHost
org.apache.qpid.server.store.berkeleydb.*
org.apache.qpid.server.store.berkeleydb.replication.*
org.apache.qpid.server.store.berkeleydb.upgrade.*
+org.apache.qpid.server.virtualhostnode.berkeleydb.*
org.apache.qpid.systest.management.jmx.QueueManagementTest#testAlternateExchangeSurvivesRestart
org.apache.qpid.systest.management.jmx.QueueManagementTest#testQueueDescriptionSurvivesRestart
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org