You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/01/24 13:47:05 UTC
svn commit: r1780080 [1/2] - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ ...
Author: kwall
Date: Tue Jan 24 13:47:05 2017
New Revision: 1780080
URL: http://svn.apache.org/viewvc?rev=1780080&view=rev
Log:
QPID-7633: Extract interfaces AMQPConnection_0_10/AMQPConnection_1_0 (analogue of AMQPConnection_0_8)
* Mechanical refactoring - extract interfaces AMQPConnection_0_10/AMQPConnection_1_0
* Fix up unit tests
Added:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
- copied, changed from r1780079, qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
- copied, changed from r1780079, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (contents, props changed)
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Tue Jan 24 13:47:05 2017
@@ -32,6 +32,8 @@ import com.google.common.util.concurrent
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.ContextProvider;
+import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.util.Deletable;
@@ -45,6 +47,8 @@ public interface AMQPConnection<C extend
Subject getSubject();
+ int getMessageCompressionThreshold();
+
Principal getAuthorizedPrincipal();
String getRemoteAddressString();
@@ -104,4 +108,7 @@ public interface AMQPConnection<C extend
void notifyWork(AMQSessionModel<?,?> sessionModel);
boolean isTransportBlockedForWriting();
+
+ long getMaxMessageSize();
+
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Tue Jan 24 13:47:05 2017
@@ -372,6 +372,7 @@ public abstract class AbstractAMQPConnec
return maxMessageSize > 0 ? maxMessageSize : Long.MAX_VALUE;
}
+ @Override
public long getMaxMessageSize()
{
return _maxMessageSize.get();
@@ -818,6 +819,7 @@ public abstract class AbstractAMQPConnec
logConnectionOpen();
}
+ @Override
public int getMessageCompressionThreshold()
{
return _messageCompressionThreshold;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Tue Jan 24 13:47:05 2017
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,345 +17,44 @@
* under the License.
*
*/
+
package org.apache.qpid.server.protocol.v0_10;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
-import org.apache.qpid.server.transport.AggregateTicker;
-import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.transport.ServerNetworkConnection;
-import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.Constant;
+import java.security.AccessControlContext;
+import javax.security.auth.Subject;
-public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10, ServerConnection>
+import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.model.ContextProvider;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.ProtocolEngine;
+
+@ManagedObject(category = false, creatable = false, type="AMQP_0_10")
+public interface AMQPConnection_0_10<C extends AMQPConnection_0_10<C>> extends AMQPConnection<C>,
+ ProtocolEngine,
+ EventLoggerProvider
{
- private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
- private final ServerInputHandler _inputHandler;
+ // 0-10's current implementation (ServerConnection etc) means we have to break the encapsulation
+
+ void initialiseHeartbeating(long writerIdle, long readerIdle);
+
+ void setClientId(String clientId);
+
+ void setClientProduct(String clientProduct);
+
+ void setClientVersion(String clientVersion);
+
+ void setRemoteProcessPid(String remoteProcessPid);
+
+ void setSubject(Subject authorizedSubject);
+
+ void setAddressSpace(NamedAddressSpace addressSpace);
- private final ServerConnection _connection;
+ ContextProvider getContextProvider();
- private volatile boolean _transportBlockedForWriting;
+ AccessControlContext getAccessControllerContext();
- private final AtomicBoolean _stateChanged = new AtomicBoolean();
- private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
- private ServerDisassembler _disassembler;
-
- private final Set<AMQSessionModel<?,?>> _sessionsWithWork =
- Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?,?>, Boolean>());
-
-
- public AMQPConnection_0_10(final Broker<?> broker,
- ServerNetworkConnection network,
- final AmqpPort<?> port,
- final Transport transport,
- final long id,
- final AggregateTicker aggregateTicker)
- {
- super(broker, network, port, transport, Protocol.AMQP_0_10, id, aggregateTicker);
-
- _connection = new ServerConnection(id, broker, port, transport, this);
-
- SubjectCreator subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
- ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, subjectCreator);
-
- _connection.setConnectionDelegate(connDelegate);
- _connection.setRemoteAddress(network.getRemoteAddress());
- _connection.setLocalAddress(network.getLocalAddress());
-
- _inputHandler = new ServerInputHandler(new ServerAssembler(_connection));
- _connection.addFrameSizeObserver(_inputHandler);
-
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- _connection.setNetworkConnection(getNetwork());
- _disassembler = new ServerDisassembler(wrapSender(getNetwork().getSender()), Constant.MIN_MAX_FRAME_SIZE);
- _connection.setSender(_disassembler);
- _connection.addFrameSizeObserver(_disassembler);
- return null;
- }
- }, getAccessControllerContext());
- }
-
- private ByteBufferSender wrapSender(final ByteBufferSender sender)
- {
- return new ByteBufferSender()
- {
- @Override
- public boolean isDirectBufferPreferred()
- {
- return sender.isDirectBufferPreferred();
- }
-
- @Override
- public void send(final QpidByteBuffer msg)
- {
- updateLastWriteTime();
- sender.send(msg);
- }
-
- @Override
- public void flush()
- {
- sender.flush();
-
- }
-
- @Override
- public void close()
- {
- sender.close();
- }
- };
- }
-
- public void received(final QpidByteBuffer buf)
- {
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- updateLastReadTime();
- try
- {
- _inputHandler.received(buf);
- _connection.receivedComplete();
- }
- catch (IllegalArgumentException | IllegalStateException e)
- {
- throw new ConnectionScopedRuntimeException(e);
- }
- catch (StoreException e)
- {
- if (getAddressSpace().isActive())
- {
- throw new ServerScopedRuntimeException(e);
- }
- else
- {
- throw new ConnectionScopedRuntimeException(e);
- }
- }
- return null;
- }
- }, getAccessControllerContext());
- }
-
- @Override
- public void encryptedTransport()
- {
- }
-
- public void writerIdle()
- {
- _connection.doHeartBeat();
- }
-
- public void readerIdle()
- {
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Current connection state: " + _connection.getConnectionDelegate().getState(), true));
- getNetwork().close();
- return null;
- }
- }, getAccessControllerContext());
-
- }
-
- public String getAddress()
- {
- return getNetwork().getRemoteAddress().toString();
- }
-
- @Override
- public void closed()
- {
- try
- {
- AccessController.doPrivileged(new PrivilegedAction<Void>()
- {
- @Override
- public Void run()
- {
- _inputHandler.closed();
- if(_disassembler != null)
- {
- _disassembler.closed();
- }
- return null;
- }
- }, getAccessControllerContext());
- }
- finally
- {
- markTransportClosed();
- }
- }
-
- @Override
- public boolean isTransportBlockedForWriting()
- {
- return _transportBlockedForWriting;
- }
-
- @Override
- public void setTransportBlockedForWriting(final boolean blocked)
- {
- if(_transportBlockedForWriting != blocked)
- {
- _transportBlockedForWriting = blocked;
- _connection.transportStateChanged();
- }
- }
-
- @Override
- public Iterator<Runnable> processPendingIterator()
- {
- if (isIOThread())
- {
- return _connection.processPendingIterator(_sessionsWithWork);
- }
- else
- {
- return Collections.emptyIterator();
- }
- }
-
- @Override
- public boolean hasWork()
- {
- return _stateChanged.get();
- }
-
- @Override
- public void notifyWork()
- {
- _stateChanged.set(true);
-
- final Action<ProtocolEngine> listener = _workListener.get();
- if(listener != null)
- {
- listener.performAction(this);
- }
- }
-
- @Override
- public void notifyWork(final AMQSessionModel<?,?> sessionModel)
- {
- _sessionsWithWork.add(sessionModel);
- notifyWork();
- }
-
- public void clearWork()
- {
- _stateChanged.set(false);
- }
-
- public void setWorkListener(final Action<ProtocolEngine> listener)
- {
- _workListener.set(listener);
- }
-
- public boolean hasSessionWithName(final byte[] name)
- {
- return _connection.hasSessionWithName(name);
- }
-
- @Override
- public void sendConnectionCloseAsync(final CloseReason reason, final String description)
- {
- stopConnection();
- // Best mapping for all reasons is "forced"
- _connection.sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, description);
-
- }
-
- @Override
- public void closeSessionAsync(final AMQSessionModel<?,?> session,
- final CloseReason reason, final String message)
- {
- ServerSession s = ((Session_0_10)session).getServerSession();
- _connection.closeSessionAsync(s, reason, message);
- }
-
- @Override
- protected void addAsyncTask(final Action<? super ServerConnection> action)
- {
- _connection.addAsyncTask(action);
- }
-
- public void block()
- {
- _connection.block();
- }
-
- public String getRemoteContainerName()
- {
- return _connection.getRemoteContainerName();
- }
-
- public Collection<? extends Session_0_10> getSessionModels()
- {
- final Collection<org.apache.qpid.server.model.Session> sessions =
- getChildren(org.apache.qpid.server.model.Session.class);
- final Collection<? extends Session_0_10> session_0_10s = new ArrayList<>((Collection)sessions);
- return session_0_10s;
- }
-
- public void unblock()
- {
- _connection.unblock();
- }
-
- public long getSessionCountLimit()
- {
- return _connection.getSessionCountLimit();
- }
-
- @Override
- protected boolean isOrderlyClose()
- {
- return !_connection.isConnectionLost();
- }
-
- @Override
- public void initialiseHeartbeating(final long writerDelay, final long readerDelay)
- {
- super.initialiseHeartbeating(writerDelay, readerDelay);
- }
+ void performDeleteTasks();
}
Propchange: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
('svn:executable' removed)
Copied: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java (from r1780079, qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java?p2=qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java&p1=qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java&r1=1780079&r2=1780080&rev=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java Tue Jan 24 13:47:05 2017
@@ -56,9 +56,11 @@ import org.apache.qpid.transport.Connect
import org.apache.qpid.transport.Constant;
-public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10, ServerConnection>
+public class AMQPConnection_0_10Impl extends AbstractAMQPConnection<AMQPConnection_0_10Impl, ServerConnection>
+ implements
+ AMQPConnection_0_10<AMQPConnection_0_10Impl>
{
- private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
+ private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10Impl.class);
private final ServerInputHandler _inputHandler;
private final ServerConnection _connection;
@@ -73,12 +75,12 @@ public class AMQPConnection_0_10 extends
Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?,?>, Boolean>());
- public AMQPConnection_0_10(final Broker<?> broker,
- ServerNetworkConnection network,
- final AmqpPort<?> port,
- final Transport transport,
- final long id,
- final AggregateTicker aggregateTicker)
+ public AMQPConnection_0_10Impl(final Broker<?> broker,
+ ServerNetworkConnection network,
+ final AmqpPort<?> port,
+ final Transport transport,
+ final long id,
+ final AggregateTicker aggregateTicker)
{
super(broker, network, port, transport, Protocol.AMQP_0_10, id, aggregateTicker);
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java Tue Jan 24 13:47:05 2017
@@ -68,8 +68,8 @@ public class ProtocolEngineCreator_0_10
long id, final AggregateTicker aggregateTicker)
{
- final AMQPConnection_0_10 protocolEngine_0_10 =
- new AMQPConnection_0_10(broker, network, port, transport, id, aggregateTicker);
+ final AMQPConnection_0_10Impl protocolEngine_0_10 =
+ new AMQPConnection_0_10Impl(broker, network, port, transport, id, aggregateTicker);
protocolEngine_0_10.create();
return protocolEngine_0_10;
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Tue Jan 24 13:47:05 2017
@@ -45,6 +45,7 @@ import org.apache.qpid.server.security.S
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
+import org.apache.qpid.server.security.auth.sasl.SaslSettings;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -479,7 +480,8 @@ public class ServerConnectionDelegate ex
return;
}
- _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism, serverConnection.getAmqpConnection());
+ _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism,
+ (SaslSettings) serverConnection.getAmqpConnection());
if (_saslNegotiator == null)
{
serverConnection.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Jan 24 13:47:05 2017
@@ -66,7 +66,6 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Tue Jan 24 13:47:05 2017
@@ -27,18 +27,19 @@ import java.util.List;
import javax.security.auth.Subject;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.model.BrokerTestHelper;
-import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.ExecutionErrorCode;
@@ -50,12 +51,15 @@ public class ServerSessionTest extends Q
{
private VirtualHost<?> _virtualHost;
+ private CurrentThreadTaskExecutor _taskExecutor;
@Override
public void setUp() throws Exception
{
super.setUp();
BrokerTestHelper.setUp();
+ _taskExecutor = new CurrentThreadTaskExecutor();
+ _taskExecutor.start();
_virtualHost = BrokerTestHelper.createVirtualHost(getName());
}
@@ -71,30 +75,37 @@ public class ServerSessionTest extends Q
}
finally
{
- BrokerTestHelper.tearDown();
- super.tearDown();
+ try
+ {
+ if (_taskExecutor != null)
+ {
+ _taskExecutor.stop();
+ }
+ }
+ finally
+ {
+ BrokerTestHelper.tearDown();
+ super.tearDown();
+ }
}
}
public void testOverlargeMessageTest() throws Exception
{
- if (true) return;
-
- TaskExecutor taskExecutor = mock(TaskExecutor.class);
-
final Broker<?> broker = mock(Broker.class);
when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
AmqpPort port = createMockPort();
- final AMQPConnection_0_10 modelConnection = mock(AMQPConnection_0_10.class); // TODO needs to be an interface
+ final AMQPConnection_0_10 modelConnection = mock(AMQPConnection_0_10.class);
when(modelConnection.getAddressSpace()).thenReturn(_virtualHost);
when(modelConnection.getContextProvider()).thenReturn(_virtualHost);
when(modelConnection.getBroker()).thenReturn((Broker)broker);
when(modelConnection.getEventLogger()).thenReturn(mock(EventLogger.class));
when(modelConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
when(modelConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
- when(modelConnection.getChildExecutor()).thenReturn(taskExecutor);
+ when(modelConnection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE)).thenReturn(Connection.DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+ when(modelConnection.getChildExecutor()).thenReturn(_taskExecutor);
when(modelConnection.getModel()).thenReturn(BrokerModel.getInstance());
Subject subject = new Subject();
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1780080&r1=1780079&r2=1780080&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Tue Jan 24 13:47:05 2017
@@ -69,8 +69,6 @@ public interface AMQPConnection_0_8<C ex
int getBinaryDataLimit();
- long getMaxMessageSize();
-
boolean ignoreAllButCloseOk();
boolean channelAwaitingClosure(int channelId);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org