You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC
svn commit: r1451244 [35/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Feb 28 16:14:30 2013
@@ -20,11 +20,6 @@
*/
package org.apache.qpid.client;
-import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE;
-import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
-import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE;
-import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +35,7 @@ import org.apache.qpid.client.failover.F
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.client.message.AMQPEncodedListMessage;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
@@ -49,14 +45,13 @@ import org.apache.qpid.client.message.JM
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.jms.ListMessage;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.SessionException;
@@ -122,27 +117,16 @@ public abstract class AMQSession<C exten
/** Immediate message prefetch default. */
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
- /**
- * The period to wait while flow controlled before sending a log message confirming that the session is still
- * waiting on flow control being revoked
- */
- private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
- DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
-
- /**
- * The period to wait while flow controlled before declaring a failure
- */
- private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
- DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
-
private final boolean _delareQueues =
- Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
+ Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true"));
private final boolean _declareExchanges =
- Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
+ Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "true"));
private final boolean _useAMQPEncodedMapMessage;
+ private final boolean _useAMQPEncodedStreamMessage;
+
/**
* Flag indicating to start dispatcher as a daemon thread
*/
@@ -265,11 +249,6 @@ public abstract class AMQSession<C exten
/** Has failover occured on this session with outstanding actions to commit? */
private boolean _failedOverDirty;
- /** Flow control */
- private FlowControlIndicator _flowControl = new FlowControlIndicator();
-
-
-
/** Holds the highest received delivery tag. */
protected AtomicLong getHighestDeliveryTag()
{
@@ -408,22 +387,6 @@ public abstract class AMQSession<C exten
}
}
- private static final class FlowControlIndicator
- {
- private volatile boolean _flowControl = true;
-
- public synchronized void setFlowControl(boolean flowControl)
- {
- _flowControl = flowControl;
- notify();
- }
-
- public boolean getFlowControl()
- {
- return _flowControl;
- }
- }
-
/**
* Creates a new session on a connection.
*
@@ -439,6 +402,7 @@ public abstract class AMQSession<C exten
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
_useAMQPEncodedMapMessage = con == null ? true : !con.isUseLegacyMapMessageFormat();
+ _useAMQPEncodedStreamMessage = con == null ? false : !con.isUseLegacyStreamMessageFormat();
_strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
_strictAMQPFATAL =
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
@@ -649,12 +613,6 @@ public abstract class AMQSession<C exten
*/
public abstract void acknowledgeMessage(long deliveryTag, boolean multiple);
- public MethodRegistry getMethodRegistry()
- {
- MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
- return methodRegistry;
- }
-
/**
* Binds the named queue, with the specified routing key, to the named exchange.
*
@@ -1041,12 +999,11 @@ public abstract class AMQSession<C exten
{
try
{
- handleAddressBasedDestination(dest,false,noLocal,true);
+ resolveAddress(dest,false,noLocal);
if (dest.getAddressType() != AMQDestination.TOPIC_TYPE)
{
throw new JMSException("Durable subscribers can only be created for Topics");
}
- dest.getSourceNode().setDurable(true);
}
catch(AMQException e)
{
@@ -1158,6 +1115,14 @@ public abstract class AMQSession<C exten
}
}
+ public ListMessage createListMessage() throws JMSException
+ {
+ checkNotClosed();
+ AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory());
+ msg.setAMQSession(this);
+ return msg;
+ }
+
public MapMessage createMapMessage() throws JMSException
{
checkNotClosed();
@@ -1400,17 +1365,15 @@ public abstract class AMQSession<C exten
public StreamMessage createStreamMessage() throws JMSException
{
- // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
- // calls through connection.closeAllSessions which is also called by the public connection.close()
- // with a null cause
- // When we are closing the Session due to a protocol session error we simply create a new AMQException
- // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
- // We need to determin here if the connection should be
-
- synchronized (getFailoverMutex())
+ checkNotClosed();
+ if (_useAMQPEncodedStreamMessage)
+ {
+ AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory());
+ msg.setAMQSession(this);
+ return msg;
+ }
+ else
{
- checkNotClosed();
-
JMSStreamMessage msg = new JMSStreamMessage(getMessageDelegateFactory());
msg.setAMQSession(this);
return msg;
@@ -1550,7 +1513,7 @@ public abstract class AMQSession<C exten
public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
{
- declareExchange(name, type, getProtocolHandler(), nowait);
+ declareExchange(name, type, nowait, false, false, false);
}
abstract public void sync() throws AMQException;
@@ -1690,8 +1653,7 @@ public abstract class AMQSession<C exten
throws
AMQException
{
- AMQProtocolHandler protocolHandler = getProtocolHandler();
- declareExchange(amqd, protocolHandler, false);
+ declareExchange(amqd, false);
AMQShortString queueName = declareQueue(amqd, false);
bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
}
@@ -2582,11 +2544,9 @@ public abstract class AMQSession<C exten
/**
* Register to consume from the queue.
- *
* @param queueName
*/
- private void consumeFromQueue(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException
+ private void consumeFromQueue(C consumer, AMQShortString queueName, boolean nowait) throws AMQException, FailoverException
{
int tagId = _nextTag++;
@@ -2603,7 +2563,7 @@ public abstract class AMQSession<C exten
try
{
- sendConsume(consumer, queueName, protocolHandler, nowait, tagId);
+ sendConsume(consumer, queueName, nowait, tagId);
}
catch (AMQException e)
{
@@ -2614,7 +2574,7 @@ public abstract class AMQSession<C exten
}
public abstract void sendConsume(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException;
+ boolean nowait, int tag) throws AMQException, FailoverException;
private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate)
throws JMSException
@@ -2648,9 +2608,10 @@ public abstract class AMQSession<C exten
public abstract P createMessageProducer(final Destination destination, final Boolean mandatory,
final Boolean immediate, final long producerId) throws JMSException;
- private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
+ private void declareExchange(AMQDestination amqd, boolean nowait) throws AMQException
{
- declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
+ declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), nowait, amqd.isExchangeDurable(),
+ amqd.isExchangeAutoDelete(), amqd.isExchangeInternal());
}
/**
@@ -2707,33 +2668,29 @@ public abstract class AMQSession<C exten
*
* @param name The name of the exchange to declare.
* @param type The type of the exchange to declare.
- * @param protocolHandler The protocol handler to process the communication through.
* @param nowait
- *
+ * @param durable
+ * @param autoDelete
+ * @param internal
* @throws AMQException If the exchange cannot be declared for any reason.
* @todo Be aware of possible changes to parameter order as versions change.
*/
private void declareExchange(final AMQShortString name, final AMQShortString type,
- final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
+ final boolean nowait, final boolean durable,
+ final boolean autoDelete, final boolean internal) throws AMQException
{
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- sendExchangeDeclare(name, type, protocolHandler, nowait);
+ sendExchangeDeclare(name, type, nowait, durable, autoDelete, internal);
return null;
}
}, _connection).execute();
}
- public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException;
-
-
- void declareQueuePassive(AMQDestination queue) throws AMQException
- {
- declareQueue(queue,false,false,true);
- }
+ public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException;
/**
* Declares a queue for a JMS destination.
@@ -2768,31 +2725,8 @@ public abstract class AMQSession<C exten
return declareQueue(amqd, noLocal, nowait, false);
}
- protected AMQShortString declareQueue(final AMQDestination amqd,
- final boolean noLocal, final boolean nowait, final boolean passive)
- throws AMQException
- {
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
- return new FailoverNoopSupport<AMQShortString, AMQException>(
- new FailoverProtectedOperation<AMQShortString, AMQException>()
- {
- public AMQShortString execute() throws AMQException, FailoverException
- {
- // Generate the queue name if the destination indicates that a client generated name is to be used.
- if (amqd.isNameRequired())
- {
- amqd.setQueueName(protocolHandler.generateQueueName());
- }
-
- sendQueueDeclare(amqd, protocolHandler, nowait, passive);
-
- return amqd.getAMQQueueName();
- }
- }, _connection).execute();
- }
-
- public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait, boolean passive) throws AMQException, FailoverException;
+ protected abstract AMQShortString declareQueue(final AMQDestination amqd,
+ final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException;
/**
* Undeclares the specified queue.
@@ -2845,21 +2779,6 @@ public abstract class AMQSession<C exten
return ++_nextProducerId;
}
- protected AMQProtocolHandler getProtocolHandler()
- {
- return _connection.getProtocolHandler();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolHandler().getProtocolMajorVersion();
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolHandler().getProtocolMinorVersion();
- }
-
protected boolean hasMessageListeners()
{
return _hasMessageListeners;
@@ -2918,17 +2837,15 @@ public abstract class AMQSession<C exten
{
AMQDestination amqd = consumer.getDestination();
- AMQProtocolHandler protocolHandler = getProtocolHandler();
-
if (amqd.getDestSyntax() == DestSyntax.ADDR)
{
- handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait);
+ resolveAddress(amqd,true,consumer.isNoLocal());
}
else
{
if (_declareExchanges)
{
- declareExchange(amqd, protocolHandler, nowait);
+ declareExchange(amqd, nowait);
}
if (_delareQueues || amqd.isNameRequired())
@@ -2973,7 +2890,7 @@ public abstract class AMQSession<C exten
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait);
+ consumeFromQueue(consumer, queueName, nowait);
}
catch (FailoverException e)
{
@@ -2981,10 +2898,9 @@ public abstract class AMQSession<C exten
}
}
- public abstract void handleAddressBasedDestination(AMQDestination dest,
+ public abstract void resolveAddress(AMQDestination dest,
boolean isConsumer,
- boolean noLocal,
- boolean noWait) throws AMQException;
+ boolean noLocal) throws AMQException;
private void registerProducer(long producerId, MessageProducer producer)
{
@@ -3141,47 +3057,14 @@ public abstract class AMQSession<C exten
_ticket = ticket;
}
- public boolean isFlowBlocked()
- {
- synchronized (_flowControl)
- {
- return !_flowControl.getFlowControl();
- }
- }
-
- public void setFlowControl(final boolean active)
- {
- _flowControl.setFlowControl(active);
- if (_logger.isInfoEnabled())
- {
- _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
- }
- }
-
- public void checkFlowControl() throws InterruptedException, JMSException
- {
- long expiryTime = 0L;
- synchronized (_flowControl)
- {
- while (!_flowControl.getFlowControl() &&
- (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure)
- : expiryTime) >= System.currentTimeMillis() )
- {
-
- _flowControl.wait(_flowControlWaitPeriod);
- if (_logger.isInfoEnabled())
- {
- _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control");
- }
- }
- if(!_flowControl.getFlowControl())
- {
- _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
- throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control");
- }
- }
+ /**
+ * Tests whether flow to this session is blocked.
+ *
+ * @return true if flow is blocked or false otherwise.
+ */
+ public abstract boolean isFlowBlocked();
- }
+ public abstract void setFlowControl(final boolean active);
public interface Dispatchable
{
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Feb 28 16:14:30 2013
@@ -17,6 +17,11 @@
*/
package org.apache.qpid.client;
+import static org.apache.qpid.transport.Option.BATCH;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+import static org.apache.qpid.transport.Option.UNRELIABLE;
+
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
@@ -29,8 +34,10 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
+
import javax.jms.Destination;
import javax.jms.JMSException;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.Binding;
@@ -44,18 +51,32 @@ import org.apache.qpid.client.message.Me
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.messaging.address.AddressHelper;
import org.apache.qpid.client.messaging.address.Link;
-import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
+import org.apache.qpid.client.messaging.address.Node;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.*;
-import static org.apache.qpid.transport.Option.BATCH;
-import static org.apache.qpid.transport.Option.NONE;
-import static org.apache.qpid.transport.Option.SYNC;
-import static org.apache.qpid.transport.Option.UNRELIABLE;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ExchangeBoundResult;
+import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.QueueQueryResult;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.RangeSetFactory;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Serial;
import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
@@ -347,15 +368,22 @@ public class AMQSession_0_10 extends AMQ
}
else
{
+ // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected.
List<Binding> bindings = new ArrayList<Binding>();
- bindings.addAll(destination.getSourceNode().getBindings());
- bindings.addAll(destination.getTargetNode().getBindings());
+ bindings.addAll(destination.getNode().getBindings());
String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ?
destination.getAddressName(): "amq.topic";
for (Binding binding: bindings)
{
+ // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for a link.
+ // The null check below is a way to sidestep that issue while fixing QPID-4146.
+ // Note that this issue only affects producers.
+ if (binding.getQueue() == null && queueName == null)
+ {
+ continue;
+ }
String queue = binding.getQueue() == null?
queueName.asString(): binding.getQueue();
@@ -523,11 +551,9 @@ public class AMQSession_0_10 extends AMQ
final FieldTable rawSelector, final boolean noConsume,
final boolean autoClose) throws JMSException
{
-
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
- getMessageFactoryRegistry(), this, protocolHandler, rawSelector, prefetchHigh,
- prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose);
+ getMessageFactoryRegistry(), this, rawSelector, prefetchHigh, prefetchLow,
+ exclusive, getAcknowledgeMode(), noConsume, autoClose);
}
/**
@@ -558,7 +584,7 @@ public class AMQSession_0_10 extends AMQ
rk = routingKey.toString();
}
- return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null);
+ return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null);
}
public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
@@ -591,10 +617,22 @@ public class AMQSession_0_10 extends AMQ
* This method is invoked when a consumer is created
* Registers the consumer with the broker
*/
- public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
+ public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName,
boolean nowait, int tag)
throws AMQException, FailoverException
- {
+ {
+ if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax())
+ {
+ if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType())
+ {
+ String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector();
+
+ createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector);
+ queueName = consumer.getDestination().getAMQQueueName();
+ consumer.setQueuename(queueName);
+ }
+ handleLinkCreation(consumer.getDestination());
+ }
boolean preAcquire = consumer.isPreAcquire();
AMQDestination destination = consumer.getDestination();
@@ -637,11 +675,7 @@ public class AMQSession_0_10 extends AMQ
capacity,
Option.UNRELIABLE);
}
-
- if (!nowait)
- {
- sync();
- }
+ sync();
}
/**
@@ -653,7 +687,7 @@ public class AMQSession_0_10 extends AMQ
try
{
return new BasicMessageProducer_0_10(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this,
- getProtocolHandler(), producerId, immediate, mandatory);
+ producerId, immediate, mandatory);
}
catch (AMQException e)
{
@@ -673,26 +707,25 @@ public class AMQSession_0_10 extends AMQ
/**
* creates an exchange if it does not already exist
*/
- public void sendExchangeDeclare(final AMQShortString name,
- final AMQShortString type,
- final AMQProtocolHandler protocolHandler, final boolean nowait)
- throws AMQException, FailoverException
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException
{
- sendExchangeDeclare(name.asString(), type.asString(), null, null,
- nowait);
+ // The 'internal' parameter is ignored on the 0-10 path because the protocol does not support it.
+ sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete);
}
public void sendExchangeDeclare(final String name, final String type,
final String alternateExchange, final Map<String, Object> args,
- final boolean nowait) throws AMQException
+ final boolean nowait, boolean durable, boolean autoDelete) throws AMQException
{
getQpidSession().exchangeDeclare(
name,
type,
alternateExchange,
args,
- name.toString().startsWith("amq.") ? Option.PASSIVE
- : Option.NONE);
+ name.toString().startsWith("amq.") ? Option.PASSIVE : Option.NONE,
+ durable ? Option.DURABLE : Option.NONE,
+ autoDelete ? Option.AUTO_DELETE : Option.NONE);
// We need to sync so that we get notify of an error.
if (!nowait)
{
@@ -717,18 +750,8 @@ public class AMQSession_0_10 extends AMQ
/**
* Declare a queue with the given queueName
*/
- public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait, boolean passive)
- throws AMQException, FailoverException
- {
- // do nothing this is only used by 0_8
- }
-
- /**
- * Declare a queue with the given queueName
- */
- public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal, final boolean nowait, boolean passive)
+ public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final boolean noLocal,
+ final boolean nowait, boolean passive)
throws AMQException
{
AMQShortString queueName;
@@ -759,7 +782,8 @@ public class AMQSession_0_10 extends AMQ
}
else
{
- QueueNode node = (QueueNode)amqd.getSourceNode();
+ // This code is here to ensure address-based destinations work with the public declareQueue method in AMQSession.java
+ Node node = amqd.getNode();
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.putAll((Map<? extends String, ? extends Object>) node.getDeclareArgs());
if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null)
@@ -925,12 +949,11 @@ public class AMQSession_0_10 extends AMQ
return getCurrentException();
}
+ @Override
protected AMQShortString declareQueue(final AMQDestination amqd,
final boolean noLocal, final boolean nowait, final boolean passive)
throws AMQException
{
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
-
return new FailoverNoopSupport<AMQShortString, AMQException>(
new FailoverProtectedOperation<AMQShortString, AMQException>()
{
@@ -947,7 +970,7 @@ public class AMQSession_0_10 extends AMQ
amqd.setQueueName(new AMQShortString( binddingKey + "@"
+ amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
}
- return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive);
+ return send0_10QueueDeclare(amqd, noLocal, nowait, passive);
}
}, getAMQConnection()).execute();
}
@@ -1072,11 +1095,12 @@ public class AMQSession_0_10 extends AMQ
return AMQMessageDelegateFactory.FACTORY_0_10;
}
- public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode)
+ public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException
{
boolean match = true;
ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get();
match = !result.getNotFound();
+ Node node = dest.getNode();
if (match)
{
@@ -1086,16 +1110,6 @@ public class AMQSession_0_10 extends AMQ
(node.getExchangeType() != null &&
node.getExchangeType().equals(result.getType())) &&
(matchProps(result.getArguments(),node.getDeclareArgs()));
- }
- else if (node.getExchangeType() != null)
- {
- // even if assert is false, better to verify this
- match = node.getExchangeType().equals(result.getType());
- if (!match)
- {
- _logger.debug("Exchange type doesn't match. Expected : " + node.getExchangeType() +
- " actual " + result.getType());
- }
}
else
{
@@ -1104,18 +1118,27 @@ public class AMQSession_0_10 extends AMQ
dest.setExchangeClass(new AMQShortString(result.getType()));
}
}
-
+
+ if (assertNode)
+ {
+ if (!match)
+ {
+ throw new AMQException("Assert failed for address : " + dest +", Result was : " + result);
+ }
+ }
+
return match;
}
- public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException
+ public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
{
boolean match = true;
try
{
QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
match = dest.getAddressName().equals(result.getQueue());
-
+ Node node = dest.getNode();
+
if (match && assertNode)
{
match = (result.getDurable() == node.isDurable()) &&
@@ -1123,9 +1146,13 @@ public class AMQSession_0_10 extends AMQ
(result.getExclusive() == node.isExclusive()) &&
(matchProps(result.getArguments(),node.getDeclareArgs()));
}
- else if (match)
+
+ if (assertNode)
{
- // should I use the queried details to update the local data structure.
+ if (!match)
+ {
+ throw new AMQException("Assert failed for address : " + dest +", Result was : " + result);
+ }
}
}
catch(SessionException e)
@@ -1140,7 +1167,6 @@ public class AMQSession_0_10 extends AMQ
"Error querying queue",e);
}
}
-
return match;
}
@@ -1179,17 +1205,13 @@ public class AMQSession_0_10 extends AMQ
*/
@SuppressWarnings("deprecation")
- public void handleAddressBasedDestination(AMQDestination dest,
+ public void resolveAddress(AMQDestination dest,
boolean isConsumer,
- boolean noLocal,
- boolean noWait) throws AMQException
+ boolean noLocal) throws AMQException
{
if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
{
- if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
- {
- createSubscriptionQueue(dest,noLocal);
- }
+ return;
}
else
{
@@ -1209,46 +1231,32 @@ public class AMQSession_0_10 extends AMQ
{
case AMQDestination.QUEUE_TYPE:
{
- if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode))
+ if(createNode)
{
- setLegacyFiledsForQueueType(dest);
+ setLegacyFieldsForQueueType(dest);
+ handleQueueNodeCreation(dest,noLocal);
break;
}
- else if(createNode)
+ else if (isQueueExist(dest,assertNode))
{
- setLegacyFiledsForQueueType(dest);
- send0_10QueueDeclare(dest,null,noLocal,noWait, false);
- sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
- null,dest.getExchangeName(),dest, false);
+ setLegacyFieldsForQueueType(dest);
break;
- }
+ }
}
case AMQDestination.TOPIC_TYPE:
{
- if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode))
+ if(createNode)
{
setLegacyFiledsForTopicType(dest);
verifySubject(dest);
- if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
- {
- createSubscriptionQueue(dest, noLocal);
- }
+ handleExchangeNodeCreation(dest);
break;
}
- else if(createNode)
+ else if (isExchangeExist(dest,assertNode))
{
setLegacyFiledsForTopicType(dest);
verifySubject(dest);
- sendExchangeDeclare(dest.getAddressName(),
- dest.getExchangeClass().asString(),
- dest.getTargetNode().getAlternateExchange(),
- dest.getTargetNode().getDeclareArgs(),
- false);
- if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
- {
- createSubscriptionQueue(dest,noLocal);
- }
break;
}
}
@@ -1287,7 +1295,6 @@ public class AMQSession_0_10 extends AMQ
throw new AMQException("Ambiguous address, please specify queue or topic as node type");
}
dest.setAddressType(type);
- dest.rebuildTargetAndSourceNodes(type);
return type;
}
}
@@ -1309,30 +1316,45 @@ public class AMQSession_0_10 extends AMQ
}
}
}
-
- private void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException
+
+ void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException
{
- QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null
-
- if (dest.getQueueName() == null)
+ Link link = dest.getLink();
+ String queueName = dest.getQueueName();
+
+ if (queueName == null)
{
- if (dest.getLink() != null && dest.getLink().getName() != null)
- {
- dest.setQueueName(new AMQShortString(dest.getLink().getName()));
- }
+ queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName();
+ dest.setQueueName(new AMQShortString(queueName));
+ }
+
+ SubscriptionQueue queueProps = link.getSubscriptionQueue();
+ Map<String,Object> arguments = queueProps.getDeclareArgs();
+ if (!arguments.containsKey((AddressHelper.NO_LOCAL)))
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
+
+ if (link.isDurable() && queueName.startsWith("TempQueue"))
+ {
+ throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link.");
}
- node.setExclusive(true);
- node.setAutoDelete(!node.isDurable());
- send0_10QueueDeclare(dest,null,noLocal,true, false);
- getQpidSession().exchangeBind(dest.getQueueName(),
- dest.getAddressName(),
- dest.getSubject(),
- Collections.<String,Object>emptyMap());
- sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
- null,dest.getExchangeName(),dest, false);
+
+ getQpidSession().queueDeclare(queueName,
+ queueProps.getAlternateExchange(), arguments,
+ queueProps.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+ link.isDurable() ? Option.DURABLE : Option.NONE,
+ queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+
+ Map<String,Object> bindingArguments = new HashMap<String, Object>();
+ bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
+ getQpidSession().exchangeBind(queueName,
+ dest.getAddressName(),
+ dest.getSubject(),
+ bindingArguments);
}
-
- public void setLegacyFiledsForQueueType(AMQDestination dest)
+
+ public void setLegacyFieldsForQueueType(AMQDestination dest)
{
// legacy support
dest.setQueueName(new AMQShortString(dest.getAddressName()));
@@ -1345,7 +1367,7 @@ public class AMQSession_0_10 extends AMQ
{
// legacy support
dest.setExchangeName(new AMQShortString(dest.getAddressName()));
- ExchangeNode node = (ExchangeNode)dest.getTargetNode();
+ Node node = dest.getNode();
dest.setExchangeClass(node.getExchangeType() == null?
ExchangeDefaults.TOPIC_EXCHANGE_CLASS:
new AMQShortString(node.getExchangeType()));
@@ -1424,6 +1446,13 @@ public class AMQSession_0_10 extends AMQ
return _qpidSession.isFlowBlocked();
}
+ @Override
+ public void setFlowControl(boolean active)
+ {
+ // Supported by 0-8..0-9-1 only
+ throw new UnsupportedOperationException("Operation not supported by this protocol");
+ }
+
private void cancelTimerTask()
{
if (flushTask != null)
@@ -1432,5 +1461,148 @@ public class AMQSession_0_10 extends AMQ
flushTask = null;
}
}
-}
+ private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException
+ {
+ Node node = dest.getNode();
+ Map<String,Object> arguments = node.getDeclareArgs();
+ if (!arguments.containsKey((AddressHelper.NO_LOCAL)))
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
+ getQpidSession().queueDeclare(dest.getAddressName(),
+ node.getAlternateExchange(), arguments,
+ node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+ node.isDurable() ? Option.DURABLE : Option.NONE,
+ node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+
+ createBindings(dest, dest.getNode().getBindings());
+ sync();
+ }
+
+ void handleExchangeNodeCreation(AMQDestination dest) throws AMQException
+ {
+ Node node = dest.getNode();
+ sendExchangeDeclare(dest.getAddressName(),
+ node.getExchangeType(),
+ node.getAlternateExchange(),
+ node.getDeclareArgs(),
+ false,
+ node.isDurable(),
+ node.isAutoDelete());
+
+ // If bindings are specified without a queue name and is called by the producer,
+ // the broker will send an exception as expected.
+ createBindings(dest, dest.getNode().getBindings());
+ sync();
+ }
+
+ void handleLinkCreation(AMQDestination dest) throws AMQException
+ {
+ createBindings(dest, dest.getLink().getBindings());
+ }
+
+ void createBindings(AMQDestination dest, List<Binding> bindings)
+ {
+ String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+ .getAddressName() : "amq.topic";
+
+ String defaultQueueName = null;
+ if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+ {
+ defaultQueueName = dest.getQueueName();
+ }
+ else
+ {
+ defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+ }
+
+ for (Binding binding: bindings)
+ {
+ String queue = binding.getQueue() == null?
+ defaultQueueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchangeForBinding :
+ binding.getExchange();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Binding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ }
+ getQpidSession().exchangeBind(queue,
+ exchange,
+ binding.getBindingKey(),
+ binding.getArgs());
+ }
+ }
+
+ void handleLinkDelete(AMQDestination dest) throws AMQException
+ {
+ // We need to destroy link bindings
+ String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+ .getAddressName() : "amq.topic";
+
+ String defaultQueueName = null;
+ if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+ {
+ defaultQueueName = dest.getQueueName();
+ }
+ else
+ {
+ defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+ }
+
+ for (Binding binding: dest.getLink().getBindings())
+ {
+ String queue = binding.getQueue() == null?
+ defaultQueueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchangeForBinding :
+ binding.getExchange();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Unbinding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ }
+ getQpidSession().exchangeUnbind(queue, exchange,
+ binding.getBindingKey());
+ }
+ }
+
+ void deleteSubscriptionQueue(AMQDestination dest) throws AMQException
+ {
+ // We need to delete the subscription queue.
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE &&
+ dest.getLink().getSubscriptionQueue().isExclusive() &&
+ isQueueExist(dest, false))
+ {
+ getQpidSession().queueDelete(dest.getQueueName());
+ }
+ }
+
+ void handleNodeDelete(AMQDestination dest) throws AMQException
+ {
+ if (AMQDestination.TOPIC_TYPE == dest.getAddressType())
+ {
+ if (isExchangeExist(dest,false))
+ {
+ getQpidSession().exchangeDelete(dest.getAddressName());
+ }
+ }
+ else
+ {
+ if (isQueueExist(dest,false))
+ {
+ getQpidSession().queueDelete(dest.getAddressName());
+ }
+ }
+ }
+}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Feb 28 16:14:30 2013
@@ -21,12 +21,18 @@
package org.apache.qpid.client;
+import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE;
+import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE;
+import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
@@ -58,6 +64,27 @@ public class AMQSession_0_8 extends AMQS
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
+ public static final String QPID_SYNC_AFTER_CLIENT_ACK = "qpid.sync_after_client.ack";
+
+ private final boolean _syncAfterClientAck =
+ Boolean.parseBoolean(System.getProperty(QPID_SYNC_AFTER_CLIENT_ACK, "true"));
+
+ /**
+ * The period to wait while flow controlled before sending a log message confirming that the session is still
+ * waiting on flow control being revoked
+ */
+ private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
+ DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
+
+ /**
+ * The period to wait while flow controlled before declaring a failure
+ */
+ private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
+ DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
+
+ /** Flow control */
+ private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
/**
* Creates a new session on a connection.
*
@@ -98,8 +125,9 @@ public class AMQSession_0_8 extends AMQS
return getProtocolHandler().getProtocolVersion();
}
- protected void acknowledgeImpl()
+ protected void acknowledgeImpl() throws JMSException
{
+ boolean syncRequired = false;
while (true)
{
Long tag = getUnacknowledgedMessageTags().poll();
@@ -109,6 +137,19 @@ public class AMQSession_0_8 extends AMQS
}
acknowledgeMessage(tag, false);
+ syncRequired = true;
+ }
+
+ try
+ {
+ if (syncRequired && _syncAfterClientAck)
+ {
+ sync();
+ }
+ }
+ catch (AMQException a)
+ {
+ throw new JMSAMQException("Failed to sync after acknowledge", a);
}
}
@@ -359,9 +400,9 @@ public class AMQSession_0_8 extends AMQS
}
}
- @Override public void sendConsume(BasicMessageConsumer_0_8 consumer,
+ @Override
+ public void sendConsume(BasicMessageConsumer_0_8 consumer,
AMQShortString queueName,
- AMQProtocolHandler protocolHandler,
boolean nowait,
int tag) throws AMQException, FailoverException
{
@@ -380,27 +421,29 @@ public class AMQSession_0_8 extends AMQS
if (nowait)
{
- protocolHandler.writeFrame(jmsConsume);
+ getProtocolHandler().writeFrame(jmsConsume);
}
else
{
- protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+ getProtocolHandler().syncWrite(jmsConsume, BasicConsumeOkBody.class);
}
}
- public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException
+ @Override
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException
{
+ //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path
+
ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,
name.toString().startsWith("amq."),
- false,false,false,false,null);
+ durable, autoDelete, internal, false, null);
AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
- protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
- public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait, boolean passive) throws AMQException, FailoverException
+ private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException
{
QueueDeclareBody body =
getMethodRegistry().createQueueDeclareBody(getTicket(),
@@ -414,7 +457,32 @@ public class AMQSession_0_8 extends AMQS
AMQFrame queueDeclare = body.generateFrame(getChannelId());
- protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ }
+
+ @Override
+ protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal,
+ final boolean nowait, final boolean passive) throws AMQException
+ {
+ //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path
+
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
+ return new FailoverNoopSupport<AMQShortString, AMQException>(
+ new FailoverProtectedOperation<AMQShortString, AMQException>()
+ {
+ public AMQShortString execute() throws AMQException, FailoverException
+ {
+ // Generate the queue name if the destination indicates that a client generated name is to be used.
+ if (amqd.isNameRequired())
+ {
+ amqd.setQueueName(protocolHandler.generateQueueName());
+ }
+
+ sendQueueDeclare(amqd, passive);
+
+ return amqd.getAMQQueueName();
+ }
+ }, getAMQConnection()).execute();
}
public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
@@ -440,10 +508,8 @@ public class AMQSession_0_8 extends AMQS
final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments,
final boolean noConsume, final boolean autoClose) throws JMSException
{
-
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new BasicMessageConsumer_0_8(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
- getMessageFactoryRegistry(),this, protocolHandler, arguments, prefetchHigh, prefetchLow,
+ getMessageFactoryRegistry(),this, arguments, prefetchHigh, prefetchLow,
exclusive, getAcknowledgeMode(), noConsume, autoClose);
}
@@ -629,12 +695,11 @@ public class AMQSession_0_8 extends AMQS
declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
}
- public void handleAddressBasedDestination(AMQDestination dest,
+ public void resolveAddress(AMQDestination dest,
boolean isConsumer,
- boolean noLocal,
- boolean noWait) throws AMQException
+ boolean noLocal) throws AMQException
{
- throw new UnsupportedOperationException("The new addressing based sytanx is "
+ throw new UnsupportedOperationException("The new addressing based syntax is "
+ "not supported for AMQP 0-8/0-9 versions");
}
@@ -662,14 +727,23 @@ public class AMQSession_0_8 extends AMQS
queueName == null ? null : new AMQShortString(queueName),
bindingKey == null ? null : new AMQShortString(bindingKey));
}
-
+
+ private AMQProtocolHandler getProtocolHandler()
+ {
+ return getAMQConnection().getProtocolHandler();
+ }
+
+ public MethodRegistry getMethodRegistry()
+ {
+ MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+ return methodRegistry;
+ }
public AMQException getLastException()
{
// if the Connection has closed then we should throw any exception that
// has occurred that we were not waiting for
- AMQStateManager manager = getAMQConnection().getProtocolHandler()
- .getStateManager();
+ AMQStateManager manager = getProtocolHandler().getStateManager();
Exception e = manager.getLastException();
if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
@@ -693,6 +767,49 @@ public class AMQSession_0_8 extends AMQS
}
}
+ public boolean isFlowBlocked()
+ {
+ synchronized (_flowControl)
+ {
+ return !_flowControl.getFlowControl();
+ }
+ }
+
+ public void setFlowControl(final boolean active)
+ {
+ _flowControl.setFlowControl(active);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
+ }
+ }
+
+ void checkFlowControl() throws InterruptedException, JMSException
+ {
+ long expiryTime = 0L;
+ synchronized (_flowControl)
+ {
+ while (!_flowControl.getFlowControl() &&
+ (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure)
+ : expiryTime) >= System.currentTimeMillis() )
+ {
+
+ _flowControl.wait(_flowControlWaitPeriod);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control");
+ }
+ }
+ if(!_flowControl.getFlowControl())
+ {
+ _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
+ throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control");
+ }
+ }
+ }
+
+
+
public abstract static class DestinationCache<T extends AMQDestination>
{
private final Map<AMQShortString, Map<AMQShortString, T>> cache = new HashMap<AMQShortString, Map<AMQShortString, T>>();
@@ -740,6 +857,22 @@ public class AMQSession_0_8 extends AMQS
}
}
+ private static final class FlowControlIndicator
+ {
+ private volatile boolean _flowControl = true;
+
+ public synchronized void setFlowControl(boolean flowControl)
+ {
+ _flowControl = flowControl;
+ notify();
+ }
+
+ public boolean getFlowControl()
+ {
+ return _flowControl;
+ }
+ }
+
private final TopicDestinationCache _topicDestinationCache = new TopicDestinationCache();
private final QueueDestinationCache _queueDestinationCache = new QueueDestinationCache();
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Thu Feb 28 16:14:30 2013
@@ -114,8 +114,8 @@ public class AMQTopic extends AMQDestina
AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
// link is never null if dest was created using an address string.
t.getLink().setName(queueName.asString());
- t.getSourceNode().setAutoDelete(false);
- t.getSourceNode().setDurable(true);
+ t.getLink().getSubscriptionQueue().setAutoDelete(false);
+ t.getLink().setDurable(true);
// The legacy fields are also populated just in case.
t.setQueueName(queueName);
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Feb 28 16:14:30 2013
@@ -31,7 +31,6 @@ import org.apache.qpid.client.message.AM
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.client.filter.JMSSelectorFilter;
import org.apache.qpid.framing.AMQShortString;
@@ -87,8 +86,6 @@ public abstract class BasicMessageConsum
private final AMQSession _session;
- private final AMQProtocolHandler _protocolHandler;
-
/**
* We need to store the "raw" field table so that we can resubscribe in the event of failover being required
*/
@@ -140,9 +137,9 @@ public abstract class BasicMessageConsum
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
- AMQSession session, AMQProtocolHandler protocolHandler,
- FieldTable rawSelector, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
+ AMQSession session, FieldTable rawSelector,
+ int prefetchHigh, int prefetchLow, boolean exclusive,
+ int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
_channelId = channelId;
_connection = connection;
@@ -150,7 +147,6 @@ public abstract class BasicMessageConsum
_destination = destination;
_messageFactory = messageFactory;
_session = session;
- _protocolHandler = protocolHandler;
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
@@ -597,7 +593,6 @@ public abstract class BasicMessageConsum
{
sendCancel();
}
- cleanupQueue();
}
}
catch (AMQException e)
@@ -635,8 +630,6 @@ public abstract class BasicMessageConsum
}
abstract void sendCancel() throws AMQException, FailoverException;
-
- abstract void cleanupQueue() throws AMQException, FailoverException;
/**
* Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
@@ -1042,10 +1035,4 @@ public abstract class BasicMessageConsum
{
return _messageFactory;
}
-
- protected AMQProtocolHandler getProtocolHandler()
- {
- return _protocolHandler;
- }
-
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Feb 28 16:14:30 2013
@@ -28,7 +28,6 @@ import org.apache.qpid.client.message.AM
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.Session;
@@ -82,13 +81,13 @@ public class BasicMessageConsumer_0_10 e
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
- AMQSession<?,?> session, AMQProtocolHandler protocolHandler,
- FieldTable rawSelector, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose)
+ AMQSession<?,?> session, FieldTable rawSelector,
+ int prefetchHigh, int prefetchLow, boolean exclusive,
+ int acknowledgeMode, boolean browseOnly, boolean autoClose)
throws JMSException
{
- super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
- rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
+ super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, rawSelector,
+ prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
_0_10session = (AMQSession_0_10) session;
_serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
@@ -96,6 +95,7 @@ public class BasicMessageConsumer_0_10 e
_capacity = evaluateCapacity(destination);
+ // This is due to the Destination carrying the temporary subscription name which is incorrect.
if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
{
boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ;
@@ -164,6 +164,7 @@ public class BasicMessageConsumer_0_10 e
@Override void sendCancel() throws AMQException
{
_0_10session.getQpidSession().messageCancel(getConsumerTagString());
+ postSubscription();
try
{
_0_10session.getQpidSession().sync();
@@ -500,7 +501,7 @@ public class BasicMessageConsumer_0_10 e
}
}
- void cleanupQueue() throws AMQException, FailoverException
+ void postSubscription() throws AMQException
{
AMQDestination dest = this.getDestination();
if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
@@ -508,9 +509,11 @@ public class BasicMessageConsumer_0_10 e
if (dest.getDelete() == AddressOption.ALWAYS ||
dest.getDelete() == AddressOption.RECEIVER )
{
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
- this.getDestination().getQueueName());
+ ((AMQSession_0_10) getSession()).handleNodeDelete(dest);
+ ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
}
+ // Subscription queue is handled as part of linkDelete method.
+ ((AMQSession_0_10) getSession()).handleLinkDelete(dest);
}
}
@@ -560,4 +563,4 @@ public class BasicMessageConsumer_0_10 e
return capacity;
}
-}
+}
\ No newline at end of file
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Feb 28 16:14:30 2013
@@ -29,7 +29,6 @@ import org.apache.qpid.client.message.AM
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQFrame;
@@ -52,12 +51,12 @@ public class BasicMessageConsumer_0_8 ex
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession_0_8 session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
+ FieldTable rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive,
+ int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
- protocolHandler, rawSelector, prefetchHigh, prefetchLow, exclusive,
- acknowledgeMode, browseOnly, autoClose);
+ rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode,
+ browseOnly, autoClose);
final FieldTable consumerArguments = getArguments();
if (isAutoClose())
{
@@ -93,13 +92,19 @@ public class BasicMessageConsumer_0_8 ex
}
}
+ @Override
+ public AMQSession_0_8 getSession()
+ {
+ return (AMQSession_0_8) super.getSession();
+ }
+
void sendCancel() throws AMQException, FailoverException
{
BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(getConsumerTag())), false);
final AMQFrame cancelFrame = body.generateFrame(getChannelId());
- getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
+ getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
if (_logger.isDebugEnabled())
{
@@ -122,11 +127,6 @@ public class BasicMessageConsumer_0_8 ex
return receive();
}
- void cleanupQueue() throws AMQException, FailoverException
- {
-
- }
-
public RejectBehaviour getRejectBehaviour()
{
return _rejectBehaviour;
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Feb 28 16:14:30 2013
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.client;
-import java.io.UnsupportedEncodingException;
import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
@@ -36,15 +35,15 @@ import javax.jms.Topic;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
+
+
enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
private final Logger _logger ;
@@ -71,18 +70,6 @@ public abstract class BasicMessageProduc
private AMQDestination _destination;
/**
- * Default encoding used for messages produced by this producer.
- */
- private String _encoding;
-
- /**
- * Default encoding used for message produced by this producer.
- */
- private String _mimeType;
-
- private AMQProtocolHandler _protocolHandler;
-
- /**
* True if this producer was created from a transacted session
*/
private boolean _transacted;
@@ -135,14 +122,12 @@ public abstract class BasicMessageProduc
private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
- Boolean immediate, Boolean mandatory) throws AMQException
+ AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
{
_logger = logger;
_connection = connection;
_destination = destination;
_transacted = transacted;
- _protocolHandler = protocolHandler;
_channelId = channelId;
_session = session;
_producerId = producerId;
@@ -163,6 +148,11 @@ public abstract class BasicMessageProduc
setPublishMode();
}
+ protected AMQConnection getConnection()
+ {
+ return _connection;
+ }
+
void setPublishMode()
{
// Publish mode could be configured at destination level as well.
@@ -303,7 +293,6 @@ public abstract class BasicMessageProduc
checkPreConditions();
checkInitialDestination();
-
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
@@ -467,7 +456,7 @@ public abstract class BasicMessageProduc
JMSException ex = new JMSException("Error validating destination");
ex.initCause(e);
ex.setLinkedException(e);
-
+
throw ex;
}
amqDestination.setExchangeExistsChecked(true);
@@ -558,19 +547,7 @@ public abstract class BasicMessageProduc
}
}
- public void setMimeType(String mimeType) throws JMSException
- {
- checkNotClosed();
- _mimeType = mimeType;
- }
-
- public void setEncoding(String encoding) throws JMSException, UnsupportedEncodingException
- {
- checkNotClosed();
- _encoding = encoding;
- }
-
- private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException
+ private void checkPreConditions() throws JMSException
{
checkNotClosed();
@@ -584,15 +561,16 @@ public abstract class BasicMessageProduc
}
}
- private void checkInitialDestination()
+ private void checkInitialDestination() throws JMSException
{
if (_destination == null)
{
throw new UnsupportedOperationException("Destination is null");
}
+ checkValidQueue();
}
- private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
+ private void checkDestination(Destination suppliedDestination) throws JMSException
{
if ((_destination != null) && (suppliedDestination != null))
{
@@ -600,6 +578,11 @@ public abstract class BasicMessageProduc
"This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
}
+ if(suppliedDestination instanceof AMQQueue)
+ {
+ AMQQueue destination = (AMQQueue) suppliedDestination;
+ checkValidQueue(destination);
+ }
if (suppliedDestination == null)
{
throw new InvalidDestinationException("Supplied Destination was invalid");
@@ -607,6 +590,43 @@ public abstract class BasicMessageProduc
}
+ private void checkValidQueue() throws JMSException
+ {
+ if(_destination instanceof AMQQueue)
+ {
+ checkValidQueue((AMQQueue) _destination);
+ }
+ }
+
+ private void checkValidQueue(AMQQueue destination) throws JMSException
+ {
+ if (!destination.isCheckedForQueueBinding() && validateQueueOnSend())
+ {
+ if (getSession().isStrictAMQP())
+ {
+ getLogger().warn("AMQP does not support destination validation before publish");
+ destination.setCheckedForQueueBinding(true);
+ }
+ else
+ {
+ if (isBound(destination))
+ {
+ destination.setCheckedForQueueBinding(true);
+ }
+ else
+ {
+ throw new InvalidDestinationException("Queue: " + destination.getQueueName()
+ + " is not a valid destination (no binding on server)");
+ }
+ }
+ }
+ }
+
+ private boolean validateQueueOnSend()
+ {
+ return _connection.validateQueueOnSend();
+ }
+
/**
* The session used to create this producer
*/
@@ -645,16 +665,6 @@ public abstract class BasicMessageProduc
_destination = destination;
}
- protected AMQProtocolHandler getProtocolHandler()
- {
- return _protocolHandler;
- }
-
- protected void setProtocolHandler(AMQProtocolHandler protocolHandler)
- {
- _protocolHandler = protocolHandler;
- }
-
protected int getChannelId()
{
return _channelId;
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Feb 28 16:14:30 2013
@@ -27,7 +27,6 @@ import org.apache.qpid.client.message.AM
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.messaging.address.Link.Reliability;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
@@ -60,10 +59,9 @@ public class BasicMessageProducer_0_10 e
private byte[] userIDBytes;
BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
- Boolean immediate, Boolean mandatory) throws AMQException
+ AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
{
- super(_logger, connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
+ super(_logger, connection, destination, transacted, channelId, session, producerId, immediate, mandatory);
userIDBytes = Strings.toUTF8(getUserID());
}
@@ -79,14 +77,18 @@ public class BasicMessageProducer_0_10 e
(name,
destination.getExchangeClass().toString(),
null, null,
- name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
+ name.startsWith("amq.") ? Option.PASSIVE : Option.NONE,
+ destination.isExchangeDurable() ? Option.DURABLE : Option.NONE,
+ destination.isExchangeAutoDelete() ? Option.AUTO_DELETE : Option.NONE);
}
}
else
{
try
{
- getSession().handleAddressBasedDestination(destination,false,false,false);
+ getSession().resolveAddress(destination,false,false);
+ ((AMQSession_0_10)getSession()).handleLinkCreation(destination);
+ ((AMQSession_0_10)getSession()).sync();
}
catch(Exception e)
{
@@ -251,25 +253,35 @@ public class BasicMessageProducer_0_10 e
return getSession().isQueueBound(destination);
}
+ // We should have separate close() and closed() methods to distinguish between a normal close
+ // and a close caused by a session or connection error.
@Override
public void close() throws JMSException
{
super.close();
AMQDestination dest = getAMQDestination();
- if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ AMQSession_0_10 ssn = (AMQSession_0_10) getSession();
+ if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
{
- if (dest.getDelete() == AddressOption.ALWAYS ||
- dest.getDelete() == AddressOption.SENDER )
+ try
{
- try
- {
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
- getAMQDestination().getQueueName());
- }
- catch(TransportException e)
+ if (dest.getDelete() == AddressOption.ALWAYS ||
+ dest.getDelete() == AddressOption.SENDER )
{
- throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+ ssn.handleNodeDelete(dest);
}
+ ssn.handleLinkDelete(dest);
+ }
+ catch(TransportException e)
+ {
+ throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+ }
+ catch (AMQException e)
+ {
+ JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage());
+ ex.setLinkedException(e);
+ ex.initCause(e);
+ throw ex;
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org