You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/17 16:23:22 UTC
svn commit: r1632583 [3/5] - in /qpid/trunk/qpid/java: ./
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/...
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Fri Oct 17 14:23:19 2014
@@ -24,7 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.StoredMessage;
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Fri Oct 17 14:23:19 2014
@@ -21,9 +21,13 @@
package org.apache.qpid.server.protocol.v0_8;
import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.security.AccessControlException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
@@ -41,11 +45,11 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.security.auth.Subject;
+import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQDecoder;
@@ -54,7 +58,6 @@ import org.apache.qpid.common.ServerProp
import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.ConnectionPrincipal;
@@ -67,15 +70,15 @@ import org.apache.qpid.server.message.In
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.SessionModelListener;
-import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
-import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -86,7 +89,9 @@ import org.apache.qpid.transport.Transpo
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
-public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>
+public class AMQProtocolEngine implements ServerProtocolEngine,
+ AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
+ ServerMethodProcessor<ServerChannelMethodProcessor>
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -94,6 +99,8 @@ public class AMQProtocolEngine implement
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
+ public static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength";
+ public static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
private final Port<?> _port;
private final long _creationTime;
@@ -105,13 +112,12 @@ public class AMQProtocolEngine implement
private VirtualHostImpl<?,?,?> _virtualHost;
- private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap =
- new HashMap<Integer, AMQChannel<AMQProtocolEngine>>();
+ private final Map<Integer, AMQChannel> _channelMap =
+ new HashMap<>();
private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
- new CopyOnWriteArrayList<SessionModelListener>();
+ new CopyOnWriteArrayList<>();
- @SuppressWarnings("unchecked")
- private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+ private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
/**
* The channels that the latest call to {@link #received(ByteBuffer)} applied to.
@@ -120,10 +126,8 @@ public class AMQProtocolEngine implement
*
* Thread-safety: guarded by {@link #_receivedLock}.
*/
- private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage =
- new HashSet<AMQChannel<AMQProtocolEngine>>();
-
- private final AMQStateManager _stateManager;
+ private final Set<AMQChannel> _channelsForCurrentMessage =
+ new HashSet<>();
private AMQDecoder _decoder;
@@ -136,14 +140,13 @@ public class AMQProtocolEngine implement
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
- private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
+ private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion);
private final List<Action<? super AMQProtocolEngine>> _taskList =
- new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>();
+ new CopyOnWriteArrayList<>();
- private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
+ private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>();
private ProtocolOutputConverter _protocolOutputConverter;
private final Subject _authorizedSubject = new Subject();
- private MethodDispatcher _dispatcher;
private final long _connectionID;
private Object _reference = new Object();
@@ -177,6 +180,9 @@ public class AMQProtocolEngine implement
private boolean _authenticated;
private boolean _compressionSupported;
private int _messageCompressionThreshold;
+ private int _currentClassId;
+ private int _currentMethodId;
+ private int _binaryDataLimit;
public AMQProtocolEngine(Broker broker,
final NetworkConnection network,
@@ -189,11 +195,12 @@ public class AMQProtocolEngine implement
_transport = transport;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_receivedLock = new ReentrantLock();
- _stateManager = new AMQStateManager(broker, this);
- _decoder = new AMQDecoder(true, this);
+ _decoder = new BrokerDecoder(this);
_connectionID = connectionId;
_logSubject = new ConnectionLogSubject(this);
-
+ _binaryDataLimit = _broker.getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH)
+ ? _broker.getContextValue(Integer.class, BROKER_DEBUG_BINARY_DATA_LENGTH)
+ : DEFAULT_DEBUG_BINARY_DATA_LENGTH;
_authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
runAsSubject(new PrivilegedAction<Void>()
{
@@ -303,28 +310,7 @@ public class AMQProtocolEngine implement
_receivedLock.lock();
try
{
- final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg);
- for (AMQDataBlock dataBlock : dataBlocks)
- {
- try
- {
- dataBlockReceived(dataBlock);
- }
- catch(AMQConnectionException e)
- {
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e);
- }
- break;
- }
- catch (AMQException e)
- {
- _logger.error("Unexpected exception when processing datablock", e);
- closeProtocolSession();
- break;
- }
- }
+ _decoder.decodeBuffer(msg);
receivedComplete();
}
catch (ConnectionScopedRuntimeException e)
@@ -366,7 +352,7 @@ public class AMQProtocolEngine implement
{
RuntimeException exception = null;
- for (AMQChannel<AMQProtocolEngine> channel : _channelsForCurrentMessage)
+ for (AMQChannel channel : _channelsForCurrentMessage)
{
try
{
@@ -391,112 +377,10 @@ public class AMQProtocolEngine implement
}
}
- /**
- * Process the data block.
- * If the message is for a channel it is added to {@link #_channelsForCurrentMessage}.
- *
- * @throws AMQConnectionException if unable to process the data block. In this case,
- * the connection is already closed by the time the exception is thrown. If any other
- * type of exception is thrown, the connection is not already closed.
- */
- private void dataBlockReceived(AMQDataBlock message) throws AMQException
- {
- if (message instanceof ProtocolInitiation)
- {
- protocolInitiationReceived((ProtocolInitiation) message);
-
- }
- else if (message instanceof AMQFrame)
- {
- AMQFrame frame = (AMQFrame) message;
- frameReceived(frame);
-
- }
- else
- {
- throw new AMQException("Unknown message type: " + message.getClass().getName() + ": " + message);
- }
- }
- /**
- * Handle the supplied frame.
- * Adds this frame's channel to {@link #_channelsForCurrentMessage}.
- *
- * @throws AMQConnectionException if unable to process the data block. In this case,
- * the connection is already closed by the time the exception is thrown. If any other
- * type of exception is thrown, the connection is not already closed.
- */
- private void frameReceived(AMQFrame frame) throws AMQException
+ void channelRequiresSync(final AMQChannel amqChannel)
{
- int channelId = frame.getChannel();
- AMQChannel<AMQProtocolEngine> amqChannel = _channelMap.get(channelId);
- if(amqChannel != null)
- {
- // The _receivedLock is already acquired in the caller
- // It is safe to add channel
- _channelsForCurrentMessage.add(amqChannel);
- }
- else
- {
- // Not an error. The frame is probably a channel Open for this channel id, which
- // does not require asynchronous work therefore its absence from
- // _channelsForCurrentMessage is ok.
- }
-
- AMQBody body = frame.getBodyFrame();
-
- long startTime = 0;
- String frameToString = null;
- if (_logger.isDebugEnabled())
- {
- startTime = System.currentTimeMillis();
- frameToString = frame.toString();
- _logger.debug("RECV: " + frame);
- }
-
- // Check that this channel is not closing
- if (channelAwaitingClosure(channelId))
- {
- if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
- }
- }
- else
- {
- // The channel has been told to close, we don't process any more frames until
- // it's closed.
- return;
- }
- }
-
- try
- {
- body.handle(channelId, this);
- }
- catch(AMQConnectionException e)
- {
- _logger.info(e.getMessage() + " whilst processing frame: " + body);
- closeConnection(channelId, e);
- throw e;
- }
- catch (AMQException e)
- {
- closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
- throw e;
- }
- catch (TransportException e)
- {
- closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage());
- throw e;
- }
-
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString);
- }
+ _channelsForCurrentMessage.add(amqChannel);
}
private synchronized void protocolInitiationReceived(ProtocolInitiation pi)
@@ -615,87 +499,6 @@ public class AMQProtocolEngine implement
return buf;
}
- public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
- {
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
-
- try
- {
- try
- {
- boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
-
- if (!wasAnyoneInterested)
- {
- throw new AMQNoMethodHandlerException(evt);
- }
- }
- catch (AMQChannelException e)
- {
- if (getChannel(channelId) != null)
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing channel due to: " + e.getMessage());
- }
-
- writeFrame(e.getCloseFrame(channelId));
- closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
- }
- else
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("ChannelException occurred on non-existent channel:" + e.getMessage());
- }
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing connection due to: " + e.getMessage());
- }
-
- AMQConnectionException ce =
- evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
- AMQConstant.CHANNEL_ERROR.getName().toString());
-
- _logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, ce);
- }
- }
- catch (AMQConnectionException e)
- {
- _logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, e);
- }
- }
- catch (Exception e)
- {
- _logger.error("Unexpected exception while processing frame. Closing connection.", e);
-
- closeProtocolSession();
- }
- }
-
- public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
- {
-
- AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
-
- channel.publishContentHeader(body);
-
- }
-
- public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
- {
- AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
-
- channel.publishContentBody(body);
- }
-
- public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
- {
- // NO - OP
- }
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
@@ -735,28 +538,17 @@ public class AMQProtocolEngine implement
_contextKey = contextKey;
}
- public List<AMQChannel<AMQProtocolEngine>> getChannels()
+ public List<AMQChannel> getChannels()
{
synchronized (_channelMap)
{
- return new ArrayList<AMQChannel<AMQProtocolEngine>>(_channelMap.values());
- }
- }
-
- public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException
- {
- AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
- if (channel == null)
- {
- throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
+ return new ArrayList<>(_channelMap.values());
}
-
- return channel;
}
- public AMQChannel<AMQProtocolEngine> getChannel(int channelId)
+ public AMQChannel getChannel(int channelId)
{
- final AMQChannel<AMQProtocolEngine> channel =
+ final AMQChannel channel =
((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
if ((channel == null) || channel.isClosing())
{
@@ -773,38 +565,17 @@ public class AMQProtocolEngine implement
return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
}
- public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException
+ public void addChannel(AMQChannel channel)
{
- if (_closed)
- {
- throw new AMQException("Session is closed");
- }
-
final int channelId = channel.getChannelId();
- if (_closingChannelsList.containsKey(channelId))
- {
- throw new AMQException("Session is marked awaiting channel close");
- }
-
- if (_channelMap.size() == _maxNoOfChannels)
- {
- String errorMessage =
- toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
- + "); can't create channel";
- _logger.error(errorMessage);
- throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
- }
- else
+ synchronized (_channelMap)
{
- synchronized (_channelMap)
+ _channelMap.put(channel.getChannelId(), channel);
+ sessionAdded(channel);
+ if(_blocking)
{
- _channelMap.put(channel.getChannelId(), channel);
- sessionAdded(channel);
- if(_blocking)
- {
- channel.block();
- }
+ channel.block();
}
}
@@ -830,7 +601,7 @@ public class AMQProtocolEngine implement
}
}
- public Long getMaximumNumberOfChannels()
+ public long getMaximumNumberOfChannels()
{
return _maxNoOfChannels;
}
@@ -840,52 +611,52 @@ public class AMQProtocolEngine implement
_maxNoOfChannels = value;
}
- /**
- * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
- * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
- *
- * @param channelId id of the channel to close
- *
- * @throws IllegalArgumentException if the channel id is not valid
- */
- @Override
- public void closeChannel(int channelId)
+
+ void closeChannel(AMQChannel channel)
+ {
+ closeChannel(channel, null, null, false);
+ }
+
+ public void closeChannelAndWriteFrame(AMQChannel channel, AMQConstant cause, String message)
{
- closeChannel(channelId, null, null);
+ writeFrame(new AMQFrame(channel.getChannelId(),
+ getMethodRegistry().createChannelCloseBody(cause.getCode(),
+ AMQShortString.validValueOf(message),
+ _currentClassId,
+ _currentMethodId)));
+ closeChannel(channel, cause, message, true);
}
public void closeChannel(int channelId, AMQConstant cause, String message)
{
- final AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
+ final AMQChannel channel = getChannel(channelId);
if (channel == null)
{
throw new IllegalArgumentException("Unknown channel id");
}
- else
+ closeChannel(channel, cause, message, true);
+ }
+
+ void closeChannel(AMQChannel channel, AMQConstant cause, String message, boolean mark)
+ {
+ int channelId = channel.getChannelId();
+ try
{
- try
+ channel.close(cause, message);
+ if(mark)
{
- channel.close(cause, message);
markChannelAwaitingCloseOk(channelId);
}
- finally
- {
- removeChannel(channelId);
- }
+ }
+ finally
+ {
+ removeChannel(channelId);
}
}
+
public void closeChannelOk(int channelId)
{
- // todo QPID-847 - This is called from two locations ChannelCloseHandler and ChannelCloseOkHandler.
- // When it is the CC_OK_Handler then it makes sense to remove the channel else we will leak memory.
- // We do it from the Close Handler as we are sending the OK back to the client.
- // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException
- // will send a close-ok.. Where we should call removeChannel.
- // However, due to the poor exception handling on the client. The client-user will be notified of the
- // InvalidArgument and if they then decide to close the session/connection then the there will be time
- // for that to occur i.e. a new close method be sent before the exception handling can mark the session closed.
-
_closingChannelsList.remove(channelId);
}
@@ -901,7 +672,7 @@ public class AMQProtocolEngine implement
*/
public void removeChannel(int channelId)
{
- AMQChannel<AMQProtocolEngine> session;
+ AMQChannel session;
synchronized (_channelMap)
{
session = _channelMap.remove(channelId);
@@ -937,7 +708,7 @@ public class AMQProtocolEngine implement
*/
private void closeAllChannels()
{
- for (AMQChannel<AMQProtocolEngine> channel : getChannels())
+ for (AMQChannel channel : getChannels())
{
channel.close();
}
@@ -952,7 +723,6 @@ public class AMQProtocolEngine implement
}
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
- @Override
public void closeSession()
{
@@ -1042,13 +812,30 @@ public class AMQProtocolEngine implement
private void closeConnection(int channelId, AMQConnectionException e)
{
- try
+
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing connection due to: " + e);
- }
+ _logger.info("Closing connection due to: " + e);
+ }
+ closeConnection(channelId, e.getCloseFrame());
+ }
+
+ void closeConnection(AMQConstant errorCode,
+ String message, int channelId)
+ {
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + message);
+ }
+ closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _currentClassId, _currentMethodId)));
+ }
+
+ private void closeConnection(int channelId, AMQFrame frame)
+ {
+ try
+ {
markChannelAwaitingCloseOk(channelId);
closeSession();
}
@@ -1056,8 +843,7 @@ public class AMQProtocolEngine implement
{
try
{
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(e.getCloseFrame(channelId));
+ writeFrame(frame);
}
finally
{
@@ -1068,23 +854,9 @@ public class AMQProtocolEngine implement
}
- @Override
public void closeProtocolSession()
{
_network.close();
-
- try
- {
- _stateManager.changeState(AMQState.CONNECTION_CLOSED);
- }
- catch (ConnectionScopedRuntimeException e)
- {
- _logger.info(e.getMessage());
- }
- catch (TransportException e)
- {
- _logger.info(e.getMessage());
- }
}
public String toString()
@@ -1174,9 +946,8 @@ public class AMQProtocolEngine implement
private void setProtocolVersion(ProtocolVersion pv)
{
_protocolVersion = pv;
- _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
- _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
- _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion);
+ _methodRegistry.setProtocolVersion(_protocolVersion);
+ _protocolOutputConverter = new ProtocolOutputConverterImpl(this);
}
public byte getProtocolMajorVersion()
@@ -1204,7 +975,7 @@ public class AMQProtocolEngine implement
return _virtualHost;
}
- public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException
+ public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost)
{
_virtualHost = virtualHost;
@@ -1285,11 +1056,6 @@ public class AMQProtocolEngine implement
return _methodRegistry;
}
- public MethodDispatcher getMethodDispatcher()
- {
- return _dispatcher;
- }
-
public void closed()
{
try
@@ -1303,14 +1069,10 @@ public class AMQProtocolEngine implement
closeProtocolSession();
}
}
- catch (ConnectionScopedRuntimeException e)
+ catch (ConnectionScopedRuntimeException | TransportException e)
{
_logger.error("Could not close protocol engine", e);
}
- catch (TransportException e)
- {
- _logger.error("Could not close protocol engine", e);
- }
}
public void readerIdle()
@@ -1351,9 +1113,11 @@ public class AMQProtocolEngine implement
{
_logger.error("Exception caught in " + this + ", closing connection explicitly: " + throwable, throwable);
-
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
- ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200, AMQShortString.validValueOf(throwable.getMessage()),0,0);
+ ConnectionCloseBody closeBody = _methodRegistry.createConnectionCloseBody(AMQConstant.INTERNAL_ERROR.getCode(),
+ AMQShortString.validValueOf(
+ throwable.getMessage()),
+ _currentClassId,
+ _currentMethodId);
writeFrame(closeBody.generateFrame(0));
@@ -1374,11 +1138,6 @@ public class AMQProtocolEngine implement
}
}
- public void setSender(Sender<ByteBuffer> sender)
- {
- // Do nothing
- }
-
public long getReadBytes()
{
return _readBytes;
@@ -1460,7 +1219,7 @@ public class AMQProtocolEngine implement
return String.valueOf(getRemoteAddress());
}
- public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message)
+ public void closeSession(AMQChannel session, AMQConstant cause, String message)
{
int channelId = session.getChannelId();
closeChannel(channelId, cause, message);
@@ -1470,7 +1229,7 @@ public class AMQProtocolEngine implement
methodRegistry.createChannelCloseBody(
cause.getCode(),
AMQShortString.validValueOf(message),
- 0,0);
+ 0, 0);
writeFrame(responseBody.generateFrame(channelId));
}
@@ -1478,9 +1237,8 @@ public class AMQProtocolEngine implement
public void close(AMQConstant cause, String message)
{
closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
- getProtocolOutputConverter().getProtocolMajorVersion(),
- getProtocolOutputConverter().getProtocolMinorVersion(),
- null));
+ getMethodRegistry(),
+ null));
}
public void block()
@@ -1490,7 +1248,7 @@ public class AMQProtocolEngine implement
if(!_blocking)
{
_blocking = true;
- for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
+ for(AMQChannel channel : _channelMap.values())
{
channel.block();
}
@@ -1505,7 +1263,7 @@ public class AMQProtocolEngine implement
if(_blocking)
{
_blocking = false;
- for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
+ for(AMQChannel channel : _channelMap.values())
{
channel.unblock();
}
@@ -1518,9 +1276,9 @@ public class AMQProtocolEngine implement
return _closed;
}
- public List<AMQChannel<AMQProtocolEngine>> getSessionModels()
+ public List<AMQChannel> getSessionModels()
{
- return new ArrayList<AMQChannel<AMQProtocolEngine>>(getChannels());
+ return new ArrayList<>(getChannels());
}
public LogSubject getLogSubject()
@@ -1609,6 +1367,382 @@ public class AMQProtocolEngine implement
_deferFlush = deferFlush;
}
+ @Override
+ public void receiveChannelOpen(final int channelId)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + channelId + "] ChannelOpen");
+ }
+
+ // Protect the broker against out of order frame request.
+ if (_virtualHost == null)
+ {
+ closeConnection(AMQConstant.COMMAND_INVALID,
+ "Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId);
+ }
+ else if(getChannel(channelId) != null || channelAwaitingClosure(channelId))
+ {
+ closeConnection(AMQConstant.CHANNEL_ERROR, "Channel " + channelId + " already exists", channelId);
+ }
+ else if(channelId > getMaximumNumberOfChannels())
+ {
+ closeConnection(AMQConstant.CHANNEL_ERROR,
+ "Channel " + channelId + " cannot be created as the max allowed channel id is "
+ + getMaximumNumberOfChannels(),
+ channelId);
+ }
+ else
+ {
+ _logger.info("Connecting to: " + _virtualHost.getName());
+
+ final AMQChannel channel = new AMQChannel(this, channelId, _virtualHost.getMessageStore());
+
+ addChannel(channel);
+
+ ChannelOpenOkBody response;
+
+
+ response = getMethodRegistry().createChannelOpenOkBody();
+
+
+ writeFrame(response.generateFrame(channelId));
+ }
+ }
+
+ @Override
+ public void receiveConnectionOpen(AMQShortString virtualHostName,
+ AMQShortString capabilities,
+ boolean insist)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV ConnectionOpen[" +" virtualHost: " + virtualHostName + " capabilities: " + capabilities + " insist: " + insist + " ]");
+ }
+
+ String virtualHostStr;
+ if ((virtualHostName != null) && virtualHostName.charAt(0) == '/')
+ {
+ virtualHostStr = virtualHostName.toString().substring(1);
+ }
+ else
+ {
+ virtualHostStr = virtualHostName == null ? null : virtualHostName.toString();
+ }
+
+ VirtualHostImpl virtualHost = ((AmqpPort)getPort()).getVirtualHost(virtualHostStr);
+
+ if (virtualHost == null)
+ {
+ closeConnection(AMQConstant.NOT_FOUND,
+ "Unknown virtual host: '" + virtualHostName + "'",0);
+
+ }
+ else
+ {
+ // Check virtualhost access
+ if (virtualHost.getState() != State.ACTIVE)
+ {
+ closeConnection(AMQConstant.CONNECTION_FORCED,
+ "Virtual host '" + virtualHost.getName() + "' is not active",0);
+
+ }
+ else
+ {
+ setVirtualHost(virtualHost);
+ try
+ {
+ virtualHost.getSecurityManager().authoriseCreateConnection(this);
+ if (getContextKey() == null)
+ {
+ setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis())));
+ }
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName);
+
+ writeFrame(responseBody.generateFrame(0));
+ }
+ catch (AccessControlException e)
+ {
+ closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(),0);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void receiveConnectionClose(final int replyCode,
+ final AMQShortString replyText,
+ final int classId,
+ final int methodId)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV ConnectionClose[" +" replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]");
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("ConnectionClose received with reply code/reply text " + replyCode + "/" +
+ replyText + " for " + this);
+ }
+ try
+ {
+ closeSession();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error closing protocol session: " + e, e);
+ }
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
+ writeFrame(responseBody.generateFrame(0));
+
+ closeProtocolSession();
+
+ }
+
+ @Override
+ public void receiveConnectionCloseOk()
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV ConnectionCloseOk");
+ }
+
+ _logger.info("Received Connection-close-ok");
+
+ try
+ {
+ closeSession();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error closing protocol session: " + e, e);
+ }
+ }
+
+ @Override
+ public void receiveConnectionSecureOk(final byte[] response)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV ConnectionSecureOk[ response: ******** ] ");
+ }
+
+ Broker<?> broker = getBroker();
+
+ SubjectCreator subjectCreator = getSubjectCreator();
+
+ SaslServer ss = getSaslServer();
+ if (ss == null)
+ {
+ closeConnection(AMQConstant.INTERNAL_ERROR, "No SASL context set up in session",0 );
+ }
+ MethodRegistry methodRegistry = getMethodRegistry();
+ SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response);
+ switch (authResult.getStatus())
+ {
+ case ERROR:
+ Exception cause = authResult.getCause();
+
+ _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+ closeConnection(AMQConstant.NOT_ALLOWED, "Authentication failed",0);
+
+ disposeSaslServer();
+ break;
+ case SUCCESS:
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Connected as: " + authResult.getSubject());
+ }
+
+ int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+
+ if (frameMax <= 0)
+ {
+ frameMax = Integer.MAX_VALUE;
+ }
+
+ ConnectionTuneBody tuneBody =
+ methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+ frameMax,
+ broker.getConnection_heartBeatDelay());
+ writeFrame(tuneBody.generateFrame(0));
+ setAuthorizedSubject(authResult.getSubject());
+ disposeSaslServer();
+ break;
+ case CONTINUE:
+
+ ConnectionSecureBody
+ secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+ writeFrame(secureBody.generateFrame(0));
+ }
+ }
+
+
+ private void disposeSaslServer()
+ {
+ SaslServer ss = getSaslServer();
+ if (ss != null)
+ {
+ setSaslServer(null);
+ try
+ {
+ ss.dispose();
+ }
+ catch (SaslException e)
+ {
+ _logger.error("Error disposing of Sasl server: " + e);
+ }
+ }
+ }
+
+ @Override
+ public void receiveConnectionStartOk(final FieldTable clientProperties,
+ final AMQShortString mechanism,
+ final byte[] response,
+ final AMQShortString locale)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV ConnectionStartOk["
+ + " clientProperties: "
+ + clientProperties
+ + " mechanism: "
+ + mechanism
+ + " response: ********"
+ + " locale: "
+ + locale
+ + " ]");
+ }
+
+ Broker<?> broker = getBroker();
+
+ _logger.info("SASL Mechanism selected: " + mechanism);
+ _logger.info("Locale selected: " + locale);
+
+ SubjectCreator subjectCreator = getSubjectCreator();
+ SaslServer ss = null;
+ try
+ {
+ ss = subjectCreator.createSaslServer(String.valueOf(mechanism),
+ getLocalFQDN(),
+ getPeerPrincipal());
+
+ if (ss == null)
+ {
+ closeConnection(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + mechanism, 0);
+
+ }
+ else
+ {
+ //save clientProperties
+ setClientProperties(clientProperties);
+
+ setSaslServer(ss);
+
+ final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response);
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+
+ switch (authResult.getStatus())
+ {
+ case ERROR:
+ Exception cause = authResult.getCause();
+
+ _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+ closeConnection(AMQConstant.NOT_ALLOWED, "Authentication failed", 0);
+
+ disposeSaslServer();
+ break;
+
+ case SUCCESS:
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Connected as: " + authResult.getSubject());
+ }
+ setAuthorizedSubject(authResult.getSubject());
+
+ int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+
+ if (frameMax <= 0)
+ {
+ frameMax = Integer.MAX_VALUE;
+ }
+
+ ConnectionTuneBody
+ tuneBody =
+ methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+ frameMax,
+ broker.getConnection_heartBeatDelay());
+ writeFrame(tuneBody.generateFrame(0));
+ break;
+ case CONTINUE:
+ ConnectionSecureBody
+ secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+ writeFrame(secureBody.generateFrame(0));
+ }
+ }
+ }
+ catch (SaslException e)
+ {
+ disposeSaslServer();
+ closeConnection(AMQConstant.INTERNAL_ERROR, "SASL error: " + e, 0);
+ }
+ }
+
+ @Override
+ public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV ConnectionTuneOk[" +" channelMax: " + channelMax + " frameMax: " + frameMax + " heartbeat: " + heartbeat + " ]");
+ }
+
+ initHeartbeats(heartbeat);
+
+ int brokerFrameMax = getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+ if (brokerFrameMax <= 0)
+ {
+ brokerFrameMax = Integer.MAX_VALUE;
+ }
+
+ if (frameMax > (long) brokerFrameMax)
+ {
+ closeConnection(AMQConstant.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + frameMax
+ + " greater than the broker will allow: "
+ + brokerFrameMax, 0);
+ }
+ else if (frameMax > 0 && frameMax < AMQConstant.FRAME_MIN_SIZE.getCode())
+ {
+ closeConnection(AMQConstant.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + frameMax
+ + " which is smaller than the specification defined minimum: "
+ + AMQConstant.FRAME_MIN_SIZE.getCode(), 0);
+ }
+ else
+ {
+ int calculatedFrameMax = frameMax == 0 ? brokerFrameMax : (int) frameMax;
+ setMaxFrameSize(calculatedFrameMax);
+
+ //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
+ setMaximumNumberOfChannels( ((channelMax == 0l) || (channelMax > 0xFFFFL))
+ ? 0xFFFFL
+ : channelMax);
+ }
+ }
+
+ public int getBinaryDataLimit()
+ {
+ return _binaryDataLimit;
+ }
+
public final class WriteDeliverMethod
implements ClientDeliveryMethod
{
@@ -1656,24 +1790,31 @@ public class AMQProtocolEngine implement
return _lastWriteTime.get();
}
- @Override
public boolean isCloseWhenNoRoute()
{
return _closeWhenNoRoute;
}
- @Override
public boolean isCompressionSupported()
{
return _compressionSupported && _broker.isMessageCompressionEnabled();
}
- @Override
public int getMessageCompressionThreshold()
{
return _messageCompressionThreshold;
}
+ public Broker<?> getBroker()
+ {
+ return _broker;
+ }
+
+ public SubjectCreator getSubjectCreator()
+ {
+ return _broker.getSubjectCreator(getLocalAddress(), getTransport().isSecure());
+ }
+
public EventLogger getEventLogger()
{
if(_virtualHost != null)
@@ -1685,4 +1826,72 @@ public class AMQProtocolEngine implement
return _broker.getEventLogger();
}
}
+
+ @Override
+ public ServerChannelMethodProcessor getChannelMethodProcessor(final int channelId)
+ {
+ ServerChannelMethodProcessor channelMethodProcessor = getChannel(channelId);
+ if(channelMethodProcessor == null)
+ {
+ channelMethodProcessor = (ServerChannelMethodProcessor) Proxy.newProxyInstance(ServerMethodDispatcher.class.getClassLoader(),
+ new Class[] { ServerChannelMethodProcessor.class }, new InvocationHandler()
+ {
+ @Override
+ public Object invoke(final Object proxy, final Method method, final Object[] args)
+ throws Throwable
+ {
+ if(method.getName().startsWith("receive"))
+ {
+ closeConnection(AMQConstant.CHANNEL_ERROR,
+ "Unknown channel id: " + channelId,
+ channelId);
+ return null;
+ }
+ else if(method.getName().equals("ignoreAllButCloseOk"))
+ {
+ return false;
+ }
+ return null;
+ }
+ });
+ }
+ return channelMethodProcessor;
+ }
+
+ @Override
+ public void receiveHeartbeat()
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV Heartbeat");
+ }
+
+ // No op
+ }
+
+ @Override
+ public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation)
+ {
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV ProtocolHeader [" + protocolInitiation + " ]");
+ }
+
+ protocolInitiationReceived(protocolInitiation);
+ }
+
+ @Override
+ public void setCurrentMethod(final int classId, final int methodId)
+ {
+ _currentClassId = classId;
+ _currentMethodId = methodId;
+ }
+
+ @Override
+ public boolean ignoreAllButCloseOk()
+ {
+ return _closing.get();
+ }
+
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Oct 17 14:23:19 2014
@@ -39,7 +39,6 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -80,7 +79,7 @@ public abstract class ConsumerTarget_0_8
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager) throws AMQException
+ FlowCreditManager creditManager)
{
return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
@@ -90,7 +89,7 @@ public abstract class ConsumerTarget_0_8
final FieldTable filters,
final FlowCreditManager creditManager,
final ClientDeliveryMethod deliveryMethod,
- final RecordDeliveryMethod recordMethod) throws AMQException
+ final RecordDeliveryMethod recordMethod)
{
return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
@@ -107,7 +106,6 @@ public abstract class ConsumerTarget_0_8
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
- throws AMQException
{
super(channel, consumerTag,
filters, creditManager, deliveryMethod, recordMethod);
@@ -148,7 +146,7 @@ public abstract class ConsumerTarget_0_8
public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager) throws AMQException
+ FlowCreditManager creditManager)
{
return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
@@ -171,7 +169,6 @@ public abstract class ConsumerTarget_0_8
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
- throws AMQException
{
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
@@ -207,7 +204,7 @@ public abstract class ConsumerTarget_0_8
long size;
synchronized (getChannel())
{
- getChannel().getProtocolSession().setDeferFlush(batch);
+ getChannel().getConnection().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
size = sendToClient(consumer, message, props, deliveryTag);
@@ -249,7 +246,6 @@ public abstract class ConsumerTarget_0_8
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
- throws AMQException
{
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
@@ -265,7 +261,6 @@ public abstract class ConsumerTarget_0_8
public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager)
- throws AMQException
{
return new AckConsumer(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
@@ -276,7 +271,6 @@ public abstract class ConsumerTarget_0_8
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
- throws AMQException
{
return new AckConsumer(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod);
}
@@ -288,7 +282,6 @@ public abstract class ConsumerTarget_0_8
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
- throws AMQException
{
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
@@ -308,7 +301,7 @@ public abstract class ConsumerTarget_0_8
synchronized (getChannel())
{
- getChannel().getProtocolSession().setDeferFlush(batch);
+ getChannel().getConnection().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
addUnacknowledgedMessage(entry);
@@ -346,7 +339,6 @@ public abstract class ConsumerTarget_0_8
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
- throws AMQException
{
super(State.ACTIVE);
@@ -474,9 +466,9 @@ public abstract class ConsumerTarget_0_8
return _consumerTag;
}
- public AMQProtocolSession getProtocolSession()
+ public AMQProtocolEngine getProtocolSession()
{
- return _channel.getProtocolSession();
+ return _channel.getConnection();
}
public void restoreCredit(final ServerMessage message)
@@ -525,7 +517,7 @@ public abstract class ConsumerTarget_0_8
public void confirmAutoClose()
{
- ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter();
+ ProtocolOutputConverter converter = getChannel().getConnection().getProtocolOutputConverter();
converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
}
@@ -540,9 +532,9 @@ public abstract class ConsumerTarget_0_8
public void flushBatched()
{
- _channel.getProtocolSession().setDeferFlush(false);
+ _channel.getConnection().setDeferFlush(false);
- _channel.getProtocolSession().flushBatched();
+ _channel.getConnection().flushBatched();
}
protected void addUnacknowledgedMessage(MessageInstance entry)
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Fri Oct 17 14:23:19 2014
@@ -20,14 +20,13 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.Map;
+
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
-import java.util.Map;
-
public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
{
private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class);
@@ -45,7 +44,7 @@ public class ExtractResendAndRequeue imp
_msgToResend = msgToResend;
}
- public boolean callback(final long deliveryTag, MessageInstance message) throws AMQException
+ public boolean callback(final long deliveryTag, MessageInstance message)
{
message.setRedelivered();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java Fri Oct 17 14:23:19 2014
@@ -20,16 +20,15 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.AMQException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.message.MessageDestination;
-import java.util.ArrayList;
-import java.util.List;
-
public class IncomingMessage
{
@@ -58,7 +57,7 @@ public class IncomingMessage
return _messagePublishInfo;
}
- public void addContentBodyFrame(final ContentBody contentChunk) throws AMQException
+ public void addContentBodyFrame(final ContentBody contentChunk)
{
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
@@ -94,7 +93,7 @@ public class IncomingMessage
_messageDestination = e;
}
- public int getBodyCount() throws AMQException
+ public int getBodyCount()
{
return _contentChunks.size();
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Fri Oct 17 14:23:19 2014
@@ -33,7 +33,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
@@ -135,38 +135,10 @@ public class MessageConverter_Internal_t
private MessageMetaData convertMetaData(final InternalMessage serverMsg, final String bodyMimeType, final int size)
{
- MessagePublishInfo publishInfo = new MessagePublishInfo()
- {
- @Override
- public AMQShortString getExchange()
- {
- return AMQShortString.EMPTY_STRING;
- }
-
- @Override
- public void setExchange(final AMQShortString amqShortString)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isImmediate()
- {
- return false;
- }
-
- @Override
- public boolean isMandatory()
- {
- return false;
- }
-
- @Override
- public AMQShortString getRoutingKey()
- {
- return AMQShortString.valueOf(serverMsg.getInitialRoutingAddress());
- }
- };
+ MessagePublishInfo publishInfo = new MessagePublishInfo(AMQShortString.EMPTY_STRING,
+ false,
+ false,
+ AMQShortString.valueOf(serverMsg.getInitialRoutingAddress()));
final BasicContentHeaderProperties props = new BasicContentHeaderProperties();
@@ -191,7 +163,7 @@ public class MessageConverter_Internal_t
props.setHeaders(FieldTable.convertToFieldTable(headerProps));
- final ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID);
+ final ContentHeaderBody chb = new ContentHeaderBody(props);
chb.setBodySize(size);
return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime());
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Fri Oct 17 14:23:19 2014
@@ -20,8 +20,13 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collection;
-import org.apache.qpid.AMQException;
+import java.util.Set;
+
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
@@ -29,7 +34,7 @@ import org.apache.qpid.framing.BasicCont
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.EncodingUtils;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -37,12 +42,6 @@ import org.apache.qpid.server.util.ByteB
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.util.ByteBufferInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Set;
-
/**
* Encapsulates a publish body and a content header. In the context of the message store these are treated as a
* single unit.
@@ -170,33 +169,11 @@ public class MessageMetaData implements
long arrivalTime = EncodingUtils.readLong(dais);
MessagePublishInfo publishBody =
- new MessagePublishInfo()
- {
+ new MessagePublishInfo(exchange,
+ (flags & IMMEDIATE_FLAG) != 0,
+ (flags & MANDATORY_FLAG) != 0,
+ routingKey);
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
- }
-
- public boolean isImmediate()
- {
- return (flags & IMMEDIATE_FLAG) != 0;
- }
-
- public boolean isMandatory()
- {
- return (flags & MANDATORY_FLAG) != 0;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- };
return new MessageMetaData(publishBody, chb, arrivalTime);
}
catch (IOException e)
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Fri Oct 17 14:23:19 2014
@@ -20,13 +20,12 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
-
import java.util.Collection;
import java.util.Set;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
+
public interface UnacknowledgedMessageMap
{
@@ -37,12 +36,12 @@ public interface UnacknowledgedMessageMa
*@param message the message being iterated over @return true to stop iteration, false to continue
* @throws AMQException
*/
- boolean callback(final long deliveryTag, MessageInstance message) throws AMQException;
+ boolean callback(final long deliveryTag, MessageInstance message);
void visitComplete();
}
- void visit(Visitor visitor) throws AMQException;
+ void visit(Visitor visitor);
void add(long deliveryTag, MessageInstance message);
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Fri Oct 17 14:23:19 2014
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.MessageInstance;
public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
@@ -82,7 +81,7 @@ public class UnacknowledgedMessageMapImp
}
}
- public void visit(Visitor visitor) throws AMQException
+ public void visit(Visitor visitor)
{
synchronized (_lock)
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Fri Oct 17 14:23:19 2014
@@ -29,7 +29,7 @@ import java.util.Map;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.message.MessageContentSource;
@@ -41,7 +41,7 @@ import org.apache.qpid.test.utils.QpidTe
public class AMQChannelTest extends QpidTestCase
{
private VirtualHostImpl _virtualHost;
- private AMQProtocolSession _protocolSession;
+ private AMQProtocolEngine _protocolSession;
private Map<Integer,String> _replies;
private Broker _broker;
@@ -98,18 +98,17 @@ public class AMQChannelTest extends Qpid
AMQChannel channel = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore());
channel.setLocalTransactional();
- MessagePublishInfo info = mock(MessagePublishInfo.class);
+ MessagePublishInfo info = new MessagePublishInfo(new AMQShortString("test"), false, false, null);
ExchangeImpl e = mock(ExchangeImpl.class);
ContentHeaderBody contentHeaderBody= mock(ContentHeaderBody.class);
BasicContentHeaderProperties properties = mock(BasicContentHeaderProperties.class);
when(contentHeaderBody.getProperties()).thenReturn(properties);
- when(info.getExchange()).thenReturn(new AMQShortString("test"));
when(properties.getUserId()).thenReturn(new AMQShortString(_protocolSession.getAuthorizedPrincipal().getName() + "_incorrect"));
channel.setPublishFrame(info, e);
channel.publishContentHeader(contentHeaderBody);
- channel.commit();
+ channel.commit(null, false);
assertEquals("Unexpected number of replies", 1, _replies.size());
assertEquals("Message authorization passed", "Access Refused", _replies.get(403));
@@ -121,18 +120,17 @@ public class AMQChannelTest extends Qpid
AMQChannel channel = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore());
channel.setLocalTransactional();
- MessagePublishInfo info = mock(MessagePublishInfo.class);
+ MessagePublishInfo info = new MessagePublishInfo(new AMQShortString("test"), false, false, null);
ExchangeImpl e = mock(ExchangeImpl.class);
ContentHeaderBody contentHeaderBody= mock(ContentHeaderBody.class);
BasicContentHeaderProperties properties = mock(BasicContentHeaderProperties.class);
when(contentHeaderBody.getProperties()).thenReturn(properties);
- when(info.getExchange()).thenReturn(new AMQShortString("test"));
when(properties.getUserId()).thenReturn(new AMQShortString(_protocolSession.getAuthorizedPrincipal().getName()));
channel.setPublishFrame(info, e);
channel.publishContentHeader(contentHeaderBody);
- channel.commit();
+ channel.commit(null, false);
assertEquals("Unexpected number of replies", 0, _replies.size());
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Fri Oct 17 14:23:19 2014
@@ -29,7 +29,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
@@ -51,7 +51,7 @@ public class AckTest extends QpidTestCas
private ConsumerTarget_0_8 _subscriptionTarget;
private ConsumerImpl _consumer;
- private AMQProtocolSession _protocolSession;
+ private AMQProtocolEngine _protocolEngine;
private TestMemoryMessageStore _messageStore;
@@ -68,8 +68,8 @@ public class AckTest extends QpidTestCas
super.setUp();
BrokerTestHelper.setUp();
_channel = BrokerTestHelper_0_8.createChannel(5);
- _protocolSession = _channel.getProtocolSession();
- _virtualHost = _protocolSession.getVirtualHost();
+ _protocolEngine = _channel.getConnection();
+ _virtualHost = _protocolEngine.getVirtualHost();
_queue = BrokerTestHelper.createQueue(getTestName(), _virtualHost);
_messageStore = (TestMemoryMessageStore)_virtualHost.getMessageStore();
}
@@ -90,37 +90,10 @@ public class AckTest extends QpidTestCas
{
for (int i = 1; i <= count; i++)
{
- MessagePublishInfo publishBody = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return new AMQShortString("someExchange");
- }
-
- public void setExchange(AMQShortString exchange)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public boolean isMandatory()
- {
- return false;
- }
-
- public AMQShortString getRoutingKey()
- {
- return new AMQShortString("rk");
- }
- };
+ MessagePublishInfo publishBody = new MessagePublishInfo(new AMQShortString("someExchange"), false, false,
+ new AMQShortString("rk"));
BasicContentHeaderProperties b = new BasicContentHeaderProperties();
- ContentHeaderBody cb = new ContentHeaderBody();
- cb.setProperties(b);
+ ContentHeaderBody cb = new ContentHeaderBody(b);
if (persistent)
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Fri Oct 17 14:23:19 2014
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.MessageCounter;
import org.apache.qpid.server.store.MessageStore;
@@ -75,7 +76,7 @@ public class AcknowledgeTest extends Qpi
private InternalTestProtocolSession getSession()
{
- return (InternalTestProtocolSession)_channel.getProtocolSession();
+ return (InternalTestProtocolSession)_channel.getConnection();
}
private AMQQueue getQueue()
@@ -129,7 +130,7 @@ public class AcknowledgeTest extends Qpi
if (getChannel().isTransactional())
{
- getChannel().commit();
+ getChannel().commit(null, false);
}
//Ensure they are stored
@@ -140,7 +141,7 @@ public class AcknowledgeTest extends Qpi
//Subscribe to the queue
AMQShortString subscriber = _channel.consumeFromSource(null,
- Collections.singleton(_queue),
+ Collections.<MessageSource>singleton(_queue),
true, null, true, false);
getQueue().deliverAsync();
@@ -164,7 +165,7 @@ public class AcknowledgeTest extends Qpi
if (getChannel().isTransactional())
{
- getChannel().commit();
+ getChannel().commit(null, false);
}
// Check Remaining Acknowledgements
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java Fri Oct 17 14:23:19 2014
@@ -24,18 +24,15 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public class BrokerTestHelper_0_8 extends BrokerTestHelper
{
- public static AMQChannel createChannel(int channelId, AMQProtocolSession session) throws AMQException
+ public static AMQChannel createChannel(int channelId, AMQProtocolEngine session) throws AMQException
{
AMQChannel channel = new AMQChannel(session, channelId, session.getVirtualHost().getMessageStore());
session.addChannel(channel);
@@ -69,9 +66,7 @@ public class BrokerTestHelper_0_8 extend
{
AMQShortString routingKey = new AMQShortString(queueName);
AMQShortString exchangeNameAsShortString = new AMQShortString(exchangeName);
- MessagePublishInfo info = mock(MessagePublishInfo.class);
- when(info.getExchange()).thenReturn(exchangeNameAsShortString);
- when(info.getRoutingKey()).thenReturn(routingKey);
+ MessagePublishInfo info = new MessagePublishInfo(exchangeNameAsShortString, false, false, routingKey);
MessageDestination destination;
if(exchangeName == null || "".equals(exchangeName))
@@ -86,22 +81,20 @@ public class BrokerTestHelper_0_8 extend
{
channel.setPublishFrame(info, destination);
- // Set the body size
- ContentHeaderBody _headerBody = new ContentHeaderBody();
- _headerBody.setBodySize(0);
// Set Minimum properties
BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+
properties.setExpiration(0L);
properties.setTimestamp(System.currentTimeMillis());
// Make Message Persistent
properties.setDeliveryMode((byte) 2);
- _headerBody.setProperties(properties);
+ ContentHeaderBody headerBody = new ContentHeaderBody(properties, 0);
- channel.publishContentHeader(_headerBody);
+ channel.publishContentHeader(headerBody);
}
channel.sync();
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Fri Oct 17 14:23:19 2014
@@ -39,14 +39,13 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -231,7 +230,7 @@ public class InternalTestProtocolSession
//Simulate the Client responding with a CloseOK
// should really update the StateManger but we don't have access here
// changeState(AMQState.CONNECTION_CLOSED);
- ((AMQChannel)session).getProtocolSession().closeSession();
+ ((AMQChannel)session).getConnection().closeSession();
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java Fri Oct 17 14:23:19 2014
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -46,23 +44,16 @@ public class MaxChannelsTest extends Qpi
long maxChannels = 10L;
_session.setMaximumNumberOfChannels(maxChannels);
- assertEquals("Number of channels not correctly set.", new Long(maxChannels), _session.getMaximumNumberOfChannels());
+ assertEquals("Number of channels not correctly set.", maxChannels, _session.getMaximumNumberOfChannels());
- for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
+ for (long currentChannel = 1L; currentChannel <= maxChannels; currentChannel++)
{
- _session.addChannel(new AMQChannel(_session, (int) currentChannel, null));
+ _session.receiveChannelOpen( (int) currentChannel);
}
-
- try
- {
- _session.addChannel(new AMQChannel(_session, (int) maxChannels, null));
- fail("Cannot create more channels then maximum");
- }
- catch (AMQException e)
- {
- assertEquals("Wrong exception received.", e.getErrorCode(), AMQConstant.NOT_ALLOWED);
- }
- assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size()));
+ assertFalse("Connection should not be closed after opening " + maxChannels + " channels",_session.isClosed());
+ assertEquals("Maximum number of channels not set.", maxChannels, _session.getChannels().size());
+ _session.receiveChannelOpen((int) maxChannels+1);
+ assertTrue("Connection should be closed after opening " + (maxChannels + 1) + " channels",_session.isClosed());
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org