You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/07 15:15:01 UTC
svn commit: r1768530 - in /qpid/java/branches/remove-queue-runner:
bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/mod...
Author: rgodfrey
Date: Mon Nov 7 15:15:00 2016
New Revision: 1768530
URL: http://svn.apache.org/viewvc?rev=1768530&view=rev
Log:
Removed sendLock; made onClose return a ListenableFuture<Void>
Removed:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
Modified:
qpid/java/branches/remove-queue-runner/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/branches/remove-queue-runner/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/BrokerFileLoggerImpl.java
qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
qpid/java/branches/remove-queue-runner/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
Modified: qpid/java/branches/remove-queue-runner/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Mon Nov 7 15:15:00 2016
@@ -510,16 +510,18 @@ public class BDBHAVirtualHostNodeImpl ex
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
- try
- {
- super.onClose();
- }
- finally
- {
- closeEnvironment();
- }
+ return doAfterAlways(super.onClose(),
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ closeEnvironment();
+ }
+ });
+
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Mon Nov 7 15:15:00 2016
@@ -32,6 +32,8 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +41,7 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
import org.apache.qpid.server.transport.AMQPConnection;
@@ -90,7 +93,7 @@ public abstract class AbstractConsumerTa
public void notifyWork()
{
_waitingOnStateChange.set(false);
- getSessionModel().getAMQPConnection().notifyWork();
+ getSessionModel().notifyWork(this);
}
@Override
@@ -100,32 +103,25 @@ public abstract class AbstractConsumerTa
{
return false;
}
- if(sendNextMessage())
- {
- return true;
- }
- else
- {
- processStateChanged();
- processClosed();
- return false;
- }
+ return sendNextMessage();
}
@Override
public boolean hasPendingWork()
{
- return hasMessagesToSend() || hasStateChanged() || hasClosed();
+ if (!_waitingOnStateChange.get() && hasCredit())
+ {
+ for (ConsumerImpl consumer : _consumers)
+ {
+ if (consumer.hasAvailableMessages())
+ {
+ return true;
+ }
+ }
+ }
+ return false;
}
- protected abstract boolean hasStateChanged();
-
- protected abstract boolean hasClosed();
-
- protected abstract void processStateChanged();
-
- protected abstract void processClosed();
-
@Override
public void consumerAdded(final ConsumerImpl sub)
{
@@ -133,7 +129,33 @@ public abstract class AbstractConsumerTa
}
@Override
- public void consumerRemoved(final ConsumerImpl sub)
+ public ListenableFuture<Void> consumerRemoved(final ConsumerImpl sub)
+ {
+ if(_consumers.contains(sub))
+ {
+ return doOnIoThreadAsync(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ consumerRemovedInternal(sub);
+ }
+ });
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
+ }
+
+ private ListenableFuture<Void> doOnIoThreadAsync(final Runnable task)
+ {
+ AMQSessionModel<?> sessionModel = getSessionModel();
+ return sessionModel.getAMQPConnection().doOnIOThreadAsync(task);
+ }
+
+ private void consumerRemovedInternal(final ConsumerImpl sub)
{
_consumers.remove(sub);
if(_consumers.isEmpty())
@@ -250,21 +272,6 @@ public abstract class AbstractConsumerTa
_stateChangeListeners.remove(listener);
}
- public final boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
-
- public final void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public final void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
@Override
public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
@@ -279,21 +286,6 @@ public abstract class AbstractConsumerTa
protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
- @Override
- public boolean hasMessagesToSend()
- {
- if (!_waitingOnStateChange.get() && hasCredit())
- {
- for (ConsumerImpl consumer : _consumers)
- {
- if (consumer.hasAvailableMessages())
- {
- return true;
- }
- }
- }
- return false;
- }
@Override
public boolean sendNextMessage()
@@ -351,27 +343,21 @@ public abstract class AbstractConsumerTa
{
boolean closed = false;
State state = getState();
+ List<ConsumerImpl> consumers = new ArrayList<>(_consumers);
+ _consumers.clear();
- getSendLock();
- try
+ while(!closed && state != State.CLOSED)
{
- while(!closed && state != State.CLOSED)
+ closed = updateState(state, State.CLOSED);
+ if(!closed)
{
- closed = updateState(state, State.CLOSED);
- if(!closed)
- {
- state = getState();
- }
+ state = getState();
}
- ConsumerMessageInstancePair instance;
- doCloseInternal();
- }
- finally
- {
- releaseSendLock();
}
- for (ConsumerImpl consumer : _consumers)
+ doCloseInternal();
+
+ for (ConsumerImpl consumer : consumers)
{
consumer.close();
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java Mon Nov 7 15:15:00 2016
@@ -72,13 +72,6 @@ public interface ConsumerImpl
void close();
- boolean trySendLock();
-
-
- void getSendLock();
-
- void releaseSendLock();
-
boolean isActive();
String getName();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Mon Nov 7 15:15:00 2016
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.consumer;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -54,7 +56,7 @@ public interface ConsumerTarget
void consumerAdded(ConsumerImpl sub);
- void consumerRemoved(ConsumerImpl sub);
+ ListenableFuture<Void> consumerRemoved(ConsumerImpl sub);
void notifyCurrentState();
@@ -68,8 +70,6 @@ public interface ConsumerTarget
long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
- boolean hasMessagesToSend();
-
boolean sendNextMessage();
void flushBatched();
@@ -85,11 +85,4 @@ public interface ConsumerTarget
boolean isSuspended();
boolean close();
-
- boolean trySendLock();
-
- void getSendLock();
-
- void releaseSendLock();
-
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Mon Nov 7 15:15:00 2016
@@ -774,16 +774,25 @@ public abstract class AbstractConfigured
{
return closeChildren();
}
+ }).then(new Callable<ListenableFuture<Void>>()
+ {
+ @Override
+ public ListenableFuture<Void> call() throws Exception
+ {
+ return onClose();
+ }
}).then(new Runnable()
- {
- @Override
- public void run()
- {
- onClose();
- unregister(false);
- LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
- }
- });
+ {
+ @Override
+ public void run()
+ {
+ unregister(false);
+ LOGGER.debug("Closed "
+ + AbstractConfiguredObject.this.getClass().getSimpleName()
+ + " : "
+ + getName());
+ }
+ });
}
else
{
@@ -819,8 +828,9 @@ public abstract class AbstractConfigured
return Futures.immediateFuture(null);
}
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
+ return Futures.immediateFuture(null);
}
public final void create()
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java Mon Nov 7 15:15:00 2016
@@ -168,7 +168,7 @@ public abstract class AbstractSystemConf
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
final TaskExecutor taskExecutor = getTaskExecutor();
try
@@ -192,7 +192,7 @@ public abstract class AbstractSystemConf
taskExecutor.stopImmediately();
}
}
-
+ return Futures.immediateFuture(null);
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java Mon Nov 7 15:15:00 2016
@@ -671,7 +671,7 @@ public class BrokerImpl extends Abstract
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
if (_reportingTimer != null)
{
@@ -700,6 +700,8 @@ public class BrokerImpl extends Abstract
task.run();
}
}
+ return Futures.immediateFuture(null);
+
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Mon Nov 7 15:15:00 2016
@@ -60,9 +60,9 @@ public final class SessionAdapter extend
// Attributes
private final AMQSessionModel _session;
private final Action _deleteModelTask;
- private final AbstractAMQPConnection<?> _amqpConnection;
+ private final AbstractAMQPConnection<?,?> _amqpConnection;
- public SessionAdapter(final AbstractAMQPConnection<?> amqpConnection,
+ public SessionAdapter(final AbstractAMQPConnection<?,?> amqpConnection,
final AMQSessionModel session)
{
super(parentsMap(amqpConnection), createAttributes(session));
@@ -206,7 +206,7 @@ public final class SessionAdapter extend
return Futures.immediateFuture(null);
}
- private void registerTransactionTimeoutTickers(AbstractAMQPConnection<?> amqpConnection,
+ private void registerTransactionTimeoutTickers(AbstractAMQPConnection<?,?> amqpConnection,
final AMQSessionModel session)
{
NamedAddressSpace addressSpace = amqpConnection.getAddressSpace();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Mon Nov 7 15:15:00 2016
@@ -43,6 +43,7 @@ import javax.net.ssl.X509TrustManager;
import javax.security.auth.Subject;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
@@ -310,7 +311,7 @@ public class AmqpPortImpl extends Abstra
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
if (_transport != null)
{
@@ -322,6 +323,7 @@ public class AmqpPortImpl extends Abstra
_transport.close();
}
+ return Futures.immediateFuture(null);
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Mon Nov 7 15:15:00 2016
@@ -25,6 +25,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Queue;
@@ -106,5 +107,5 @@ public interface AMQSessionModel<T exten
void addTicker(Ticker ticker);
void removeTicker(Ticker ticker);
- void ensureConsumersNoticedStateChange();
+ void notifyWork(ConsumerTarget target);
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Mon Nov 7 15:15:00 2016
@@ -1851,11 +1851,11 @@ public abstract class AbstractQueue<X ex
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
- super.onClose();
_stopped.set(true);
_closing = false;
+ return Futures.immediateFuture(null);
}
public void checkCapacity(AMQSessionModel channel)
@@ -1931,7 +1931,6 @@ public abstract class AbstractQueue<X ex
boolean queueEmpty = false;
MessageContainer messageContainer = null;
- sub.getSendLock();
try
{
if (!sub.isSuspended())
@@ -1950,7 +1949,6 @@ public abstract class AbstractQueue<X ex
}
finally
{
- sub.releaseSendLock();
if(queueEmpty)
{
sub.queueEmpty();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Mon Nov 7 15:15:00 2016
@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -301,28 +303,33 @@ class QueueConsumerImpl
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
if(_closed.compareAndSet(false,true))
{
- _target.getSendLock();
- try
- {
- _waitingOnCreditMessageListener.remove();
- _target.consumerRemoved(this);
- _target.removeStateChangeListener(_listener);
- _queue.unregisterConsumer(this);
- if(_suspendedConsumerLoggingTicker != null)
- {
- getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
- }
- deleted();
- }
- finally
- {
- _target.releaseSendLock();
- }
+ _waitingOnCreditMessageListener.remove();
+ return doAfter(_target.consumerRemoved(this),
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _target.removeStateChangeListener(_listener);
+
+ _queue.unregisterConsumer(QueueConsumerImpl.this);
+
+ if (_suspendedConsumerLoggingTicker != null)
+ {
+ getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+ }
+ deleted();
+ }
+ });
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
}
}
@@ -524,21 +531,6 @@ class QueueConsumerImpl
return filterLogString.toString();
}
- public final boolean trySendLock()
- {
- return getTarget().trySendLock();
- }
-
- public final void getSendLock()
- {
- getTarget().getSendLock();
- }
-
- public final void releaseSendLock()
- {
- getTarget().releaseSendLock();
- }
-
public final long getCreateTime()
{
return _createTime;
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java Mon Nov 7 15:15:00 2016
@@ -79,14 +79,14 @@ public abstract class AbstractKeyStore<X
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
- super.onClose();
if(_checkExpiryTaskFuture != null)
{
_checkExpiryTaskFuture.cancel(false);
_checkExpiryTaskFuture = null;
}
+ return Futures.immediateFuture(null);
}
protected void initializeExpiryChecking()
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Mon Nov 7 15:15:00 2016
@@ -27,6 +27,8 @@ import java.util.Collection;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.model.Connection;
@@ -73,6 +75,7 @@ public interface AMQPConnection<C extend
void sendConnectionCloseAsync(AMQConstant connectionForced, String reason);
boolean isIOThread();
+ ListenableFuture<Void> doOnIOThreadAsync(final Runnable task);
void checkAuthorizedMessagePrincipal(String messageUserId);
@@ -86,4 +89,6 @@ public interface AMQPConnection<C extend
Collection<? extends AMQSessionModel<?>> getSessionModels();
void resetStatistics();
+
+ void notifyWork(AMQSessionModel<?> sessionModel);
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Mon Nov 7 15:15:00 2016
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
import javax.security.auth.SubjectDomainCombiner;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
@@ -75,7 +76,7 @@ import org.apache.qpid.server.util.Fixed
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
-public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C>>
+public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,T>, T>
extends AbstractConfiguredObject<C>
implements ProtocolEngine, AMQPConnection<C>, EventLoggerProvider
@@ -497,6 +498,42 @@ public abstract class AbstractAMQPConnec
return Thread.currentThread() == _ioThread;
}
+ @Override
+ public ListenableFuture<Void> doOnIOThreadAsync(final Runnable task)
+ {
+ if (isIOThread())
+ {
+ task.run();
+ return Futures.immediateFuture(null);
+ }
+ else
+ {
+ final SettableFuture<Void> future = SettableFuture.create();
+
+ addAsyncTask(
+ new Action<Object>()
+ {
+ @Override
+ public void performAction(final Object object)
+ {
+ try
+ {
+ task.run();
+ future.set(null);
+ }
+ catch (RuntimeException e)
+ {
+ future.setException(e);
+ }
+ }
+ });
+ return future;
+ }
+ }
+
+ protected abstract void addAsyncTask(final Action<? super T> action);
+
+
protected <T> T runAsSubject(PrivilegedAction<T> action)
{
return Subject.doAs(_subject, action);
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Mon Nov 7 15:15:00 2016
@@ -44,7 +44,6 @@ import org.apache.qpid.server.queue.Abst
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
@@ -146,23 +145,15 @@ public abstract class AbstractSystemMess
@Override
public AbstractQueue.MessageContainer pullMessage()
{
- _target.getSendLock();
- try
+ if (!_queue.isEmpty())
{
- if (!_queue.isEmpty())
+ final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
+ if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
{
- final PropertiesMessageInstance propertiesMessageInstance = _queue.get(0);
- if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
- {
- _queue.remove(0);
- return new AbstractQueue.MessageContainer(propertiesMessageInstance, null);
- }
+ _queue.remove(0);
+ return new AbstractQueue.MessageContainer(propertiesMessageInstance, null);
}
}
- finally
- {
- _target.releaseSendLock();
- }
return null;
}
@@ -239,24 +230,6 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean trySendLock()
- {
- return _target.trySendLock();
- }
-
- @Override
- public void getSendLock()
- {
- _target.getSendLock();
- }
-
- @Override
- public void releaseSendLock()
- {
- _target.releaseSendLock();
- }
-
- @Override
public boolean isActive()
{
return false;
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Nov 7 15:15:00 2016
@@ -1444,7 +1444,7 @@ public abstract class AbstractVirtualHos
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
_dtxRegistry.close();
shutdownHouseKeeping();
@@ -1455,6 +1455,7 @@ public abstract class AbstractVirtualHos
_eventLogger.message(VirtualHostMessages.CLOSED(getName()));
stopLogging(_virtualHostLoggersToClose);
+ return Futures.immediateFuture(null);
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java Mon Nov 7 15:15:00 2016
@@ -405,10 +405,11 @@ public abstract class AbstractVirtualHos
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
closeConfigurationStore();
_virtualHostExecutor.stop();
+ return Futures.immediateFuture(null);
}
private void closeConfigurationStore()
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java Mon Nov 7 15:15:00 2016
@@ -142,8 +142,9 @@ public class ManagementModeStoreHandlerT
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java Mon Nov 7 15:15:00 2016
@@ -31,6 +31,9 @@ import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -153,12 +156,6 @@ public class TestConsumerTarget implemen
}
@Override
- public boolean hasMessagesToSend()
- {
- return false;
- }
-
- @Override
public boolean sendNextMessage()
{
return false;
@@ -199,9 +196,10 @@ public class TestConsumerTarget implemen
}
@Override
- public void consumerRemoved(final ConsumerImpl sub)
+ public ListenableFuture<Void> consumerRemoved(final ConsumerImpl sub)
{
close();
+ return Futures.immediateFuture(null);
}
@Override
@@ -276,21 +274,6 @@ public class TestConsumerTarget implemen
}
- public final boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
-
- public final void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public final void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
@Override
public boolean isMultiQueue()
{
@@ -508,9 +491,9 @@ public class TestConsumerTarget implemen
}
@Override
- public void ensureConsumersNoticedStateChange()
+ public void notifyWork(final ConsumerTarget target)
{
-
+ _connection.notifyWork(this);
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Mon Nov 7 15:15:00 2016
@@ -55,7 +55,7 @@ import org.apache.qpid.transport.Constan
import org.apache.qpid.server.transport.AggregateTicker;
-public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10>
+public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10, ServerConnection>
{
private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
private final ServerInputHandler _inputHandler;
@@ -277,6 +277,12 @@ public class AMQPConnection_0_10 extends
}
}
+ @Override
+ public void notifyWork(final AMQSessionModel<?> sessionModel)
+ {
+ notifyWork();
+ }
+
public void clearWork()
{
_stateChanged.set(false);
@@ -303,6 +309,12 @@ public class AMQPConnection_0_10 extends
_connection.closeSessionAsync((ServerSession) session, cause, message);
}
+ @Override
+ protected void addAsyncTask(final Action<? super ServerConnection> action)
+ {
+ _connection.addAsyncTask(action);
+ }
+
public void block()
{
_connection.block();
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Mon Nov 7 15:15:00 2016
@@ -536,19 +536,10 @@ public class ConsumerTarget_0_10 extends
public void stop()
{
- try
- {
- getSendLock();
-
- updateState(State.ACTIVE, State.SUSPENDED);
- _stopped.set(true);
- FlowCreditManager_0_10 creditManager = getCreditManager();
- creditManager.clearCredit();
- }
- finally
- {
- releaseSendLock();
- }
+ updateState(State.ACTIVE, State.SUSPENDED);
+ _stopped.set(true);
+ FlowCreditManager_0_10 creditManager = getCreditManager();
+ creditManager.clearCredit();
}
public void addCredit(MessageCreditUnit unit, long value)
@@ -647,30 +638,6 @@ public class ConsumerTarget_0_10 extends
}
@Override
- protected void processClosed()
- {
-
- }
-
- @Override
- protected void processStateChanged()
- {
-
- }
-
- @Override
- protected boolean hasStateChanged()
- {
- return false;
- }
-
- @Override
- protected boolean hasClosed()
- {
- return false;
- }
-
- @Override
public String toString()
{
return "ConsumerTarget_0_10[name=" + _name + ", session=" + _session.toLogString() + "]";
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Nov 7 15:15:00 2016
@@ -31,7 +31,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -133,7 +132,7 @@ public class ServerConnection extends Co
getAmqpConnection().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, (ServerNetworkConnection) getNetworkConnection()));
// trigger a wakeup to ensure the ticker will be taken into account
- notifyWork();
+ getAmqpConnection().notifyWork();
}
}
@@ -468,10 +467,10 @@ public class ServerConnection extends Co
super.doHeartBeat();
}
- private void addAsyncTask(final Action<ServerConnection> action)
+ void addAsyncTask(final Action<? super ServerConnection> action)
{
_asyncTaskList.add(action);
- notifyWork();
+ getAmqpConnection().notifyWork();
}
public int getMessageCompressionThreshold()
@@ -492,11 +491,6 @@ public class ServerConnection extends Co
}
}
- public void notifyWork()
- {
- _amqpConnection.notifyWork();
- }
-
public Iterator<Runnable> processPendingIterator()
{
return new ProcessPendingIterator();
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Mon Nov 7 15:15:00 2016
@@ -206,7 +206,7 @@ public class ServerSession extends Sessi
}
_blockingTimeout = serverConnection.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
- _maxUncommittedInMemorySize = getConnection().getAmqpConnection().getContextProvider().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+ _maxUncommittedInMemorySize = getAMQPConnection().getContextProvider().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
_publishAuthCahe = new PublishAuthorisationCache(_token,
amqpConnection.getContextValue(Long.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_TIMEOUT),
amqpConnection.getContextValue(Integer.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_SIZE));
@@ -226,7 +226,7 @@ public class ServerSession extends Sessi
if (state == State.OPEN)
{
- getConnection().getAmqpConnection().getEventLogger().message(ChannelMessages.CREATE());
+ getAMQPConnection().getEventLogger().message(ChannelMessages.CREATE());
}
}
else
@@ -320,8 +320,8 @@ public class ServerSession extends Sessi
handle.flowToDisk();
if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize())
{
- getConnection().getAmqpConnection().getEventLogger()
- .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+ getAMQPConnection().getEventLogger()
+ .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
}
if(!_uncommittedMessages.isEmpty())
@@ -528,7 +528,7 @@ public class ServerSession extends Sessi
{
operationalLoggingMessage = ChannelMessages.CLOSE();
}
- getConnection().getAmqpConnection().getEventLogger().message(getLogSubject(), operationalLoggingMessage);
+ getAMQPConnection().getEventLogger().message(getLogSubject(), operationalLoggingMessage);
}
@Override
@@ -853,10 +853,10 @@ public class ServerSession extends Sessi
if(_blocking.compareAndSet(false,true))
{
- getConnection().getAmqpConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
+ getAMQPConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
if(getState() == State.OPEN)
{
- getConnection().notifyWork();
+ getAMQPConnection().notifyWork(this);
}
}
@@ -881,8 +881,8 @@ public class ServerSession extends Sessi
{
if(_blocking.compareAndSet(true,false) && !isClosing())
{
- getConnection().getAmqpConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
- getConnection().notifyWork();
+ getAMQPConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ getAMQPConnection().notifyWork(this);
}
}
}
@@ -1239,7 +1239,7 @@ public class ServerSession extends Sessi
@Override
public void addTicker(final Ticker ticker)
{
- getConnection().getAmqpConnection().getAggregateTicker().addTicker(ticker);
+ getAMQPConnection().getAggregateTicker().addTicker(ticker);
// trigger a wakeup to ensure the ticker will be taken into account
getAMQPConnection().notifyWork();
}
@@ -1247,24 +1247,13 @@ public class ServerSession extends Sessi
@Override
public void removeTicker(final Ticker ticker)
{
- getConnection().getAmqpConnection().getAggregateTicker().removeTicker(ticker);
+ getAMQPConnection().getAggregateTicker().removeTicker(ticker);
}
@Override
- public void ensureConsumersNoticedStateChange()
+ public void notifyWork(final ConsumerTarget target)
{
- Collection<ConsumerTarget_0_10> consumerTargets = getSubscriptions();
- for(ConsumerTarget_0_10 consumerTarget: consumerTargets)
- {
- try
- {
- consumerTarget.getSendLock();
- }
- finally
- {
- consumerTarget.releaseSendLock();
- }
- }
+ getAMQPConnection().notifyWork(this);
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Nov 7 15:15:00 2016
@@ -99,8 +99,6 @@ import org.apache.qpid.server.protocol.C
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.PublishAuthorisationCache;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.queue.QueueConsumer;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
@@ -879,12 +877,12 @@ public class AMQChannel
{
for(ConsumerImpl sub : subs)
{
- sub.close();
if (sub instanceof Consumer<?>)
{
_consumers.remove(sub);
}
}
+ target.close();
return true;
}
else
@@ -1075,19 +1073,12 @@ public class AMQChannel
if (subscriber != null && subscriber.getSessionModel() == this)
{
ConsumerTarget target = subscriber.getTarget();
- target.getSendLock();
- try
- {
- if (target.getState() != ConsumerTarget.State.CLOSED)
- {
- target.send(subscriber, messageInstance, false);
- return true;
- }
- }
- finally
+ if (target.getState() != ConsumerTarget.State.CLOSED)
{
- target.releaseSendLock();
+ target.send(subscriber, messageInstance, false);
+ return true;
}
+
}
return false;
}
@@ -1221,11 +1212,6 @@ public class AMQChannel
messageWithSubject(ChannelMessages.FLOW("Started"));
}
-
- // This section takes two different approaches to perform to perform
- // the same function. Ensuring that the Subscription has taken note
- // of the change in Channel State
-
// Here we have become unsuspended and so we ask each the queue to
// perform an Async delivery for each of the subscriptions in this
// Channel. The alternative would be to ensure that the subscription
@@ -1244,20 +1230,6 @@ public class AMQChannel
}
}
-
- // Here we have become suspended so we need to ensure that each of
- // the Subscriptions has noticed this change so that we can be sure
- // they are not still sending messages. Again the code here is a
- // very simplistic approach to ensure that the change of suspension
- // has been noticed by each of the Subscriptions. Unlike the above
- // case we don't actually need to do anything else.
- if (!wasSuspended)
- {
- // may need to deliver queued messages
- ensureConsumersNoticedStateChange();
- }
-
-
// Log Suspension only after we have confirmed all suspensions are
// stopped.
if (suspended && _logChannelFlowMessages)
@@ -1324,9 +1296,6 @@ public class AMQChannel
boolean requiresSuspend = _suspended.compareAndSet(false,true); // TODO This is probably superfluous owing to the
// message assignment suspended logic in NBC.
- // ensure all subscriptions have seen the change to the channel state
- ensureConsumersNoticedStateChange();
-
try
{
_transaction.rollback();
@@ -1381,7 +1350,7 @@ public class AMQChannel
return _closing.get();
}
- public AMQPConnection_0_8 getConnection()
+ public AMQPConnection_0_8<?> getConnection()
{
return _connection;
}
@@ -1441,7 +1410,7 @@ public class AMQChannel
}
@Override
- public AMQPConnection_0_8 getAMQPConnection()
+ public AMQPConnection_0_8<?> getAMQPConnection()
{
return _connection;
}
@@ -1731,7 +1700,7 @@ public class AMQChannel
messageWithSubject(ChannelMessages.FLOW_ENFORCED("** All Queues **"));
- getConnection().notifyWork();
+ getConnection().notifyWork(this);
}
}
}
@@ -1743,7 +1712,7 @@ public class AMQChannel
if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false))
{
messageWithSubject(ChannelMessages.FLOW_REMOVED());
- getConnection().notifyWork();
+ getConnection().notifyWork(this);
}
}
}
@@ -1757,7 +1726,7 @@ public class AMQChannel
if(_blocking.compareAndSet(false,true))
{
messageWithSubject(ChannelMessages.FLOW_ENFORCED(queue.getName()));
- getConnection().notifyWork();
+ getConnection().notifyWork(this);
}
}
@@ -1770,7 +1739,7 @@ public class AMQChannel
if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing())
{
messageWithSubject(ChannelMessages.FLOW_REMOVED());
- getConnection().notifyWork();
+ getConnection().notifyWork(this);
}
}
}
@@ -3815,19 +3784,9 @@ public class AMQChannel
}
@Override
- public void ensureConsumersNoticedStateChange()
+ public void notifyWork(final ConsumerTarget target)
{
- for (ConsumerTarget_0_8 consumerTarget : getConsumerTargets())
- {
- try
- {
- consumerTarget.getSendLock();
- }
- finally
- {
- consumerTarget.releaseSendLock();
- }
- }
+ getAMQPConnection().notifyWork(this);
}
private Collection<ConsumerTarget_0_8> getConsumerTargets()
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java Mon Nov 7 15:15:00 2016
@@ -86,7 +86,7 @@ import org.apache.qpid.transport.ByteBuf
import org.apache.qpid.transport.TransportException;
public class AMQPConnection_0_8Impl
- extends AbstractAMQPConnection<AMQPConnection_0_8Impl>
+ extends AbstractAMQPConnection<AMQPConnection_0_8Impl, AMQPConnection_0_8Impl>
implements ServerMethodProcessor<ServerChannelMethodProcessor>, AMQPConnection_0_8<AMQPConnection_0_8Impl>
{
@@ -812,12 +812,14 @@ public class AMQPConnection_0_8Impl
addAsyncTask(action);
}
- private void addAsyncTask(final Action<AMQPConnection_0_8Impl> action)
+ @Override
+ protected void addAsyncTask(final Action<? super AMQPConnection_0_8Impl> action)
{
_asyncTaskList.add(action);
notifyWork();
}
+ @Override
public void block()
{
synchronized (_channelAddRemoveLock)
@@ -1391,6 +1393,12 @@ public class AMQPConnection_0_8Impl
}
@Override
+ public void notifyWork(final AMQSessionModel<?> sessionModel)
+ {
+ notifyWork();
+ }
+
+ @Override
public void clearWork()
{
_stateChanged.set(false);
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Mon Nov 7 15:15:00 2016
@@ -54,7 +54,6 @@ public abstract class ConsumerTarget_0_8
private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
- private final AtomicBoolean _needToClose = new AtomicBoolean();
private final String _targetAddress;
@@ -461,41 +460,13 @@ public abstract class ConsumerTarget_0_8
public void queueEmpty()
{
- if (isAutoClose())
- {
- _needToClose.set(true);
- getChannel().getConnection().notifyWork();
- }
- }
-
- @Override
- protected void processClosed()
- {
- if (hasClosed())
+ if(isAutoClose() && getState() != State.CLOSED)
{
close();
confirmAutoClose();
}
}
- @Override
- protected void processStateChanged()
- {
-
- }
-
- @Override
- protected boolean hasStateChanged()
- {
- return false;
- }
-
- @Override
- protected boolean hasClosed()
- {
- return (_needToClose.get() && getState() != State.CLOSED);
- }
-
public void flushBatched()
{
_channel.getConnection().setDeferFlush(false);
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Mon Nov 7 15:15:00 2016
@@ -110,7 +110,7 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.server.transport.AggregateTicker;
-public class AMQPConnection_1_0 extends AbstractAMQPConnection<AMQPConnection_1_0>
+public class AMQPConnection_1_0 extends AbstractAMQPConnection<AMQPConnection_1_0, ConnectionHandler>
implements FrameOutputHandler, DescribedTypeConstructorRegistry.Source,
ValueWriter.Registry.Source,
ErrorHandler,
@@ -1377,6 +1377,12 @@ public class AMQPConnection_1_0 extends
}
@Override
+ public void notifyWork(final AMQSessionModel<?> sessionModel)
+ {
+ notifyWork();
+ }
+
+ @Override
public void clearWork()
{
_stateChanged.set(false);
@@ -1444,7 +1450,8 @@ public class AMQPConnection_1_0 extends
return _orderlyClose.get();
}
- private void addAsyncTask(final Action<ConnectionHandler> action)
+ @Override
+ protected void addAsyncTask(final Action<? super ConnectionHandler> action)
{
_asyncTaskList.add(action);
notifyWork();
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Mon Nov 7 15:15:00 2016
@@ -21,12 +21,20 @@
package org.apache.qpid.server.protocol.v1_0;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
@@ -34,7 +42,6 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.Target;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
@@ -42,20 +49,10 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.LinkRegistry;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -303,7 +300,11 @@ class ConsumerTarget_1_0 extends Abstrac
public void queueEmpty()
{
- _queueEmpty = true;
+ if(_link.drained())
+ {
+ updateState(State.ACTIVE, State.SUSPENDED);
+ }
+
}
public void flowStateChanged()
@@ -528,38 +529,6 @@ class ConsumerTarget_1_0 extends Abstrac
}
@Override
- protected void processClosed()
- {
-
- }
-
- @Override
- protected void processStateChanged()
- {
- if(_queueEmpty)
- {
- _queueEmpty = false;
-
- if(_link.drained())
- {
- updateState(State.ACTIVE, State.SUSPENDED);
- }
- }
- }
-
- @Override
- protected boolean hasStateChanged()
- {
- return _queueEmpty;
- }
-
- @Override
- protected boolean hasClosed()
- {
- return false;
- }
-
- @Override
public String toString()
{
return "ConsumerTarget_1_0[linkSession=" + _link.getSession().toLogString() + "]";
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Mon Nov 7 15:15:00 2016
@@ -1578,20 +1578,9 @@ public class Session_1_0 implements AMQS
}
@Override
- public void ensureConsumersNoticedStateChange()
+ public void notifyWork(final ConsumerTarget target)
{
- for(SendingLink_1_0 link : _sendingLinks)
- {
- ConsumerTarget_1_0 consumerTarget = link.getConsumerTarget();
- try
- {
- consumerTarget.getSendLock();
- }
- finally
- {
- consumerTarget.releaseSendLock();
- }
- }
+ getAMQPConnection().notifyWork(this);
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/BrokerFileLoggerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/BrokerFileLoggerImpl.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/BrokerFileLoggerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/BrokerFileLoggerImpl.java Mon Nov 7 15:15:00 2016
@@ -34,23 +34,24 @@ import ch.qos.logback.classic.spi.ILoggi
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.Context;
import ch.qos.logback.core.rolling.RollingFileAppender;
-
import ch.qos.logback.core.status.Status;
import ch.qos.logback.core.status.StatusListener;
import ch.qos.logback.core.status.StatusManager;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.server.logging.LogFileDetails;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Content;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
-import org.apache.qpid.server.model.Content;
import org.apache.qpid.server.model.Param;
import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.util.DaemonThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class BrokerFileLoggerImpl extends AbstractBrokerLogger<BrokerFileLoggerImpl>
implements BrokerFileLogger<BrokerFileLoggerImpl>, FileLoggerSettings
@@ -209,14 +210,14 @@ public class BrokerFileLoggerImpl extend
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
- super.onClose();
-
if (_statusManager != null)
{
_statusManager.remove(_logbackStatusListener);
}
+ return Futures.immediateFuture(null);
+
}
static class BrokerFileLoggerStatusListener implements StatusListener
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Mon Nov 7 15:15:00 2016
@@ -73,24 +73,16 @@ class ManagementNodeConsumer implements
@Override
public AbstractQueue.MessageContainer pullMessage()
{
- _target.getSendLock();
- try
+ if (!_queue.isEmpty())
{
- if (!_queue.isEmpty())
- {
- final ManagementResponse managementResponse = _queue.get(0);
- if (!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
- {
- _queue.remove(0);
- return new AbstractQueue.MessageContainer(managementResponse, null);
- }
+ final ManagementResponse managementResponse = _queue.get(0);
+ if (!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
+ {
+ _queue.remove(0);
+ return new AbstractQueue.MessageContainer(managementResponse, null);
}
}
- finally
- {
- _target.releaseSendLock();
- }
return null;
}
@@ -165,24 +157,6 @@ class ManagementNodeConsumer implements
{
}
- @Override
- public boolean trySendLock()
- {
- return _target.trySendLock();
- }
-
- @Override
- public void getSendLock()
- {
- _target.getSendLock();
- }
-
- @Override
- public void releaseSendLock()
- {
- _target.releaseSendLock();
- }
-
@Override
public boolean isActive()
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java?rev=1768530&r1=1768529&r2=1768530&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java Mon Nov 7 15:15:00 2016
@@ -226,7 +226,7 @@ public class HttpManagement extends Abst
}
@Override
- protected void onClose()
+ protected ListenableFuture<Void> onClose()
{
if (_server != null)
{
@@ -242,6 +242,7 @@ public class HttpManagement extends Abst
}
getBroker().getEventLogger().message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME));
+ return Futures.immediateFuture(null);
}
public int getSessionTimeout()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org