You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/10/24 12:18:18 UTC

[1/3] qpid-broker-j git commit: QPID-7910: [Qpid Broker-J] [Scripts] Revert the switch to pgrep and go back to ps, as the -F flag is not supported on old pgrep versions. Reinstate kill -9 if the processes don't go away sufficiently quickly.

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 2acc60129 -> cd12e2dbc


QPID-7910: [Qpid Broker-J] [Scripts] Revert the switch to pgrep and go back to ps, as the -F flag is not supported on old pgrep versions.
Reinstate kill -9 if the processes don't go away sufficiently quickly.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/cd12e2db
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/cd12e2db
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/cd12e2db

Branch: refs/heads/master
Commit: cd12e2dbcafba3013349df0858fdb07c9c6b30c7
Parents: 8031475
Author: Keith Wall <kw...@apache.org>
Authored: Tue Oct 24 13:16:12 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Oct 24 13:17:54 2017 +0100

----------------------------------------------------------------------
 broker/bin/qpid.stop | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cd12e2db/broker/bin/qpid.stop
----------------------------------------------------------------------
diff --git a/broker/bin/qpid.stop b/broker/bin/qpid.stop
index c55700e..f91b21f 100755
--- a/broker/bin/qpid.stop
+++ b/broker/bin/qpid.stop
@@ -41,7 +41,7 @@ shutdown_brokers()
     declare -a monitored_pids=()
     for pid in "${pids[@]}"
     do
-        echo "Killing Qpid Broker-J with PID '$pid'"
+        echo "Shutting down Qpid Broker-J with PID '$pid'"
         if kill $pid 2>/dev/null; then
             monitored_pids+=($pid)
         else
@@ -49,7 +49,7 @@ shutdown_brokers()
         fi
     done
 
-    echo "Waiting for up to $MAX_WAIT_PERIOD seconds for process(es) to terminate..."
+    echo "Waiting for up to $MAX_WAIT_PERIOD seconds for process(es) to shutdown..."
     end_time=$(($SECONDS+$MAX_WAIT_PERIOD))
     while [[ ${#monitored_pids[@]} -ne 0 && "${SECONDS}" -lt ${end_time} ]]
     do
@@ -71,7 +71,12 @@ shutdown_brokers()
     done
 
     if [[ ${#monitored_pids[@]} -ne 0 ]]; then
-        echo "Process(es) with PID(s) ${monitored_pids[@]} not terminated within ${MAX_WAIT_PERIOD} seconds. Please, investigate..."
+        echo "Process(es) with PID(s) ${monitored_pids[@]} did not shutdown within ${MAX_WAIT_PERIOD} seconds. Killing processes."
+        for pid in "${monitored_pids[@]}"
+        do
+            echo "Killing Qpid Broker-J with PID '$pid'"
+            kill -9 ${pid} 2>/dev/null
+        done
         exit 1
     else
         echo "Qpid Broker-J process(es) terminated successfully"
@@ -90,17 +95,14 @@ main()
             shutdown_brokers "${pids[@]}"
         fi
     elif [[ $arg_length -eq 1 && "$1" == "-h" ]] ; then
-        echo "$0: script tries to stop instances of Qpid Broker-J with given PIDs or all running Qpid Brokers if no PID is provided."
+        echo "$0: script tries to stop instances of Qpid Broker-J with given PIDs or all running Qpid Broker-J instances if no PID is provided."
         echo "usage: $0 [pid...]"
     else
         pids=( "$@" )
         declare -a broker_pids=()
-        pidfile=$(mktemp)
-        trap "rm -f ${pidfile} || true" EXIT
         for pid in "${pids[@]}"
         do
-            echo ${pid} > ${pidfile}
-            pgrep -f -U "${USER}" -F "${pidfile}" -- "${SEARCH}" >/dev/null
+            ps -f -p $pid | grep -- "${SEARCH}" | grep -v grep >/dev/null
             if [[ $? -eq 0 ]]; then
                 broker_pids+=($pid)
             else


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-broker-j git commit: QPID-7971: [Qpid Broker-J] Ensure that consumers are notified that no work is desired during consumer target close

Posted by kw...@apache.org.
QPID-7971: [Qpid Broker-J] Ensure that consumers are notified that no work is desired during consumer target close


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d1b74e42
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d1b74e42
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d1b74e42

Branch: refs/heads/master
Commit: d1b74e424d2bf71004357cf7b3e968b68b1aed10
Parents: 2acc601
Author: Keith Wall <kw...@apache.org>
Authored: Tue Oct 24 10:40:21 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Oct 24 13:17:54 2017 +0100

----------------------------------------------------------------------
 .../server/consumer/AbstractConsumerTarget.java | 51 ++++++------------
 .../consumer/AbstractConsumerTargetTest.java    | 54 ++++++++++++++++++--
 2 files changed, 67 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1b74e42/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 2ef82cf..76d4652 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -51,14 +51,7 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
 
-    private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = new LogSubject()
-    {
-        @Override
-        public String toLogString()
-        {
-            return "[(** Multi-Queue **)] ";
-        }
-    };
+    private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = () -> "[(** Multi-Queue **)] ";
     protected final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
     protected final AtomicLong _unacknowledgedCount = new AtomicLong(0);
     private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN);
@@ -66,11 +59,11 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
     private final boolean _isMultiQueue;
     private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
     private final List<MessageInstanceConsumer> _consumers = new CopyOnWriteArrayList<>();
-
-    private Iterator<MessageInstanceConsumer> _pullIterator;
-    private boolean _notifyWorkDesired;
     private final AtomicBoolean _scheduled = new AtomicBoolean();
 
+    private volatile Iterator<MessageInstanceConsumer> _pullIterator;
+    private volatile boolean _notifyWorkDesired;
+
     protected AbstractConsumerTarget(final boolean isMultiQueue,
                                      final AMQPConnection<?> amqpConnection)
     {
@@ -122,17 +115,14 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
     {
         if (desired != _notifyWorkDesired)
         {
-            if(_suspendedConsumerLoggingTicker != null)
+            if (desired)
             {
-                if (desired)
-                {
-                    getSession().removeTicker(_suspendedConsumerLoggingTicker);
-                }
-                else
-                {
-                    _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
-                    getSession().addTicker(_suspendedConsumerLoggingTicker);
-                }
+                getSession().removeTicker(_suspendedConsumerLoggingTicker);
+            }
+            else
+            {
+                _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
+                getSession().addTicker(_suspendedConsumerLoggingTicker);
             }
 
             for (MessageInstanceConsumer consumer : _consumers)
@@ -174,14 +164,7 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
         if(_consumers.contains(sub))
         {
             return doOnIoThreadAsync(
-                    new Runnable()
-                    {
-                        @Override
-                        public void run()
-                        {
-                            consumerRemovedInternal(sub);
-                        }
-                    });
+                    () -> consumerRemovedInternal(sub));
         }
         else
         {
@@ -355,19 +338,17 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
     {
         if (_state.compareAndSet(State.OPEN, State.CLOSED))
         {
+            setNotifyWorkDesired(false);
+
             List<MessageInstanceConsumer> consumers = new ArrayList<>(_consumers);
             _consumers.clear();
 
-            setNotifyWorkDesired(false);
-
             for (MessageInstanceConsumer consumer : consumers)
             {
                 consumer.close();
             }
-            if (_suspendedConsumerLoggingTicker != null)
-            {
-                getSession().removeTicker(_suspendedConsumerLoggingTicker);
-            }
+
+            getSession().removeTicker(_suspendedConsumerLoggingTicker);
 
             return true;
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1b74e42/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
index e29458a..8e41a9c 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
@@ -22,11 +22,16 @@ package org.apache.qpid.server.consumer;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import org.mockito.InOrder;
+
 import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
@@ -49,7 +54,8 @@ public class AbstractConsumerTargetTest extends QpidTestCase
     private TestAbstractConsumerTarget _consumerTarget;
     private Consumer _consumer;
     private MessageSource _messageSource;
-    private AMQPConnection _connection = mock(AMQPConnection.class);
+    private AMQPConnection<?> _connection = mock(AMQPConnection.class);
+    private AMQPSession<?,TestAbstractConsumerTarget> _session = mock(AMQPSession.class);
     private MessageInstance _messageInstance;
 
     @Override
@@ -71,6 +77,48 @@ public class AbstractConsumerTargetTest extends QpidTestCase
         _consumerTarget.consumerAdded(_consumer);
     }
 
+    public void testClose() throws Exception
+    {
+        _consumerTarget = new TestAbstractConsumerTarget();
+        assertEquals("Unexpected number of consumers", 0, _consumerTarget.getConsumers().size());
+
+        _consumerTarget.consumerAdded(_consumer);
+        assertEquals("Unexpected number of consumers after add", 1, _consumerTarget.getConsumers().size());
+
+        _consumerTarget.close();
+        assertEquals("Unexpected number of consumers after close", 0, _consumerTarget.getConsumers().size());
+
+        verify(_consumer, times(1)).close();
+    }
+
+    public void testNotifyWork() throws Exception
+    {
+        InOrder order = inOrder(_consumer);
+
+        _consumerTarget = new TestAbstractConsumerTarget();
+        assertEquals("Unexpected number of consumers", 0, _consumerTarget.getConsumers().size());
+
+        _consumerTarget.consumerAdded(_consumer);
+
+        _consumerTarget.setNotifyWorkDesired(true);
+        order.verify(_consumer, times(1)).setNotifyWorkDesired(true);
+
+        _consumerTarget.setNotifyWorkDesired(false);
+        order.verify(_consumer, times(1)).setNotifyWorkDesired(false);
+
+        _consumerTarget.setNotifyWorkDesired(true);
+        order.verify(_consumer, times(1)).setNotifyWorkDesired(true);
+
+        _consumerTarget.setNotifyWorkDesired(true);
+        // no change of state - should not be propagated to the consumer
+
+        _consumerTarget.close();
+        order.verify(_consumer, times(1)).setNotifyWorkDesired(false);
+        order.verify(_consumer, times(1)).close();
+
+        verifyNoMoreInteractions(_consumer);
+    }
+
     public void testConversionExceptionPolicyClose() throws Exception
     {
         configureBehaviour(true, MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
@@ -199,13 +247,13 @@ public class AbstractConsumerTargetTest extends QpidTestCase
         @Override
         public void updateNotifyWorkDesired()
         {
-
+            throw new UnsupportedOperationException();
         }
 
         @Override
         public AMQPSession<?, TestAbstractConsumerTarget> getSession()
         {
-            return null;
+            return _session;
         }
 
         @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-broker-j git commit: QPID-7986: [Qpid Broker-J] [AMQP0-10] Remove redundant casts left by collapse of ServerConnection/SessionSession etc. No functional changes.

Posted by kw...@apache.org.
QPID-7986: [Qpid Broker-J] [AMQP0-10] Remove redundant casts left by collapse of ServerConnection/SessionSession etc. No functional changes.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/80314758
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/80314758
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/80314758

Branch: refs/heads/master
Commit: 8031475838849d6509510981975c3af0533e252f
Parents: d1b74e4
Author: Keith Wall <kw...@apache.org>
Authored: Tue Oct 24 10:46:24 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Oct 24 13:17:54 2017 +0100

----------------------------------------------------------------------
 .../server/protocol/v0_10/ServerSession.java    |   8 +-
 .../protocol/v0_10/ServerSessionDelegate.java   | 113 +++++++++----------
 2 files changed, 60 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/80314758/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 2d4a731..aac916a 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -110,7 +110,7 @@ public class ServerSession extends SessionInvoker
     private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
     private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
 
-    private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
+    private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<>());
 
     private final AtomicBoolean _blocking = new AtomicBoolean(false);
     private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
@@ -136,7 +136,7 @@ public class ServerSession extends SessionInvoker
     private int syncPoint;
     // outgoing command count
     private int commandsOut = 0;
-    private Map<Integer,Method> commands = new HashMap<Integer, Method>();
+    private Map<Integer,Method> commands = new HashMap<>();
     private int commandBytes = 0;
     private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024 * 1024);
     private int maxComplete = commandsOut - 1;
@@ -146,11 +146,11 @@ public class ServerSession extends SessionInvoker
     private boolean transacted = false;
     private SessionDetachCode detachCode;
     private boolean _isNoReplay = false;
-    private Map<Integer,ResultFuture<?>> results = new HashMap<Integer,ResultFuture<?>>();
+    private Map<Integer,ResultFuture<?>> results = new HashMap<>();
     private org.apache.qpid.server.protocol.v0_10.transport.ExecutionException exception = null;
 
     private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
-            new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
+            new ConcurrentSkipListMap<>();
 
     private ServerTransaction _transaction;
     private final AtomicLong _txnStarts = new AtomicLong(0);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/80314758/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index fa37c17..ab5581c 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -104,9 +104,9 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
         {
             if(!session.isClosing())
             {
-                Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark();
+                Object asyncCommandMark = session.getAsyncCommandMark();
                 command(session, method, false);
-                Object newOutstanding = ((ServerSession)session).getAsyncCommandMark();
+                Object newOutstanding = session.getAsyncCommandMark();
                 if(newOutstanding == null || newOutstanding == asyncCommandMark)
                 {
                     session.processed(method);
@@ -114,12 +114,12 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
 
                 if(newOutstanding != null)
                 {
-                    ((ServerSession)session).completeAsyncCommands();
+                    session.completeAsyncCommands();
                 }
 
                 if (method.isSync())
                 {
-                    ((ServerSession)session).awaitCommandCompletion();
+                    session.awaitCommandCompletion();
                     session.flushProcessed();
                 }
             }
@@ -138,7 +138,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
     @Override
     public void messageAccept(ServerSession session, MessageAccept method)
     {
-        final ServerSession serverSession = (ServerSession) session;
+        final ServerSession serverSession = session;
         serverSession.accept(method.getTransfers());
         if(!serverSession.isTransactional())
         {
@@ -150,19 +150,19 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
     @Override
     public void messageReject(ServerSession session, MessageReject method)
     {
-        ((ServerSession)session).reject(method.getTransfers());
+        session.reject(method.getTransfers());
     }
 
     @Override
     public void messageRelease(ServerSession session, MessageRelease method)
     {
-        ((ServerSession)session).release(method.getTransfers(), method.getSetRedelivered());
+        session.release(method.getTransfers(), method.getSetRedelivered());
     }
 
     @Override
     public void messageAcquire(ServerSession session, MessageAcquire method)
     {
-        RangeSet acquiredRanges = ((ServerSession)session).acquire(method.getTransfers());
+        RangeSet acquiredRanges = session.acquire(method.getTransfers());
 
         Acquired result = new Acquired(acquiredRanges);
 
@@ -209,7 +209,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
             {
                 exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Subscriber must provide a destination. The protocol specification marking the destination argument as optional is considered a mistake.");
             }
-            else if(((ServerSession)session).getSubscription(destination) != null)
+            else if(session.getSubscription(destination) != null)
             {
                 exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Subscription already exists with destination '"+destination+"'");
             }
@@ -252,7 +252,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                 {
                     exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
                 }
-                else if(!verifySessionAccess((ServerSession) session, sources))
+                else if(!verifySessionAccess(session, sources))
                 {
                     exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
                 }
@@ -309,7 +309,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                     }
 
                     boolean multiQueue = sources.size()>1;
-                    ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination,
+                    ConsumerTarget_0_10 target = new ConsumerTarget_0_10(session, destination,
                                                                          method.getAcceptMode(),
                                                                          method.getAcquireMode(),
                                                                          MessageFlowMode.WINDOW,
@@ -338,7 +338,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                         }
                     }
 
-                    ((ServerSession)session).register(destination, target);
+                    session.register(destination, target);
                     try
                     {
                         EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
@@ -418,18 +418,17 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
     {
         try
         {
-            ServerSession serverSession = (ServerSession) ssn;
-            if(serverSession.blockingTimeoutExceeded())
+            if(ssn.blockingTimeoutExceeded())
             {
                 getEventLogger(ssn).message(ChannelMessages.FLOW_CONTROL_IGNORED());
 
-                serverSession.close(ErrorCodes.MESSAGE_TOO_LARGE,
-                                    "Session flow control was requested, but not enforced by sender");
+                ssn.close(ErrorCodes.MESSAGE_TOO_LARGE,
+                          "Session flow control was requested, but not enforced by sender");
             }
-            else if(xfr.getBodySize() > serverSession.getConnection().getMaxMessageSize())
+            else if(xfr.getBodySize() > ssn.getConnection().getMaxMessageSize())
             {
                 exception(ssn, xfr, ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED,
-                          "Message size of " + xfr.getBodySize() + " greater than allowed maximum of " + serverSession.getConnection().getMaxMessageSize());
+                          "Message size of " + xfr.getBodySize() + " greater than allowed maximum of " + ssn.getConnection().getMaxMessageSize());
             }
             else
             {
@@ -447,8 +446,9 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                 final NamedAddressSpace virtualHost = getAddressSpace(ssn);
                 try
                 {
-                    serverSession.getAMQPConnection().checkAuthorizedMessagePrincipal(getMessageUserId(xfr));
-                    serverSession.authorisePublish(destination, messageMetaData.getRoutingKey(), messageMetaData.isImmediate(), serverSession.getAMQPConnection().getLastReadTime());
+                    ssn.getAMQPConnection().checkAuthorizedMessagePrincipal(getMessageUserId(xfr));
+                    ssn.authorisePublish(destination, messageMetaData.getRoutingKey(), messageMetaData.isImmediate(), ssn
+                            .getAMQPConnection().getLastReadTime());
 
                 }
                 catch (AccessControlException e)
@@ -462,7 +462,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                 final MessageStore store = virtualHost.getMessageStore();
                 final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
                 final MessageTransferMessage message =
-                        new MessageTransferMessage(storeMessage, serverSession.getReference());
+                        new MessageTransferMessage(storeMessage, ssn.getReference());
                 MessageReference<MessageTransferMessage> reference = message.newReference();
 
                 try
@@ -490,12 +490,12 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                         }
                     };
 
-                    RoutingResult<MessageTransferMessage> routingResult = serverSession.enqueue(message, instanceProperties, destination);
+                    RoutingResult<MessageTransferMessage> routingResult = ssn.enqueue(message, instanceProperties, destination);
 
                     boolean explictlyRejected = routingResult.containsReject(RejectType.LIMIT_EXCEEDED);
                     if (!routingResult.hasRoutes() || explictlyRejected)
                     {
-                        boolean closeWhenNoRoute = serverSession.getAMQPConnection().getPort().getCloseWhenNoRoute();
+                        boolean closeWhenNoRoute = ssn.getAMQPConnection().getPort().getCloseWhenNoRoute();
                         boolean discardUnroutable = delvProps != null && delvProps.getDiscardUnroutable();
                         if (!discardUnroutable && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
                         {
@@ -515,8 +515,8 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                             ExecutionException ex = new ExecutionException();
                             ex.setErrorCode(code);
                             ex.setDescription(errorMessage);
-                            serverSession.invoke(ex);
-                            serverSession.close(ErrorCodes.RESOURCE_ERROR, errorMessage);
+                            ssn.invoke(ex);
+                            ssn.close(ErrorCodes.RESOURCE_ERROR, errorMessage);
                             return;
                         }
                         else
@@ -527,19 +527,19 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                     }
 
                     // TODO: we currently do not send MessageAccept when AcceptMode is EXPLICIT
-                    if (serverSession.isTransactional())
+                    if (ssn.isTransactional())
                     {
-                        serverSession.processed(xfr);
+                        ssn.processed(xfr);
                     }
                     else
                     {
-                        serverSession.recordFuture(Futures.immediateFuture(null),
-                                                   new CommandProcessedAction(serverSession, xfr));
+                        ssn.recordFuture(Futures.immediateFuture(null),
+                                         new CommandProcessedAction(ssn, xfr));
                     }
                 }
                 catch (VirtualHostUnavailableException e)
                 {
-                    getServerConnection(serverSession).sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+                    getServerConnection(ssn).sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
                 }
                 finally
                 {
@@ -571,7 +571,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
     {
         String destination = method.getDestination();
 
-        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = session.getSubscription(destination);
 
         if(sub == null)
         {
@@ -579,7 +579,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
         }
         else
         {
-            ((ServerSession)session).unregister(sub);
+            session.unregister(sub);
         }
     }
 
@@ -588,7 +588,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
     {
         String destination = method.getDestination();
 
-        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = session.getSubscription(destination);
 
         if(sub == null)
         {
@@ -604,28 +604,28 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
     public void txSelect(ServerSession session, TxSelect method)
     {
         // TODO - check current tx mode
-        ((ServerSession)session).selectTx();
+        session.selectTx();
     }
 
     @Override
     public void txCommit(ServerSession session, TxCommit method)
     {
         // TODO - check current tx mode
-        ((ServerSession)session).commit();
+        session.commit();
     }
 
     @Override
     public void txRollback(ServerSession session, TxRollback method)
     {
         // TODO - check current tx mode
-        ((ServerSession)session).rollback();
+        session.rollback();
     }
 
     @Override
     public void dtxSelect(ServerSession session, DtxSelect method)
     {
         // TODO - check current tx mode
-        ((ServerSession)session).selectDtx();
+        session.selectDtx();
     }
 
     @Override
@@ -635,7 +635,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
         result.setStatus(DtxXaStatus.XA_OK);
         try
         {
-            ((ServerSession)session).startDtx(method.getXid(), method.getJoin(), method.getResume());
+            session.startDtx(method.getXid(), method.getJoin(), method.getResume());
             session.executionResult(method.getId(), result);
         }
         catch(JoinAndResumeDtxException e)
@@ -667,7 +667,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
         {
             try
             {
-                ((ServerSession) session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
+                session.endDtx(method.getXid(), method.getFail(), method.getSuspend());
             }
             catch (TimeoutDtxException e)
             {
@@ -703,7 +703,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
         {
             try
             {
-                ((ServerSession)session).commitDtx(method.getXid(), method.getOnePhase());
+                session.commitDtx(method.getXid(), method.getOnePhase());
             }
             catch (RollbackOnlyDtxException e)
             {
@@ -735,7 +735,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
     {
         try
         {
-            ((ServerSession)session).forgetDtx(method.getXid());
+            session.forgetDtx(method.getXid());
         }
         catch(UnknownDtxBranchException e)
         {
@@ -754,7 +754,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
         GetTimeoutResult result = new GetTimeoutResult();
         try
         {
-            result.setTimeout(((ServerSession) session).getTimeoutDtx(method.getXid()));
+            result.setTimeout(session.getTimeoutDtx(method.getXid()));
             session.executionResult(method.getId(), result);
         }
         catch(UnknownDtxBranchException e)
@@ -772,7 +772,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
         {
             try
             {
-                ((ServerSession)session).prepareDtx(method.getXid());
+                session.prepareDtx(method.getXid());
             }
             catch (RollbackOnlyDtxException e)
             {
@@ -803,7 +803,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
     public void dtxRecover(ServerSession session, DtxRecover method)
     {
         RecoverResult result = new RecoverResult();
-        List inDoubt = ((ServerSession)session).recoverDtx();
+        List inDoubt = session.recoverDtx();
         result.setInDoubt(inDoubt);
         session.executionResult(method.getId(), result);
     }
@@ -818,7 +818,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
         {
             try
             {
-                ((ServerSession)session).rollbackDtx(method.getXid());
+                session.rollbackDtx(method.getXid());
             }
             catch (TimeoutDtxException e)
             {
@@ -846,7 +846,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
     {
         try
         {
-            ((ServerSession)session).setTimeoutDtx(method.getXid(), method.getTimeout());
+            session.setTimeoutDtx(method.getXid(), method.getTimeout());
         }
         catch(UnknownDtxBranchException e)
         {
@@ -1003,7 +1003,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
 
         session.invoke(ex);
 
-        ((ServerSession)session).close(errorCode.getValue(), description);
+        session.close(errorCode.getValue(), description);
     }
 
     private Exchange<?> getExchange(ServerSession session, String exchangeName)
@@ -1528,7 +1528,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                     exception(session, method, errorCode, description);
 
                 }
-                else if (!verifySessionAccess((ServerSession) session, queue))
+                else if (!verifySessionAccess(session, queue))
                 {
                     String description = "Cannot passively declare queue('" + queueName + "'),"
                                          + " as exclusive queue with same name "
@@ -1589,7 +1589,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
             catch(AbstractConfiguredObject.DuplicateNameException qe)
             {
                 queue = (Queue<?>) qe.getExisting();
-                if (!verifySessionAccess((ServerSession) session, queue))
+                if (!verifySessionAccess(session, queue))
                 {
                     String description = "Cannot declare queue('" + queueName + "'),"
                                                                            + " as exclusive queue with same name "
@@ -1640,7 +1640,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
             }
             else
             {
-                if(!verifySessionAccess((ServerSession) session, queue))
+                if(!verifySessionAccess(session, queue))
                 {
                     exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
                 }
@@ -1745,7 +1745,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
     {
         String destination = sfm.getDestination();
 
-        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = session.getSubscription(destination);
 
         if(sub == null)
         {
@@ -1766,7 +1766,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
     {
         String destination = stop.getDestination();
 
-        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = session.getSubscription(destination);
 
         if(sub == null)
         {
@@ -1784,7 +1784,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
     {
         String destination = flow.getDestination();
 
-        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = session.getSubscription(destination);
 
         if(sub == null)
         {
@@ -1799,11 +1799,10 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
 
     public void closed(ServerSession session)
     {
-        ServerSession serverSession = (ServerSession)session;
 
-        serverSession.stopSubscriptions();
-        serverSession.onClose();
-        serverSession.unregisterSubscriptions();
+        session.stopSubscriptions();
+        session.onClose();
+        session.unregisterSubscriptions();
     }
 
     public void detached(ServerSession session)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org