You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/14 20:23:18 UTC
svn commit: r1568495 [7/8] - in
/qpid/branches/java-broker-amqp-1-0-management/java: ./
amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/
bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/
bdbstore/jmx/src/test/java...
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Feb 14 19:23:14 2014
@@ -114,7 +114,7 @@ public abstract class ConsumerTarget_0_8
* @throws org.apache.qpid.AMQException
*/
@Override
- public void send(MessageInstance entry, boolean batch) throws AMQException
+ public void send(MessageInstance entry, boolean batch)
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -177,7 +177,7 @@ public abstract class ConsumerTarget_0_8
* @throws org.apache.qpid.AMQException
*/
@Override
- public void send(MessageInstance entry, boolean batch) throws AMQException
+ public void send(MessageInstance entry, boolean batch)
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -293,7 +293,7 @@ public abstract class ConsumerTarget_0_8
* @throws org.apache.qpid.AMQException
*/
@Override
- public void send(MessageInstance entry, boolean batch) throws AMQException
+ public void send(MessageInstance entry, boolean batch)
{
@@ -505,7 +505,6 @@ public abstract class ConsumerTarget_0_8
}
protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
- throws AMQException
{
_deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag);
@@ -524,7 +523,7 @@ public abstract class ConsumerTarget_0_8
converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
}
- public void queueEmpty() throws AMQException
+ public void queueEmpty()
{
if (isAutoClose())
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Fri Feb 14 19:23:14 2014
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.in
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.codec.BBEncoder;
@@ -236,7 +237,7 @@ public class MessageConverter_Internal_t
}
catch (IOException e)
{
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
}
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java Fri Feb 14 19:23:14 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.typedmessage.TypedBytesContentReader;
@@ -124,11 +125,11 @@ public class MessageConverter_v0_8_to_In
}
catch (TypedBytesFormatException e)
{
- throw new RuntimeException(e); // TODO - Implement
+ throw new ConnectionScopedRuntimeException(e);
}
catch (EOFException e)
{
- throw new RuntimeException(e); // TODO - Implement
+ throw new ConnectionScopedRuntimeException(e);
}
}
return list;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Fri Feb 14 19:23:14 2014
@@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol.
import java.util.Collection;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -32,6 +34,7 @@ import org.apache.qpid.server.message.AM
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.util.ByteBufferOutputStream;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.util.ByteBufferInputStream;
import java.io.DataInputStream;
@@ -132,7 +135,7 @@ public class MessageMetaData implements
catch (IOException e)
{
// This shouldn't happen as we are not actually using anything that can throw an IO Exception
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
return dest.position()-oldPosition;
@@ -196,17 +199,21 @@ public class MessageMetaData implements
};
return new MessageMetaData(publishBody, chb, arrivalTime);
}
- catch (AMQException e)
+ catch (IOException e)
{
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
- catch (IOException e)
+ catch (AMQProtocolVersionException e)
{
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
}
}
- };
+ }
public AMQMessageHeader getMessageHeader()
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Fri Feb 14 19:23:14 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -35,6 +36,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody>
@@ -148,7 +150,7 @@ public class BasicConsumeMethodHandler i
}
}
- catch (org.apache.qpid.AMQInvalidArgumentException ise)
+ catch (AMQInvalidArgumentException ise)
{
_logger.debug("Closing connection due to invalid selector");
@@ -175,6 +177,13 @@ public class BasicConsumeMethodHandler i
+ queue.getName()
+ " exclusively as it already has a consumer");
}
+ catch (QpidSecurityException e)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " permission denied");
+ }
}
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Fri Feb 14 19:23:14 2014
@@ -31,6 +31,7 @@ import org.apache.qpid.framing.MethodReg
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.flow.FlowCreditManager;
@@ -45,6 +46,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.EnumSet;
@@ -106,14 +108,33 @@ public class BasicGetMethodHandler imple
}
}
- if (!performGet(queue,protocolConnection, channel, !body.getNoAck()))
+ try
{
- MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
- // TODO - set clusterId
- BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
+ if (!performGet(queue,protocolConnection, channel, !body.getNoAck()))
+ {
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ // TODO - set clusterId
+ BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
- protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }
+ catch (QpidSecurityException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage());
+ }
+ catch (MessageSource.ExistingExclusiveConsumer e)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue has an exclusive consumer");
+ }
+ catch (MessageSource.ExistingConsumerPreventsExclusive e)
+ {
+ throw body.getConnectionException(AMQConstant.INTERNAL_ERROR,
+ "The GET request has been evaluated as an exclusive consumer, " +
+ "this is likely due to a programming error in the Qpid broker");
}
}
}
@@ -123,7 +144,8 @@ public class BasicGetMethodHandler imple
final AMQProtocolSession session,
final AMQChannel channel,
final boolean acks)
- throws AMQException
+ throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive,
+ MessageSource.ExistingExclusiveConsumer
{
final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
@@ -186,7 +208,7 @@ public class BasicGetMethodHandler imple
@Override
public void deliverToClient(final Consumer sub, final ServerMessage message,
- final InstanceProperties props, final long deliveryTag) throws AMQException
+ final InstanceProperties props, final long deliveryTag)
{
_singleMessageCredit.useCreditForMessage(message.getSize());
_session.getProtocolOutputConverter().writeGetOk(message,
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java Fri Feb 14 19:23:14 2014
@@ -34,6 +34,7 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody>
@@ -88,7 +89,14 @@ public class BasicPublishMethodHandler i
MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
info.setExchange(exchangeName);
- channel.setPublishFrame(info, exch);
+ try
+ {
+ channel.setPublishFrame(info, exch);
+ }
+ catch (QpidSecurityException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ }
}
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java Fri Feb 14 19:23:14 2014
@@ -35,6 +35,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.io.ByteArrayOutputStream;
@@ -99,7 +100,7 @@ public class ChannelOpenHandler implemen
catch (IOException e)
{
// This *really* shouldn't happen as we're not doing any I/O
- throw new RuntimeException("I/O exception when writing to byte array", e);
+ throw new ConnectionScopedRuntimeException("I/O exception when writing to byte array", e);
}
// should really associate this channelId to the session
@@ -123,7 +124,7 @@ public class ChannelOpenHandler implemen
catch (IOException e)
{
// This *really* shouldn't happen as we're not doing any I/O
- throw new RuntimeException("I/O exception when writing to byte array", e);
+ throw new ConnectionScopedRuntimeException("I/O exception when writing to byte array", e);
}
// should really associate this channelId to the session
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java Fri Feb 14 19:23:14 2014
@@ -24,7 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeDeclareBody;
@@ -35,8 +35,10 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
@@ -124,6 +126,15 @@ public class ExchangeDeclareHandler impl
{
throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e);
}
+ catch (QpidSecurityException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ }
+ catch (UnknownExchangeException e)
+ {
+ // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e);
+ }
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java Fri Feb 14 19:23:14 2014
@@ -26,10 +26,10 @@ import org.apache.qpid.framing.ExchangeD
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -73,11 +73,6 @@ public class ExchangeDeleteHandler imple
session.writeFrame(responseBody.generateFrame(channelId));
}
- catch (ExchangeInUseException e)
- {
- throw body.getChannelException(AMQConstant.IN_USE, "Exchange in use");
- // TODO: sort out consistent channel close mechanism that does all clean up etc.
- }
catch (ExchangeIsAlternateException e)
{
@@ -88,5 +83,9 @@ public class ExchangeDeleteHandler imple
{
throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted");
}
+ catch (QpidSecurityException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ }
}
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java Fri Feb 14 19:23:14 2014
@@ -41,6 +41,7 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
@@ -147,6 +148,10 @@ public class QueueBindHandler implements
{
throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
}
+ catch (QpidSecurityException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ }
if (_log.isInfoEnabled())
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java Fri Feb 14 19:23:14 2014
@@ -34,19 +34,17 @@ import org.apache.qpid.server.model.UUID
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
{
@@ -129,7 +127,7 @@ public class QueueDeclareHandler impleme
final AMQQueue q = queue;
final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
{
- public void doTask(AMQProtocolSession session) throws AMQException
+ public void doTask(AMQProtocolSession session)
{
q.setExclusiveOwningSession(null);
}
@@ -185,6 +183,10 @@ public class QueueDeclareHandler impleme
}
}
+ catch (QpidSecurityException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ }
//set this as the default queue on the channel:
channel.setDefaultQueue(queue);
@@ -213,7 +215,7 @@ public class QueueDeclareHandler impleme
QueueDeclareBody body,
final VirtualHost virtualHost,
final AMQProtocolSession session)
- throws AMQException
+ throws AMQException, QpidSecurityException, QueueExistsException
{
final boolean durable = body.getDurable();
@@ -235,11 +237,18 @@ public class QueueDeclareHandler impleme
final AMQProtocolSession.Task deleteQueueTask =
new AMQProtocolSession.Task()
{
- public void doTask(AMQProtocolSession session) throws AMQException
+ public void doTask(AMQProtocolSession session)
{
if (virtualHost.getQueue(queueName.toString()) == queue)
{
- virtualHost.removeQueue(queue);
+ try
+ {
+ virtualHost.removeQueue(queue);
+ }
+ catch (QpidSecurityException e)
+ {
+ throw new ConnectionScopedRuntimeException("Permission exception: Unable to remove a temporary queue created by a session which has now removed itself", e);
+ }
}
}
};
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java Fri Feb 14 19:23:14 2014
@@ -32,6 +32,7 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -111,7 +112,15 @@ public class QueueDeleteHandler implemen
"Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
}
- int purged = virtualHost.removeQueue(queue);
+ int purged = 0;
+ try
+ {
+ purged = virtualHost.removeQueue(queue);
+ }
+ catch (QpidSecurityException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ }
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java Fri Feb 14 19:23:14 2014
@@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
@@ -103,10 +104,18 @@ public class QueuePurgeHandler implement
"Queue is exclusive, but not created on this Connection.");
}
- long purged = queue.clearQueue();
+ long purged = 0;
+ try
+ {
+ purged = queue.clearQueue();
+ }
+ catch (QpidSecurityException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ }
- if(!body.getNowait())
+ if(!body.getNowait())
{
channel.sync();
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java Fri Feb 14 19:23:14 2014
@@ -38,6 +38,7 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
@@ -105,7 +106,14 @@ public class QueueUnbindHandler implemen
}
else
{
- exch.removeBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(body.getArguments()));
+ try
+ {
+ exch.removeBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(body.getArguments()));
+ }
+ catch (QpidSecurityException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ }
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java Fri Feb 14 19:23:14 2014
@@ -49,21 +49,19 @@ public interface ProtocolOutputConverter
void writeDeliver(final ServerMessage msg,
final InstanceProperties props, int channelId,
long deliveryTag,
- AMQShortString consumerTag)
- throws AMQException;
+ AMQShortString consumerTag);
void writeGetOk(final ServerMessage msg,
final InstanceProperties props,
int channelId,
long deliveryTag,
- int queueSize) throws AMQException;
+ int queueSize);
byte getProtocolMinorVersion();
byte getProtocolMajorVersion();
- void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource msgContent, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException;
+ void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource msgContent, int channelId, int replyCode, AMQShortString replyText);
void writeFrame(AMQDataBlock block);
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java Fri Feb 14 19:23:14 2014
@@ -68,7 +68,6 @@ class ProtocolOutputConverterImpl implem
final InstanceProperties props, int channelId,
long deliveryTag,
AMQShortString consumerTag)
- throws AMQException
{
final AMQMessage msg = convertToAMQMessage(m);
final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
@@ -95,13 +94,11 @@ class ProtocolOutputConverterImpl implem
}
private void writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody)
- throws AMQException
{
writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody);
}
private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
- throws AMQException
{
@@ -193,7 +190,7 @@ class ProtocolOutputConverterImpl implem
final InstanceProperties props,
int channelId,
long deliveryTag,
- int queueSize) throws AMQException
+ int queueSize)
{
AMQBody deliver = createEncodedGetOkBody(msg, props, deliveryTag, queueSize);
writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver);
@@ -204,7 +201,6 @@ class ProtocolOutputConverterImpl implem
boolean isRedelivered,
final long deliveryTag,
final AMQShortString consumerTag)
- throws AMQException
{
final AMQShortString exchangeName;
@@ -282,7 +278,6 @@ class ProtocolOutputConverterImpl implem
}
private AMQBody createEncodedGetOkBody(ServerMessage msg, InstanceProperties props, long deliveryTag, int queueSize)
- throws AMQException
{
final AMQShortString exchangeName;
final AMQShortString routingKey;
@@ -316,7 +311,7 @@ class ProtocolOutputConverterImpl implem
private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
int replyCode,
- AMQShortString replyText) throws AMQException
+ AMQShortString replyText)
{
BasicReturnBody basicReturnBody =
@@ -330,7 +325,6 @@ class ProtocolOutputConverterImpl implem
}
public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
{
AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java Fri Feb 14 19:23:14 2014
@@ -52,8 +52,6 @@ public class AMQStateManager implements
/** The current state */
private AMQState _currentState;
- private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
-
public AMQStateManager(Broker broker, AMQProtocolSession protocolSession)
{
_broker = broker;
@@ -72,30 +70,17 @@ public class AMQStateManager implements
return _broker;
}
- public AMQState getCurrentState()
- {
- return _currentState;
- }
-
- public void changeState(AMQState newState) throws AMQException
+ public void changeState(AMQState newState)
{
_logger.debug("State changing to " + newState + " from old state " + _currentState);
final AMQState oldState = _currentState;
_currentState = newState;
- for (StateListener l : _stateListeners)
- {
- l.stateChanged(oldState, newState);
- }
}
public void error(Exception e)
{
_logger.error("State manager received error notification[Current State:" + _currentState + "]: " + e, e);
- for (StateListener l : _stateListeners)
- {
- l.error(e);
- }
}
public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
@@ -121,28 +106,6 @@ public class AMQStateManager implements
}
- private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession)
- throws AMQException
- {
- if ((evt.getChannelId() != 0) && !(evt.getMethod() instanceof ChannelOpenBody)
- && (protocolSession.getChannel(evt.getChannelId()) == null)
- && !protocolSession.channelAwaitingClosure(evt.getChannelId()))
- {
- throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
- }
- }
-
- public void addStateListener(StateListener listener)
- {
- _logger.debug("Adding state listener");
- _stateListeners.add(listener);
- }
-
- public void removeStateListener(StateListener listener)
- {
- _stateListeners.remove(listener);
- }
-
public VirtualHostRegistry getVirtualHostRegistry()
{
return _broker.getVirtualHostRegistry();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Fri Feb 14 19:23:14 2014
@@ -61,7 +61,7 @@ public class AMQChannelTest extends Qpid
MessageContentSource msgContent,
int channelId,
int replyCode,
- AMQShortString replyText) throws AMQException
+ AMQShortString replyText)
{
_replies.put(replyCode, replyText.asString());
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Fri Feb 14 19:23:14 2014
@@ -139,28 +139,20 @@ public class AckTest extends QpidTestCas
final StoredMessage storedMessage = _messageStore.addMessage(mmd);
final AMQMessage message = new AMQMessage(storedMessage);
ServerTransaction txn = new AutoCommitTransaction(_messageStore);
- txn.enqueue(_queue, message, new ServerTransaction.Action() {
- public void postCommit()
- {
- try
- {
-
- _queue.enqueue(message,null);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void onRollback()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- });
+ txn.enqueue(_queue, message,
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ _queue.enqueue(message,null);
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
- // we manually send the message to the subscription
- //_subscription.send(new QueueEntry(_queue,msg), _queue);
}
try
{
@@ -177,7 +169,7 @@ public class AckTest extends QpidTestCas
* Tests that the acknowledgements are correctly associated with a channel and
* order is preserved when acks are enabled
*/
- public void testAckChannelAssociationTest() throws AMQException
+ public void testAckChannelAssociationTest() throws Exception
{
_subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
DEFAULT_CONSUMER_TAG,
@@ -206,7 +198,7 @@ public class AckTest extends QpidTestCas
/**
* Tests that in no-ack mode no messages are retained
*/
- public void testNoAckMode() throws AMQException
+ public void testNoAckMode() throws Exception
{
// false arg means no acks expected
_subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
@@ -231,7 +223,7 @@ public class AckTest extends QpidTestCas
/**
* Tests that in no-ack mode no messages are retained
*/
- public void testPersistentNoAckMode() throws AMQException
+ public void testPersistentNoAckMode() throws Exception
{
// false arg means no acks expected
@@ -255,7 +247,7 @@ public class AckTest extends QpidTestCas
* Tests that a single acknowledgement is handled correctly (i.e multiple flag not
* set case)
*/
- public void testSingleAckReceivedTest() throws AMQException
+ public void testSingleAckReceivedTest() throws Exception
{
_subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
@@ -292,7 +284,7 @@ public class AckTest extends QpidTestCas
* Tests that a single acknowledgement is handled correctly (i.e multiple flag not
* set case)
*/
- public void testMultiAckReceivedTest() throws AMQException
+ public void testMultiAckReceivedTest() throws Exception
{
_subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
@@ -326,7 +318,7 @@ public class AckTest extends QpidTestCas
/**
* Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs.
*/
- public void testMultiAckAllReceivedTest() throws AMQException
+ public void testMultiAckAllReceivedTest() throws Exception
{
_subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Fri Feb 14 19:23:14 2014
@@ -84,40 +84,40 @@ public class AcknowledgeTest extends Qpi
return _queue;
}
- public void testTransactionalSingleAck() throws AMQException
+ public void testTransactionalSingleAck() throws Exception
{
getChannel().setLocalTransactional();
runMessageAck(1, 1, 1, false, 0);
}
- public void testTransactionalMultiAck() throws AMQException
+ public void testTransactionalMultiAck() throws Exception
{
getChannel().setLocalTransactional();
runMessageAck(10, 1, 5, true, 5);
}
- public void testTransactionalAckAll() throws AMQException
+ public void testTransactionalAckAll() throws Exception
{
getChannel().setLocalTransactional();
runMessageAck(10, 1, 0, true, 0);
}
- public void testNonTransactionalSingleAck() throws AMQException
+ public void testNonTransactionalSingleAck() throws Exception
{
runMessageAck(1, 1, 1, false, 0);
}
- public void testNonTransactionalMultiAck() throws AMQException
+ public void testNonTransactionalMultiAck() throws Exception
{
runMessageAck(10, 1, 5, true, 5);
}
- public void testNonTransactionalAckAll() throws AMQException
+ public void testNonTransactionalAckAll() throws Exception
{
runMessageAck(10, 1, 0, true, 0);
}
- protected void runMessageAck(int sendMessageCount, long firstDeliveryTag, long acknowledgeDeliveryTag, boolean acknowledgeMultiple, int remainingUnackedMessages) throws AMQException
+ protected void runMessageAck(int sendMessageCount, long firstDeliveryTag, long acknowledgeDeliveryTag, boolean acknowledgeMultiple, int remainingUnackedMessages) throws Exception
{
//Check store is empty
checkStoreContents(0);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java Fri Feb 14 19:23:14 2014
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.BasicCont
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -64,7 +65,8 @@ public class BrokerTestHelper_0_8 extend
return new InternalTestProtocolSession(virtualHost, createBrokerMock());
}
- public static void publishMessages(AMQChannel channel, int numberOfMessages, String queueName, String exchangeName) throws AMQException
+ public static void publishMessages(AMQChannel channel, int numberOfMessages, String queueName, String exchangeName)
+ throws AMQException, QpidSecurityException
{
AMQShortString routingKey = new AMQShortString(queueName);
AMQShortString exchangeNameAsShortString = new AMQShortString(exchangeName);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Fri Feb 14 19:23:14 2014
@@ -99,7 +99,7 @@ public class InternalTestProtocolSession
MessageContentSource msgContent,
int channelId,
int replyCode,
- AMQShortString replyText) throws AMQException
+ AMQShortString replyText)
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -145,7 +145,7 @@ public class InternalTestProtocolSession
public void writeDeliver(final ServerMessage msg,
final InstanceProperties props, int channelId,
long deliveryTag,
- AMQShortString consumerTag) throws AMQException
+ AMQShortString consumerTag)
{
_deliveryCount.incrementAndGet();
@@ -175,7 +175,7 @@ public class InternalTestProtocolSession
final InstanceProperties props,
int channelId,
long deliveryTag,
- int queueSize) throws AMQException
+ int queueSize)
{
}
@@ -223,7 +223,7 @@ public class InternalTestProtocolSession
// Then the AMQMinaProtocolSession can join on the returning future without a NPE.
}
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
{
super.closeSession(session, cause, message);
@@ -246,7 +246,7 @@ public class InternalTestProtocolSession
@Override
public void deliverToClient(Consumer sub, ServerMessage message,
- InstanceProperties props, long deliveryTag) throws AMQException
+ InstanceProperties props, long deliveryTag)
{
_deliveryCount.incrementAndGet();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Fri Feb 14 19:23:14 2014
@@ -85,7 +85,7 @@ public class QueueBrowserUsesNoAckTest e
return _queue;
}
- public void testQueueBrowserUsesNoAck() throws AMQException
+ public void testQueueBrowserUsesNoAck() throws Exception
{
int sendMessageCount = 2;
int prefetch = 1;
@@ -136,7 +136,7 @@ public class QueueBrowserUsesNoAckTest e
assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount());
}
- private AMQShortString browse(AMQChannel channel, AMQQueue queue) throws AMQException
+ private AMQShortString browse(AMQChannel channel, AMQQueue queue) throws Exception
{
FieldTable filters = new FieldTable();
filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Fri Feb 14 19:23:14 2014
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.
import java.text.MessageFormat;
import java.util.Collection;
-import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
@@ -152,7 +151,7 @@ public class Connection_1_0 implements C
private volatile boolean _stopped;
@Override
- public void close(AMQConstant cause, String message) throws AMQException
+ public void close(AMQConstant cause, String message)
{
_conn.close();
}
@@ -170,7 +169,7 @@ public class Connection_1_0 implements C
}
@Override
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
{
// TODO
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Feb 14 19:23:14 2014
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
@@ -46,6 +45,7 @@ import org.apache.qpid.server.protocol.M
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -112,7 +112,7 @@ class ConsumerTarget_1_0 extends Abstrac
}
}
- public void send(MessageInstance entry, boolean batch) throws AMQException
+ public void send(MessageInstance entry, boolean batch)
{
// TODO
send(entry);
@@ -123,7 +123,7 @@ class ConsumerTarget_1_0 extends Abstrac
// TODO
}
- public void send(final MessageInstance queueEntry) throws AMQException
+ public void send(final MessageInstance queueEntry)
{
ServerMessage serverMessage = queueEntry.getMessage();
Message_1_0 message;
@@ -187,7 +187,7 @@ class ConsumerTarget_1_0 extends Abstrac
catch (AmqpErrorException e)
{
//TODO
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
Header header = new Header();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Fri Feb 14 19:23:14 2014
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.util.List;
-import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
import org.apache.qpid.amqp_1_0.type.messaging.Rejected;
@@ -29,8 +27,6 @@ import org.apache.qpid.amqp_1_0.type.mes
import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.txn.ServerTransaction;
public class ExchangeDestination implements ReceivingDestination, SendingDestination
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java Fri Feb 14 19:23:14 2014
@@ -31,6 +31,7 @@ import org.apache.qpid.amqp_1_0.type.mes
import org.apache.qpid.amqp_1_0.type.messaging.Header;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -132,7 +133,7 @@ public class MessageConverter_Internal_t
}
catch (IOException e)
{
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
}
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Fri Feb 14 19:23:14 2014
@@ -39,6 +39,7 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.typedmessage.TypedBytesContentReader;
@@ -140,11 +141,11 @@ public abstract class MessageConverter_t
}
catch (TypedBytesFormatException e)
{
- throw new RuntimeException(e); // TODO - Implement
+ throw new ConnectionScopedRuntimeException(e);
}
catch (EOFException e)
{
- throw new RuntimeException(e); // TODO - Implement
+ throw new ConnectionScopedRuntimeException(e);
}
}
return new AmqpValue(list);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java Fri Feb 14 19:23:14 2014
@@ -30,6 +30,7 @@ import org.apache.qpid.amqp_1_0.type.mes
import org.apache.qpid.amqp_1_0.type.messaging.Data;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.typedmessage.TypedBytesContentReader;
@@ -96,7 +97,7 @@ public class MessageConverter_v1_0_to_In
{
if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValue))
{
- throw new RuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence");
+ throw new ConnectionScopedRuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence");
}
else
{
@@ -149,7 +150,7 @@ public class MessageConverter_v1_0_to_In
}
catch (AmqpErrorException e)
{
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
@@ -257,11 +258,11 @@ public class MessageConverter_v1_0_to_In
}
catch (TypedBytesFormatException e)
{
- throw new RuntimeException(e); // TODO - Implement
+ throw new ConnectionScopedRuntimeException(e);
}
catch (EOFException e)
{
- throw new RuntimeException(e); // TODO - Implement
+ throw new ConnectionScopedRuntimeException(e);
}
}
return list;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Fri Feb 14 19:23:14 2014
@@ -43,6 +43,7 @@ import org.apache.qpid.amqp_1_0.type.mes
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
public class MessageMetaData_1_0 implements StorableMessageMetaData
{
@@ -394,7 +395,7 @@ public class MessageMetaData_1_0 impleme
catch (AmqpErrorException e)
{
//TODO
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java Fri Feb 14 19:23:14 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.io.IOException;
import java.io.PrintWriter;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -42,21 +43,27 @@ import org.apache.qpid.amqp_1_0.transpor
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler
{
+ private static final org.apache.log4j.Logger
+ _logger = org.apache.log4j.Logger.getLogger(ProtocolEngine_1_0_0_SASL.class);
+
private final Port _port;
private final Transport _transport;
private long _readBytes;
@@ -250,123 +257,165 @@ public class ProtocolEngine_1_0_0_SASL i
public synchronized void received(ByteBuffer msg)
{
- _lastReadTime = System.currentTimeMillis();
- if(RAW_LOGGER.isLoggable(Level.FINE))
+ try
{
- ByteBuffer dup = msg.duplicate();
- byte[] data = new byte[dup.remaining()];
- dup.get(data);
- Binary bin = new Binary(data);
- RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString());
- }
- _readBytes += msg.remaining();
- switch(_state)
- {
- case A:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- break;
- }
- case M:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.M;
- break;
- }
-
- case Q:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.Q;
- break;
- }
- case P:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.P;
- break;
- }
- case PROTOCOL:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.PROTOCOL;
- break;
- }
- case MAJOR:
- if(msg.hasRemaining())
- {
- _major = msg.get();
- }
- else
- {
- _state = State.MAJOR;
- break;
- }
- case MINOR:
- if(msg.hasRemaining())
- {
- _minor = msg.get();
- }
- else
- {
- _state = State.MINOR;
- break;
- }
- case REVISION:
- if(msg.hasRemaining())
- {
- _revision = msg.get();
-
- _state = State.FRAME;
- }
- else
- {
- _state = State.REVISION;
- break;
- }
- case FRAME:
- if(msg.hasRemaining())
- {
+ _lastReadTime = System.currentTimeMillis();
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ ByteBuffer dup = msg.duplicate();
+ byte[] data = new byte[dup.remaining()];
+ dup.get(data);
+ Binary bin = new Binary(data);
+ RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString());
+ }
+ _readBytes += msg.remaining();
+ switch(_state)
+ {
+ case A:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ break;
+ }
+ case M:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.M;
+ break;
+ }
+
+ case Q:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.Q;
+ break;
+ }
+ case P:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.P;
+ break;
+ }
+ case PROTOCOL:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.PROTOCOL;
+ break;
+ }
+ case MAJOR:
+ if (msg.hasRemaining())
+ {
+ _major = msg.get();
+ }
+ else
+ {
+ _state = State.MAJOR;
+ break;
+ }
+ case MINOR:
+ if (msg.hasRemaining())
+ {
+ _minor = msg.get();
+ }
+ else
+ {
+ _state = State.MINOR;
+ break;
+ }
+ case REVISION:
+ if (msg.hasRemaining())
+ {
+ _revision = msg.get();
+
+ _state = State.FRAME;
+ }
+ else
+ {
+ _state = State.REVISION;
+ break;
+ }
+ case FRAME:
+ if (msg.hasRemaining())
+ {
_frameHandler = _frameHandler.parse(msg);
- }
- }
-
+ }
+ }
+ }
+ catch(RuntimeException e)
+ {
+ exception(e);
+ }
}
- public void exception(Throwable t)
+ public void exception(Throwable throwable)
{
- t.printStackTrace();
+ if (throwable instanceof IOException)
+ {
+ _logger.info("IOException caught in " + this + ", connection closed implicitly: " + throwable);
+ }
+ else
+ {
+
+ try
+ {
+ final Error err = new Error();
+ err.setCondition(AmqpError.INTERNAL_ERROR);
+ err.setDescription(throwable.getMessage());
+ _conn.close(err);
+ close();
+ }
+ catch(TransportException e)
+ {
+ _logger.info("Error when handling exception",e);
+ }
+ finally
+ {
+ if(throwable instanceof java.lang.Error)
+ {
+ throw (java.lang.Error) throwable;
+ }
+ if(throwable instanceof ServerScopedRuntimeException)
+ {
+ throw (ServerScopedRuntimeException) throwable;
+ }
+ }
+ }
}
public void closed()
{
- // todo
- _conn.inputClosed();
- if (_conn != null && _conn.getConnectionEventListener() != null)
+ try
{
- ((Connection_1_0) _conn.getConnectionEventListener()).closed();
+ // todo
+ _conn.inputClosed();
+ if (_conn != null && _conn.getConnectionEventListener() != null)
+ {
+ ((Connection_1_0) _conn.getConnectionEventListener()).closed();
+ }
+ }
+ catch(RuntimeException e)
+ {
+ exception(e);
}
-
}
public long getCreateTime()
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1568495&r1=1568494&r2=1568495&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Fri Feb 14 19:23:14 2014
@@ -49,37 +49,22 @@ public class QueueDestination extends Me
public Outcome send(final Message_1_0 message, ServerTransaction txn)
{
- try
+ txn.enqueue(getQueue(),message, new ServerTransaction.Action()
{
- txn.enqueue(getQueue(),message, new ServerTransaction.Action()
+
+
+ public void postCommit()
{
+ getQueue().enqueue(message,null);
+ }
+
+ public void onRollback()
+ {
+ // NO-OP
+ }
+ });
- public void postCommit()
- {
- try
- {
- getQueue().enqueue(message,null);
- }
- catch (Exception e)
- {
- // TODO
- throw new RuntimeException(e);
- }
-
- }
-
- public void onRollback()
- {
- // NO-OP
- }
- });
- }
- catch(Exception e)
- {
- _logger.error("Send error", e);
- throw new RuntimeException(e);
- }
return ACCEPTED;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org