You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/01/24 13:47:05 UTC

svn commit: r1780080 [1/2] - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/transport/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-10-protocol/src/test/java/org/apa...

Author: kwall
Date: Tue Jan 24 13:47:05 2017
New Revision: 1780080

URL: http://svn.apache.org/viewvc?rev=1780080&view=rev
Log:
QPID-7633: Extract interfaces AMQPConnection_0_10/AMQPConnection_1_0 (analogue of AMQPConnection_0_8)

* Mechanical refactoring - extract interfaces AMQPConnection_0_10/AMQPConnection_1_0
* Fix up unit tests

Added:
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
      - copied, changed from r1780079, qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
      - copied, changed from r1780079, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java   (contents, props changed)
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Tue Jan 24 13:47:05 2017
@@ -32,6 +32,8 @@ import com.google.common.util.concurrent
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.ContextProvider;
+import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.util.Deletable;
 
@@ -45,6 +47,8 @@ public interface AMQPConnection<C extend
 
     Subject getSubject();
 
+    int getMessageCompressionThreshold();
+
     Principal getAuthorizedPrincipal();
 
     String getRemoteAddressString();
@@ -104,4 +108,7 @@ public interface AMQPConnection<C extend
     void notifyWork(AMQSessionModel<?,?> sessionModel);
 
     boolean isTransportBlockedForWriting();
+
+    long getMaxMessageSize();
+
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Tue Jan 24 13:47:05 2017
@@ -372,6 +372,7 @@ public abstract class AbstractAMQPConnec
         return maxMessageSize > 0 ? maxMessageSize : Long.MAX_VALUE;
     }
 
+    @Override
     public long getMaxMessageSize()
     {
         return _maxMessageSize.get();
@@ -818,6 +819,7 @@ public abstract class AbstractAMQPConnec
         logConnectionOpen();
     }
 
+    @Override
     public int getMessageCompressionThreshold()
     {
         return _messageCompressionThreshold;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Tue Jan 24 13:47:05 2017
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,345 +17,44 @@
  * under the License.
  *
  */
+
 package org.apache.qpid.server.protocol.v0_10;
 
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
-import org.apache.qpid.server.transport.AggregateTicker;
-import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.transport.ServerNetworkConnection;
-import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.Constant;
+import java.security.AccessControlContext;
 
+import javax.security.auth.Subject;
 
-public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10, ServerConnection>
+import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.model.ContextProvider;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.ProtocolEngine;
+
+@ManagedObject(category = false, creatable = false, type="AMQP_0_10")
+public interface AMQPConnection_0_10<C extends AMQPConnection_0_10<C>> extends AMQPConnection<C>,
+                                                                             ProtocolEngine,
+                                                                             EventLoggerProvider
 {
-    private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
-    private final ServerInputHandler _inputHandler;
+    // 0-10's current implementation (ServerConnection etc) means we have to break the encapsulation
+
+    void initialiseHeartbeating(long writerIdle, long readerIdle);
+
+    void setClientId(String clientId);
+
+    void setClientProduct(String clientProduct);
+
+    void setClientVersion(String clientVersion);
+
+    void setRemoteProcessPid(String remoteProcessPid);
+
+    void setSubject(Subject authorizedSubject);
+
+    void setAddressSpace(NamedAddressSpace addressSpace);
 
-    private final ServerConnection _connection;
+    ContextProvider getContextProvider();
 
-    private volatile boolean _transportBlockedForWriting;
+    AccessControlContext getAccessControllerContext();
 
-    private final AtomicBoolean _stateChanged = new AtomicBoolean();
-    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
-    private ServerDisassembler _disassembler;
-
-    private final Set<AMQSessionModel<?,?>> _sessionsWithWork =
-            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?,?>, Boolean>());
-
-
-    public AMQPConnection_0_10(final Broker<?> broker,
-                               ServerNetworkConnection network,
-                               final AmqpPort<?> port,
-                               final Transport transport,
-                               final long id,
-                               final AggregateTicker aggregateTicker)
-    {
-        super(broker, network, port, transport, Protocol.AMQP_0_10, id, aggregateTicker);
-
-        _connection = new ServerConnection(id, broker, port, transport, this);
-
-        SubjectCreator subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
-        ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, subjectCreator);
-
-        _connection.setConnectionDelegate(connDelegate);
-        _connection.setRemoteAddress(network.getRemoteAddress());
-        _connection.setLocalAddress(network.getLocalAddress());
-
-        _inputHandler = new ServerInputHandler(new ServerAssembler(_connection));
-        _connection.addFrameSizeObserver(_inputHandler);
-
-        AccessController.doPrivileged(new PrivilegedAction<Object>()
-        {
-            @Override
-            public Object run()
-            {
-                _connection.setNetworkConnection(getNetwork());
-                _disassembler = new ServerDisassembler(wrapSender(getNetwork().getSender()), Constant.MIN_MAX_FRAME_SIZE);
-                _connection.setSender(_disassembler);
-                _connection.addFrameSizeObserver(_disassembler);
-                return null;
-            }
-        }, getAccessControllerContext());
-    }
-
-    private ByteBufferSender wrapSender(final ByteBufferSender sender)
-    {
-        return new ByteBufferSender()
-        {
-            @Override
-            public boolean isDirectBufferPreferred()
-            {
-                return sender.isDirectBufferPreferred();
-            }
-
-            @Override
-            public void send(final QpidByteBuffer msg)
-            {
-                updateLastWriteTime();
-                sender.send(msg);
-            }
-
-            @Override
-            public void flush()
-            {
-                sender.flush();
-
-            }
-
-            @Override
-            public void close()
-            {
-                sender.close();
-            }
-        };
-    }
-
-    public void received(final QpidByteBuffer buf)
-    {
-        AccessController.doPrivileged(new PrivilegedAction<Object>()
-        {
-            @Override
-            public Object run()
-            {
-                updateLastReadTime();
-                try
-                {
-                    _inputHandler.received(buf);
-                    _connection.receivedComplete();
-                }
-                catch (IllegalArgumentException | IllegalStateException e)
-                {
-                    throw new ConnectionScopedRuntimeException(e);
-                }
-                catch (StoreException e)
-                {
-                    if (getAddressSpace().isActive())
-                    {
-                        throw new ServerScopedRuntimeException(e);
-                    }
-                    else
-                    {
-                        throw new ConnectionScopedRuntimeException(e);
-                    }
-                }
-                return null;
-            }
-        }, getAccessControllerContext());
-    }
-
-    @Override
-    public void encryptedTransport()
-    {
-    }
-
-    public void writerIdle()
-    {
-        _connection.doHeartBeat();
-    }
-
-    public void readerIdle()
-    {
-        AccessController.doPrivileged(new PrivilegedAction<Object>()
-        {
-            @Override
-            public Object run()
-            {
-                _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Current connection state: " + _connection.getConnectionDelegate().getState(), true));
-                getNetwork().close();
-                return null;
-            }
-        }, getAccessControllerContext());
-
-    }
-
-    public String getAddress()
-    {
-        return getNetwork().getRemoteAddress().toString();
-    }
-
-    @Override
-    public void closed()
-    {
-        try
-        {
-            AccessController.doPrivileged(new PrivilegedAction<Void>()
-            {
-                @Override
-                public Void run()
-                {
-                    _inputHandler.closed();
-                    if(_disassembler != null)
-                    {
-                        _disassembler.closed();
-                    }
-                    return null;
-                }
-            }, getAccessControllerContext());
-        }
-        finally
-        {
-            markTransportClosed();
-        }
-    }
-
-    @Override
-    public boolean isTransportBlockedForWriting()
-    {
-        return _transportBlockedForWriting;
-    }
-
-    @Override
-    public void setTransportBlockedForWriting(final boolean blocked)
-    {
-        if(_transportBlockedForWriting != blocked)
-        {
-            _transportBlockedForWriting = blocked;
-            _connection.transportStateChanged();
-        }
-    }
-
-    @Override
-    public Iterator<Runnable> processPendingIterator()
-    {
-        if (isIOThread())
-        {
-            return _connection.processPendingIterator(_sessionsWithWork);
-        }
-        else
-        {
-            return Collections.emptyIterator();
-        }
-    }
-
-    @Override
-    public boolean hasWork()
-    {
-        return _stateChanged.get();
-    }
-
-    @Override
-    public void notifyWork()
-    {
-        _stateChanged.set(true);
-
-        final Action<ProtocolEngine> listener = _workListener.get();
-        if(listener != null)
-        {
-            listener.performAction(this);
-        }
-    }
-
-    @Override
-    public void notifyWork(final AMQSessionModel<?,?> sessionModel)
-    {
-        _sessionsWithWork.add(sessionModel);
-        notifyWork();
-    }
-
-    public void clearWork()
-    {
-        _stateChanged.set(false);
-    }
-
-    public void setWorkListener(final Action<ProtocolEngine> listener)
-    {
-        _workListener.set(listener);
-    }
-
-    public boolean hasSessionWithName(final byte[] name)
-    {
-        return _connection.hasSessionWithName(name);
-    }
-
-    @Override
-    public void sendConnectionCloseAsync(final CloseReason reason, final String description)
-    {
-        stopConnection();
-        // Best mapping for all reasons is "forced"
-        _connection.sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, description);
-
-    }
-
-    @Override
-    public void closeSessionAsync(final AMQSessionModel<?,?> session,
-                                  final CloseReason reason, final String message)
-    {
-        ServerSession s = ((Session_0_10)session).getServerSession();
-        _connection.closeSessionAsync(s, reason, message);
-    }
-
-    @Override
-    protected void addAsyncTask(final Action<? super ServerConnection> action)
-    {
-        _connection.addAsyncTask(action);
-    }
-
-    public void block()
-    {
-        _connection.block();
-    }
-
-    public String getRemoteContainerName()
-    {
-        return _connection.getRemoteContainerName();
-    }
-
-    public Collection<? extends Session_0_10> getSessionModels()
-    {
-        final Collection<org.apache.qpid.server.model.Session> sessions =
-                getChildren(org.apache.qpid.server.model.Session.class);
-        final Collection<? extends Session_0_10> session_0_10s = new ArrayList<>((Collection)sessions);
-        return session_0_10s;
-    }
-
-    public void unblock()
-    {
-        _connection.unblock();
-    }
-
-    public long getSessionCountLimit()
-    {
-        return _connection.getSessionCountLimit();
-    }
-
-    @Override
-    protected boolean isOrderlyClose()
-    {
-        return !_connection.isConnectionLost();
-    }
-
-    @Override
-    public void initialiseHeartbeating(final long writerDelay, final long readerDelay)
-    {
-        super.initialiseHeartbeating(writerDelay, readerDelay);
-    }
+    void performDeleteTasks();
 }

Propchange: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
            ('svn:executable' removed)

Copied: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java (from r1780079, qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java?p2=qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java&p1=qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java&r1=1780079&r2=1780080&rev=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java Tue Jan 24 13:47:05 2017
@@ -56,9 +56,11 @@ import org.apache.qpid.transport.Connect
 import org.apache.qpid.transport.Constant;
 
 
-public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10, ServerConnection>
+public class AMQPConnection_0_10Impl extends AbstractAMQPConnection<AMQPConnection_0_10Impl, ServerConnection>
+        implements
+        AMQPConnection_0_10<AMQPConnection_0_10Impl>
 {
-    private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
+    private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10Impl.class);
     private final ServerInputHandler _inputHandler;
 
     private final ServerConnection _connection;
@@ -73,12 +75,12 @@ public class AMQPConnection_0_10 extends
             Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?,?>, Boolean>());
 
 
-    public AMQPConnection_0_10(final Broker<?> broker,
-                               ServerNetworkConnection network,
-                               final AmqpPort<?> port,
-                               final Transport transport,
-                               final long id,
-                               final AggregateTicker aggregateTicker)
+    public AMQPConnection_0_10Impl(final Broker<?> broker,
+                                   ServerNetworkConnection network,
+                                   final AmqpPort<?> port,
+                                   final Transport transport,
+                                   final long id,
+                                   final AggregateTicker aggregateTicker)
     {
         super(broker, network, port, transport, Protocol.AMQP_0_10, id, aggregateTicker);
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java Tue Jan 24 13:47:05 2017
@@ -68,8 +68,8 @@ public class ProtocolEngineCreator_0_10
                                             long id, final AggregateTicker aggregateTicker)
     {
 
-        final AMQPConnection_0_10 protocolEngine_0_10 =
-                new AMQPConnection_0_10(broker, network, port, transport, id, aggregateTicker);
+        final AMQPConnection_0_10Impl protocolEngine_0_10 =
+                new AMQPConnection_0_10Impl(broker, network, port, transport, id, aggregateTicker);
         protocolEngine_0_10.create();
         return protocolEngine_0_10;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Tue Jan 24 13:47:05 2017
@@ -45,6 +45,7 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
+import org.apache.qpid.server.security.auth.sasl.SaslSettings;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -479,7 +480,8 @@ public class ServerConnectionDelegate ex
             return;
         }
 
-        _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism, serverConnection.getAmqpConnection());
+        _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism,
+                                                               (SaslSettings) serverConnection.getAmqpConnection());
         if (_saslNegotiator == null)
         {
             serverConnection.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Jan 24 13:47:05 2017
@@ -66,7 +66,6 @@ import org.apache.qpid.server.model.Brok
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreException;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Tue Jan 24 13:47:05 2017
@@ -27,18 +27,19 @@ import java.util.List;
 
 import javax.security.auth.Subject;
 
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.model.BrokerTestHelper;
-import org.apache.qpid.server.session.AMQPSession;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.Binary;
 import org.apache.qpid.transport.ExecutionErrorCode;
@@ -50,12 +51,15 @@ public class ServerSessionTest extends Q
 {
 
     private VirtualHost<?> _virtualHost;
+    private CurrentThreadTaskExecutor _taskExecutor;
 
     @Override
     public void setUp() throws Exception
     {
         super.setUp();
         BrokerTestHelper.setUp();
+        _taskExecutor = new CurrentThreadTaskExecutor();
+        _taskExecutor.start();
         _virtualHost = BrokerTestHelper.createVirtualHost(getName());
     }
 
@@ -71,30 +75,37 @@ public class ServerSessionTest extends Q
         }
         finally
         {
-            BrokerTestHelper.tearDown();
-            super.tearDown();
+            try
+            {
+                if (_taskExecutor != null)
+                {
+                    _taskExecutor.stop();
+                }
+            }
+            finally
+            {
+                BrokerTestHelper.tearDown();
+                super.tearDown();
+            }
         }
     }
 
     public void testOverlargeMessageTest() throws Exception
     {
-        if (true) return;
-
-        TaskExecutor taskExecutor = mock(TaskExecutor.class);
-
         final Broker<?> broker = mock(Broker.class);
         when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
 
         AmqpPort port = createMockPort();
 
-        final AMQPConnection_0_10 modelConnection = mock(AMQPConnection_0_10.class); // TODO needs to be an interface
+        final AMQPConnection_0_10 modelConnection = mock(AMQPConnection_0_10.class);
         when(modelConnection.getAddressSpace()).thenReturn(_virtualHost);
         when(modelConnection.getContextProvider()).thenReturn(_virtualHost);
         when(modelConnection.getBroker()).thenReturn((Broker)broker);
         when(modelConnection.getEventLogger()).thenReturn(mock(EventLogger.class));
         when(modelConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
         when(modelConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
-        when(modelConnection.getChildExecutor()).thenReturn(taskExecutor);
+        when(modelConnection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE)).thenReturn(Connection.DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+        when(modelConnection.getChildExecutor()).thenReturn(_taskExecutor);
         when(modelConnection.getModel()).thenReturn(BrokerModel.getInstance());
 
         Subject subject = new Subject();

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Tue Jan 24 13:47:05 2017
@@ -69,8 +69,6 @@ public interface AMQPConnection_0_8<C ex
 
     int getBinaryDataLimit();
 
-    long getMaxMessageSize();
-
     boolean ignoreAllButCloseOk();
 
     boolean channelAwaitingClosure(int channelId);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org