You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/10/24 12:18:18 UTC
[1/3] qpid-broker-j git commit: QPID-7910: [Qpid Broker-J] [Scripts]
Switch back from pgrep to ps, as the -F flag is not supported on old pgrep
versions. Reinstate kill -9 if the processes don't go away sufficiently
quickly.
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 2acc60129 -> cd12e2dbc
QPID-7910: [Qpid Broker-J] [Scripts] Switch back from pgrep to ps, as the -F flag is not supported on old pgrep versions.
Reinstate kill -9 if the processes don't go away sufficiently quickly.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/cd12e2db
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/cd12e2db
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/cd12e2db
Branch: refs/heads/master
Commit: cd12e2dbcafba3013349df0858fdb07c9c6b30c7
Parents: 8031475
Author: Keith Wall <kw...@apache.org>
Authored: Tue Oct 24 13:16:12 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Oct 24 13:17:54 2017 +0100
----------------------------------------------------------------------
broker/bin/qpid.stop | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cd12e2db/broker/bin/qpid.stop
----------------------------------------------------------------------
diff --git a/broker/bin/qpid.stop b/broker/bin/qpid.stop
index c55700e..f91b21f 100755
--- a/broker/bin/qpid.stop
+++ b/broker/bin/qpid.stop
@@ -41,7 +41,7 @@ shutdown_brokers()
declare -a monitored_pids=()
for pid in "${pids[@]}"
do
- echo "Killing Qpid Broker-J with PID '$pid'"
+ echo "Shutting down Qpid Broker-J with PID '$pid'"
if kill $pid 2>/dev/null; then
monitored_pids+=($pid)
else
@@ -49,7 +49,7 @@ shutdown_brokers()
fi
done
- echo "Waiting for up to $MAX_WAIT_PERIOD seconds for process(es) to terminate..."
+ echo "Waiting for up to $MAX_WAIT_PERIOD seconds for process(es) to shutdown..."
end_time=$(($SECONDS+$MAX_WAIT_PERIOD))
while [[ ${#monitored_pids[@]} -ne 0 && "${SECONDS}" -lt ${end_time} ]]
do
@@ -71,7 +71,12 @@ shutdown_brokers()
done
if [[ ${#monitored_pids[@]} -ne 0 ]]; then
- echo "Process(es) with PID(s) ${monitored_pids[@]} not terminated within ${MAX_WAIT_PERIOD} seconds. Please, investigate..."
+ echo "Process(es) with PID(s) ${monitored_pids[@]} did not shutdown within ${MAX_WAIT_PERIOD} seconds. Killing processes."
+ for pid in "${monitored_pids[@]}"
+ do
+ echo "Killing Qpid Broker-J with PID '$pid'"
+ kill -9 ${pid} 2>/dev/null
+ done
exit 1
else
echo "Qpid Broker-J process(es) terminated successfully"
@@ -90,17 +95,14 @@ main()
shutdown_brokers "${pids[@]}"
fi
elif [[ $arg_length -eq 1 && "$1" == "-h" ]] ; then
- echo "$0: script tries to stop instances of Qpid Broker-J with given PIDs or all running Qpid Brokers if no PID is provided."
+ echo "$0: script tries to stop instances of Qpid Broker-J with given PIDs or all running Qpid Broker-J instances if no PID is provided."
echo "usage: $0 [pid...]"
else
pids=( "$@" )
declare -a broker_pids=()
- pidfile=$(mktemp)
- trap "rm -f ${pidfile} || true" EXIT
for pid in "${pids[@]}"
do
- echo ${pid} > ${pidfile}
- pgrep -f -U "${USER}" -F "${pidfile}" -- "${SEARCH}" >/dev/null
+ ps -f -p $pid | grep -- "${SEARCH}" | grep -v grep >/dev/null
if [[ $? -eq 0 ]]; then
broker_pids+=($pid)
else
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-broker-j git commit: QPID-7971: [Qpid Broker-J] Ensure
that consumers are notified of no-work during consumer target close
Posted by kw...@apache.org.
QPID-7971: [Qpid Broker-J] Ensure that consumers are notified of no-work during consumer target close
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d1b74e42
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d1b74e42
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d1b74e42
Branch: refs/heads/master
Commit: d1b74e424d2bf71004357cf7b3e968b68b1aed10
Parents: 2acc601
Author: Keith Wall <kw...@apache.org>
Authored: Tue Oct 24 10:40:21 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Oct 24 13:17:54 2017 +0100
----------------------------------------------------------------------
.../server/consumer/AbstractConsumerTarget.java | 51 ++++++------------
.../consumer/AbstractConsumerTargetTest.java | 54 ++++++++++++++++++--
2 files changed, 67 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1b74e42/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 2ef82cf..76d4652 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -51,14 +51,7 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
{
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
- private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = new LogSubject()
- {
- @Override
- public String toLogString()
- {
- return "[(** Multi-Queue **)] ";
- }
- };
+ private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = () -> "[(** Multi-Queue **)] ";
protected final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
protected final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN);
@@ -66,11 +59,11 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
private final boolean _isMultiQueue;
private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
private final List<MessageInstanceConsumer> _consumers = new CopyOnWriteArrayList<>();
-
- private Iterator<MessageInstanceConsumer> _pullIterator;
- private boolean _notifyWorkDesired;
private final AtomicBoolean _scheduled = new AtomicBoolean();
+ private volatile Iterator<MessageInstanceConsumer> _pullIterator;
+ private volatile boolean _notifyWorkDesired;
+
protected AbstractConsumerTarget(final boolean isMultiQueue,
final AMQPConnection<?> amqpConnection)
{
@@ -122,17 +115,14 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
{
if (desired != _notifyWorkDesired)
{
- if(_suspendedConsumerLoggingTicker != null)
+ if (desired)
{
- if (desired)
- {
- getSession().removeTicker(_suspendedConsumerLoggingTicker);
- }
- else
- {
- _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
- getSession().addTicker(_suspendedConsumerLoggingTicker);
- }
+ getSession().removeTicker(_suspendedConsumerLoggingTicker);
+ }
+ else
+ {
+ _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
+ getSession().addTicker(_suspendedConsumerLoggingTicker);
}
for (MessageInstanceConsumer consumer : _consumers)
@@ -174,14 +164,7 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
if(_consumers.contains(sub))
{
return doOnIoThreadAsync(
- new Runnable()
- {
- @Override
- public void run()
- {
- consumerRemovedInternal(sub);
- }
- });
+ () -> consumerRemovedInternal(sub));
}
else
{
@@ -355,19 +338,17 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
{
if (_state.compareAndSet(State.OPEN, State.CLOSED))
{
+ setNotifyWorkDesired(false);
+
List<MessageInstanceConsumer> consumers = new ArrayList<>(_consumers);
_consumers.clear();
- setNotifyWorkDesired(false);
-
for (MessageInstanceConsumer consumer : consumers)
{
consumer.close();
}
- if (_suspendedConsumerLoggingTicker != null)
- {
- getSession().removeTicker(_suspendedConsumerLoggingTicker);
- }
+
+ getSession().removeTicker(_suspendedConsumerLoggingTicker);
return true;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1b74e42/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
index e29458a..8e41a9c 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
@@ -22,11 +22,16 @@ package org.apache.qpid.server.consumer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import org.mockito.InOrder;
+
import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
@@ -49,7 +54,8 @@ public class AbstractConsumerTargetTest extends QpidTestCase
private TestAbstractConsumerTarget _consumerTarget;
private Consumer _consumer;
private MessageSource _messageSource;
- private AMQPConnection _connection = mock(AMQPConnection.class);
+ private AMQPConnection<?> _connection = mock(AMQPConnection.class);
+ private AMQPSession<?,TestAbstractConsumerTarget> _session = mock(AMQPSession.class);
private MessageInstance _messageInstance;
@Override
@@ -71,6 +77,48 @@ public class AbstractConsumerTargetTest extends QpidTestCase
_consumerTarget.consumerAdded(_consumer);
}
+ public void testClose() throws Exception
+ {
+ _consumerTarget = new TestAbstractConsumerTarget();
+ assertEquals("Unexpected number of consumers", 0, _consumerTarget.getConsumers().size());
+
+ _consumerTarget.consumerAdded(_consumer);
+ assertEquals("Unexpected number of consumers after add", 1, _consumerTarget.getConsumers().size());
+
+ _consumerTarget.close();
+ assertEquals("Unexpected number of consumers after close", 0, _consumerTarget.getConsumers().size());
+
+ verify(_consumer, times(1)).close();
+ }
+
+ public void testNotifyWork() throws Exception
+ {
+ InOrder order = inOrder(_consumer);
+
+ _consumerTarget = new TestAbstractConsumerTarget();
+ assertEquals("Unexpected number of consumers", 0, _consumerTarget.getConsumers().size());
+
+ _consumerTarget.consumerAdded(_consumer);
+
+ _consumerTarget.setNotifyWorkDesired(true);
+ order.verify(_consumer, times(1)).setNotifyWorkDesired(true);
+
+ _consumerTarget.setNotifyWorkDesired(false);
+ order.verify(_consumer, times(1)).setNotifyWorkDesired(false);
+
+ _consumerTarget.setNotifyWorkDesired(true);
+ order.verify(_consumer, times(1)).setNotifyWorkDesired(true);
+
+ _consumerTarget.setNotifyWorkDesired(true);
+ // no change of state - should not be propagated to the consumer
+
+ _consumerTarget.close();
+ order.verify(_consumer, times(1)).setNotifyWorkDesired(false);
+ order.verify(_consumer, times(1)).close();
+
+ verifyNoMoreInteractions(_consumer);
+ }
+
public void testConversionExceptionPolicyClose() throws Exception
{
configureBehaviour(true, MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
@@ -199,13 +247,13 @@ public class AbstractConsumerTargetTest extends QpidTestCase
@Override
public void updateNotifyWorkDesired()
{
-
+ throw new UnsupportedOperationException();
}
@Override
public AMQPSession<?, TestAbstractConsumerTarget> getSession()
{
- return null;
+ return _session;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-broker-j git commit: QPID-7986: [Qpid Broker-J] [AMQP0-10]
Remove redundant casts left by collapse of ServerConnection/ServerSession
etc. No functional changes.
Posted by kw...@apache.org.
QPID-7986: [Qpid Broker-J] [AMQP0-10] Remove redundant casts left by collapse of ServerConnection/ServerSession etc. No functional changes.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/80314758
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/80314758
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/80314758
Branch: refs/heads/master
Commit: 8031475838849d6509510981975c3af0533e252f
Parents: d1b74e4
Author: Keith Wall <kw...@apache.org>
Authored: Tue Oct 24 10:46:24 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Oct 24 13:17:54 2017 +0100
----------------------------------------------------------------------
.../server/protocol/v0_10/ServerSession.java | 8 +-
.../protocol/v0_10/ServerSessionDelegate.java | 113 +++++++++----------
2 files changed, 60 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/80314758/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 2d4a731..aac916a 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -110,7 +110,7 @@ public class ServerSession extends SessionInvoker
private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
- private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
+ private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<>());
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
@@ -136,7 +136,7 @@ public class ServerSession extends SessionInvoker
private int syncPoint;
// outgoing command count
private int commandsOut = 0;
- private Map<Integer,Method> commands = new HashMap<Integer, Method>();
+ private Map<Integer,Method> commands = new HashMap<>();
private int commandBytes = 0;
private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024 * 1024);
private int maxComplete = commandsOut - 1;
@@ -146,11 +146,11 @@ public class ServerSession extends SessionInvoker
private boolean transacted = false;
private SessionDetachCode detachCode;
private boolean _isNoReplay = false;
- private Map<Integer,ResultFuture<?>> results = new HashMap<Integer,ResultFuture<?>>();
+ private Map<Integer,ResultFuture<?>> results = new HashMap<>();
private org.apache.qpid.server.protocol.v0_10.transport.ExecutionException exception = null;
private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
- new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
+ new ConcurrentSkipListMap<>();
private ServerTransaction _transaction;
private final AtomicLong _txnStarts = new AtomicLong(0);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/80314758/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index fa37c17..ab5581c 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -104,9 +104,9 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
if(!session.isClosing())
{
- Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark();
+ Object asyncCommandMark = session.getAsyncCommandMark();
command(session, method, false);
- Object newOutstanding = ((ServerSession)session).getAsyncCommandMark();
+ Object newOutstanding = session.getAsyncCommandMark();
if(newOutstanding == null || newOutstanding == asyncCommandMark)
{
session.processed(method);
@@ -114,12 +114,12 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
if(newOutstanding != null)
{
- ((ServerSession)session).completeAsyncCommands();
+ session.completeAsyncCommands();
}
if (method.isSync())
{
- ((ServerSession)session).awaitCommandCompletion();
+ session.awaitCommandCompletion();
session.flushProcessed();
}
}
@@ -138,7 +138,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
@Override
public void messageAccept(ServerSession session, MessageAccept method)
{
- final ServerSession serverSession = (ServerSession) session;
+ final ServerSession serverSession = session;
serverSession.accept(method.getTransfers());
if(!serverSession.isTransactional())
{
@@ -150,19 +150,19 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
@Override
public void messageReject(ServerSession session, MessageReject method)
{
- ((ServerSession)session).reject(method.getTransfers());
+ session.reject(method.getTransfers());
}
@Override
public void messageRelease(ServerSession session, MessageRelease method)
{
- ((ServerSession)session).release(method.getTransfers(), method.getSetRedelivered());
+ session.release(method.getTransfers(), method.getSetRedelivered());
}
@Override
public void messageAcquire(ServerSession session, MessageAcquire method)
{
- RangeSet acquiredRanges = ((ServerSession)session).acquire(method.getTransfers());
+ RangeSet acquiredRanges = session.acquire(method.getTransfers());
Acquired result = new Acquired(acquiredRanges);
@@ -209,7 +209,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Subscriber must provide a destination. The protocol specification marking the destination argument as optional is considered a mistake.");
}
- else if(((ServerSession)session).getSubscription(destination) != null)
+ else if(session.getSubscription(destination) != null)
{
exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Subscription already exists with destination '"+destination+"'");
}
@@ -252,7 +252,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
}
- else if(!verifySessionAccess((ServerSession) session, sources))
+ else if(!verifySessionAccess(session, sources))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
@@ -309,7 +309,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
}
boolean multiQueue = sources.size()>1;
- ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination,
+ ConsumerTarget_0_10 target = new ConsumerTarget_0_10(session, destination,
method.getAcceptMode(),
method.getAcquireMode(),
MessageFlowMode.WINDOW,
@@ -338,7 +338,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
}
}
- ((ServerSession)session).register(destination, target);
+ session.register(destination, target);
try
{
EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
@@ -418,18 +418,17 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
try
{
- ServerSession serverSession = (ServerSession) ssn;
- if(serverSession.blockingTimeoutExceeded())
+ if(ssn.blockingTimeoutExceeded())
{
getEventLogger(ssn).message(ChannelMessages.FLOW_CONTROL_IGNORED());
- serverSession.close(ErrorCodes.MESSAGE_TOO_LARGE,
- "Session flow control was requested, but not enforced by sender");
+ ssn.close(ErrorCodes.MESSAGE_TOO_LARGE,
+ "Session flow control was requested, but not enforced by sender");
}
- else if(xfr.getBodySize() > serverSession.getConnection().getMaxMessageSize())
+ else if(xfr.getBodySize() > ssn.getConnection().getMaxMessageSize())
{
exception(ssn, xfr, ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED,
- "Message size of " + xfr.getBodySize() + " greater than allowed maximum of " + serverSession.getConnection().getMaxMessageSize());
+ "Message size of " + xfr.getBodySize() + " greater than allowed maximum of " + ssn.getConnection().getMaxMessageSize());
}
else
{
@@ -447,8 +446,9 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
final NamedAddressSpace virtualHost = getAddressSpace(ssn);
try
{
- serverSession.getAMQPConnection().checkAuthorizedMessagePrincipal(getMessageUserId(xfr));
- serverSession.authorisePublish(destination, messageMetaData.getRoutingKey(), messageMetaData.isImmediate(), serverSession.getAMQPConnection().getLastReadTime());
+ ssn.getAMQPConnection().checkAuthorizedMessagePrincipal(getMessageUserId(xfr));
+ ssn.authorisePublish(destination, messageMetaData.getRoutingKey(), messageMetaData.isImmediate(), ssn
+ .getAMQPConnection().getLastReadTime());
}
catch (AccessControlException e)
@@ -462,7 +462,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
final MessageStore store = virtualHost.getMessageStore();
final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
final MessageTransferMessage message =
- new MessageTransferMessage(storeMessage, serverSession.getReference());
+ new MessageTransferMessage(storeMessage, ssn.getReference());
MessageReference<MessageTransferMessage> reference = message.newReference();
try
@@ -490,12 +490,12 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
}
};
- RoutingResult<MessageTransferMessage> routingResult = serverSession.enqueue(message, instanceProperties, destination);
+ RoutingResult<MessageTransferMessage> routingResult = ssn.enqueue(message, instanceProperties, destination);
boolean explictlyRejected = routingResult.containsReject(RejectType.LIMIT_EXCEEDED);
if (!routingResult.hasRoutes() || explictlyRejected)
{
- boolean closeWhenNoRoute = serverSession.getAMQPConnection().getPort().getCloseWhenNoRoute();
+ boolean closeWhenNoRoute = ssn.getAMQPConnection().getPort().getCloseWhenNoRoute();
boolean discardUnroutable = delvProps != null && delvProps.getDiscardUnroutable();
if (!discardUnroutable && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
@@ -515,8 +515,8 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
ExecutionException ex = new ExecutionException();
ex.setErrorCode(code);
ex.setDescription(errorMessage);
- serverSession.invoke(ex);
- serverSession.close(ErrorCodes.RESOURCE_ERROR, errorMessage);
+ ssn.invoke(ex);
+ ssn.close(ErrorCodes.RESOURCE_ERROR, errorMessage);
return;
}
else
@@ -527,19 +527,19 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
}
// TODO: we currently do not send MessageAccept when AcceptMode is EXPLICIT
- if (serverSession.isTransactional())
+ if (ssn.isTransactional())
{
- serverSession.processed(xfr);
+ ssn.processed(xfr);
}
else
{
- serverSession.recordFuture(Futures.immediateFuture(null),
- new CommandProcessedAction(serverSession, xfr));
+ ssn.recordFuture(Futures.immediateFuture(null),
+ new CommandProcessedAction(ssn, xfr));
}
}
catch (VirtualHostUnavailableException e)
{
- getServerConnection(serverSession).sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+ getServerConnection(ssn).sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
}
finally
{
@@ -571,7 +571,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
String destination = method.getDestination();
- ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = session.getSubscription(destination);
if(sub == null)
{
@@ -579,7 +579,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
}
else
{
- ((ServerSession)session).unregister(sub);
+ session.unregister(sub);
}
}
@@ -588,7 +588,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
String destination = method.getDestination();
- ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = session.getSubscription(destination);
if(sub == null)
{
@@ -604,28 +604,28 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
public void txSelect(ServerSession session, TxSelect method)
{
// TODO - check current tx mode
- ((ServerSession)session).selectTx();
+ session.selectTx();
}
@Override
public void txCommit(ServerSession session, TxCommit method)
{
// TODO - check current tx mode
- ((ServerSession)session).commit();
+ session.commit();
}
@Override
public void txRollback(ServerSession session, TxRollback method)
{
// TODO - check current tx mode
- ((ServerSession)session).rollback();
+ session.rollback();
}
@Override
public void dtxSelect(ServerSession session, DtxSelect method)
{
// TODO - check current tx mode
- ((ServerSession)session).selectDtx();
+ session.selectDtx();
}
@Override
@@ -635,7 +635,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
result.setStatus(DtxXaStatus.XA_OK);
try
{
- ((ServerSession)session).startDtx(method.getXid(), method.getJoin(), method.getResume());
+ session.startDtx(method.getXid(), method.getJoin(), method.getResume());
session.executionResult(method.getId(), result);
}
catch(JoinAndResumeDtxException e)
@@ -667,7 +667,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
try
{
- ((ServerSession) session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
+ session.endDtx(method.getXid(), method.getFail(), method.getSuspend());
}
catch (TimeoutDtxException e)
{
@@ -703,7 +703,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
try
{
- ((ServerSession)session).commitDtx(method.getXid(), method.getOnePhase());
+ session.commitDtx(method.getXid(), method.getOnePhase());
}
catch (RollbackOnlyDtxException e)
{
@@ -735,7 +735,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
try
{
- ((ServerSession)session).forgetDtx(method.getXid());
+ session.forgetDtx(method.getXid());
}
catch(UnknownDtxBranchException e)
{
@@ -754,7 +754,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
GetTimeoutResult result = new GetTimeoutResult();
try
{
- result.setTimeout(((ServerSession) session).getTimeoutDtx(method.getXid()));
+ result.setTimeout(session.getTimeoutDtx(method.getXid()));
session.executionResult(method.getId(), result);
}
catch(UnknownDtxBranchException e)
@@ -772,7 +772,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
try
{
- ((ServerSession)session).prepareDtx(method.getXid());
+ session.prepareDtx(method.getXid());
}
catch (RollbackOnlyDtxException e)
{
@@ -803,7 +803,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
public void dtxRecover(ServerSession session, DtxRecover method)
{
RecoverResult result = new RecoverResult();
- List inDoubt = ((ServerSession)session).recoverDtx();
+ List inDoubt = session.recoverDtx();
result.setInDoubt(inDoubt);
session.executionResult(method.getId(), result);
}
@@ -818,7 +818,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
try
{
- ((ServerSession)session).rollbackDtx(method.getXid());
+ session.rollbackDtx(method.getXid());
}
catch (TimeoutDtxException e)
{
@@ -846,7 +846,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
try
{
- ((ServerSession)session).setTimeoutDtx(method.getXid(), method.getTimeout());
+ session.setTimeoutDtx(method.getXid(), method.getTimeout());
}
catch(UnknownDtxBranchException e)
{
@@ -1003,7 +1003,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
session.invoke(ex);
- ((ServerSession)session).close(errorCode.getValue(), description);
+ session.close(errorCode.getValue(), description);
}
private Exchange<?> getExchange(ServerSession session, String exchangeName)
@@ -1528,7 +1528,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
exception(session, method, errorCode, description);
}
- else if (!verifySessionAccess((ServerSession) session, queue))
+ else if (!verifySessionAccess(session, queue))
{
String description = "Cannot passively declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1589,7 +1589,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
catch(AbstractConfiguredObject.DuplicateNameException qe)
{
queue = (Queue<?>) qe.getExisting();
- if (!verifySessionAccess((ServerSession) session, queue))
+ if (!verifySessionAccess(session, queue))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1640,7 +1640,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
}
else
{
- if(!verifySessionAccess((ServerSession) session, queue))
+ if(!verifySessionAccess(session, queue))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
@@ -1745,7 +1745,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
String destination = sfm.getDestination();
- ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = session.getSubscription(destination);
if(sub == null)
{
@@ -1766,7 +1766,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
String destination = stop.getDestination();
- ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = session.getSubscription(destination);
if(sub == null)
{
@@ -1784,7 +1784,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
String destination = flow.getDestination();
- ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = session.getSubscription(destination);
if(sub == null)
{
@@ -1799,11 +1799,10 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
public void closed(ServerSession session)
{
- ServerSession serverSession = (ServerSession)session;
- serverSession.stopSubscriptions();
- serverSession.onClose();
- serverSession.unregisterSubscriptions();
+ session.stopSubscriptions();
+ session.onClose();
+ session.unregisterSubscriptions();
}
public void detached(ServerSession session)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org