You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/10 20:22:37 UTC
svn commit: r1299257 [16/26] - in
/qpid/branches/rg-amqp-1-0-sandbox/qpid/java: broker-plugins/
broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/
broker-plugins/access-control/src/main/java/org/apache/qpid/serve...
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Sat Mar 10 19:22:10 2012
@@ -21,13 +21,8 @@
package org.apache.qpid.client;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.ArrayList;
-import java.util.Map;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
@@ -43,44 +38,20 @@ import org.apache.qpid.client.protocol.A
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.filter.MessageFilter;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRecoverSyncBody;
-import org.apache.qpid.framing.BasicRecoverSyncOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitOkBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.transport.TransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
@@ -131,7 +102,7 @@ public class AMQSession_0_8 extends AMQS
{
while (true)
{
- Long tag = _unacknowledgedMessageTags.poll();
+ Long tag = getUnacknowledgedMessageTags().poll();
if (tag == null)
{
break;
@@ -145,15 +116,15 @@ public class AMQSession_0_8 extends AMQS
{
BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
- final AMQFrame ackFrame = body.generateFrame(_channelId);
+ final AMQFrame ackFrame = body.generateFrame(getChannelId());
if (_logger.isDebugEnabled())
{
- _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+ _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + getChannelId());
}
getProtocolHandler().writeFrame(ackFrame, !isTransacted());
- _unacknowledgedMessageTags.remove(deliveryTag);
+ getUnacknowledgedMessageTags().remove(deliveryTag);
}
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
@@ -162,7 +133,7 @@ public class AMQSession_0_8 extends AMQS
{
getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
(getTicket(),queueName,exchangeName,routingKey,false,arguments).
- generateFrame(_channelId), QueueBindOkBody.class);
+ generateFrame(getChannelId()), QueueBindOkBody.class);
}
public void sendClose(long timeout) throws AMQException, FailoverException
@@ -179,7 +150,7 @@ public class AMQSession_0_8 extends AMQS
getProtocolHandler().closeSession(this);
getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
- new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId),
+ new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(getChannelId()),
ChannelCloseOkBody.class, timeout);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully.
@@ -191,7 +162,7 @@ public class AMQSession_0_8 extends AMQS
// Acknowledge all delivered messages
while (true)
{
- Long tag = _deliveredMessageTags.poll();
+ Long tag = getDeliveredMessageTags().poll();
if (tag == null)
{
break;
@@ -202,7 +173,7 @@ public class AMQSession_0_8 extends AMQS
final AMQProtocolHandler handler = getProtocolHandler();
- handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
+ handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(getChannelId()), TxCommitOkBody.class);
}
public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException,
@@ -218,22 +189,22 @@ public class AMQSession_0_8 extends AMQS
}
}
QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
- AMQFrame queueDeclare = body.generateFrame(_channelId);
+ AMQFrame queueDeclare = body.generateFrame(getChannelId());
getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
}
public void sendRecover() throws AMQException, FailoverException
{
enforceRejectBehaviourDuringRecover();
- _prefetchedMessageTags.clear();
- _unacknowledgedMessageTags.clear();
+ getPrefetchedMessageTags().clear();
+ getUnacknowledgedMessageTags().clear();
if (isStrictAMQP())
{
// We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
- _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
+ getAMQConnection().getProtocolHandler().writeFrame(body.generateFrame(getChannelId()));
_logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
}
else
@@ -243,17 +214,17 @@ public class AMQSession_0_8 extends AMQS
if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
{
BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
- _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
+ getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverOkBody.class);
}
else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
{
BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
- _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+ getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
}
else if(getProtocolVersion().equals(ProtocolVersion.v0_91))
{
BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false);
- _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+ getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
}
else
{
@@ -266,9 +237,9 @@ public class AMQSession_0_8 extends AMQS
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + _unacknowledgedMessageTags);
+ _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + getUnacknowledgedMessageTags());
}
- ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(_consumers.values());
+ ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(getConsumers().values());
boolean messageListenerFound = false;
boolean serverRejectBehaviourFound = false;
for(BasicMessageConsumer_0_8 consumer : consumersToCheck)
@@ -287,7 +258,7 @@ public class AMQSession_0_8 extends AMQS
if (serverRejectBehaviourFound)
{
//reject(false) any messages we don't want returned again
- switch(_acknowledgeMode)
+ switch(getAcknowledgeMode())
{
case Session.DUPS_OK_ACKNOWLEDGE:
case Session.AUTO_ACKNOWLEDGE:
@@ -296,7 +267,7 @@ public class AMQSession_0_8 extends AMQS
break;
}
case Session.CLIENT_ACKNOWLEDGE:
- for(Long tag : _unacknowledgedMessageTags)
+ for(Long tag : getUnacknowledgedMessageTags())
{
rejectMessage(tag, false);
}
@@ -314,7 +285,7 @@ public class AMQSession_0_8 extends AMQS
// consumer on the queue. Whilst this is within the JMS spec it is not
// user friendly and avoidable.
boolean normalRejectBehaviour = true;
- for (BasicMessageConsumer_0_8 consumer : _consumers.values())
+ for (BasicMessageConsumer_0_8 consumer : getConsumers().values())
{
if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour()))
{
@@ -326,7 +297,7 @@ public class AMQSession_0_8 extends AMQS
while (true)
{
- Long tag = _deliveredMessageTags.poll();
+ Long tag = getDeliveredMessageTags().poll();
if (tag == null)
{
break;
@@ -338,8 +309,8 @@ public class AMQSession_0_8 extends AMQS
public void rejectMessage(long deliveryTag, boolean requeue)
{
- if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)||
- ((_acknowledgeMode == AUTO_ACKNOWLEDGE || _acknowledgeMode == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners()))
+ if ((getAcknowledgeMode() == CLIENT_ACKNOWLEDGE) || (getAcknowledgeMode() == SESSION_TRANSACTED)||
+ ((getAcknowledgeMode() == AUTO_ACKNOWLEDGE || getAcknowledgeMode() == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners()))
{
if (_logger.isDebugEnabled())
{
@@ -347,9 +318,9 @@ public class AMQSession_0_8 extends AMQS
}
BasicRejectBody body = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
- AMQFrame frame = body.generateFrame(_channelId);
+ AMQFrame frame = body.generateFrame(getChannelId());
- _connection.getProtocolHandler().writeFrame(frame);
+ getAMQConnection().getProtocolHandler().writeFrame(frame);
}
}
@@ -370,12 +341,12 @@ public class AMQSession_0_8 extends AMQS
public AMQMethodEvent execute() throws AMQException, FailoverException
{
AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
- (exchangeName, routingKey, queueName).generateFrame(_channelId);
+ (exchangeName, routingKey, queueName).generateFrame(getChannelId());
return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
}
- }, _connection).execute();
+ }, getAMQConnection()).execute();
// Extract and return the response code from the query.
ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
@@ -392,7 +363,6 @@ public class AMQSession_0_8 extends AMQS
AMQShortString queueName,
AMQProtocolHandler protocolHandler,
boolean nowait,
- MessageFilter messageSelector,
int tag) throws AMQException, FailoverException
{
@@ -406,7 +376,7 @@ public class AMQSession_0_8 extends AMQS
consumer.getArguments());
- AMQFrame jmsConsume = body.generateFrame(_channelId);
+ AMQFrame jmsConsume = body.generateFrame(getChannelId());
if (nowait)
{
@@ -424,17 +394,25 @@ public class AMQSession_0_8 extends AMQS
ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,
name.toString().startsWith("amq."),
false,false,false,false,null);
- AMQFrame exchangeDeclare = body.generateFrame(_channelId);
+ AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException
+ final boolean nowait, boolean passive) throws AMQException, FailoverException
{
- QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
+ QueueDeclareBody body =
+ getMethodRegistry().createQueueDeclareBody(getTicket(),
+ amqd.getAMQQueueName(),
+ passive,
+ amqd.isDurable(),
+ amqd.isExclusive(),
+ amqd.isAutoDelete(),
+ false,
+ null);
- AMQFrame queueDeclare = body.generateFrame(_channelId);
+ AMQFrame queueDeclare = body.generateFrame(getChannelId());
protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
}
@@ -446,7 +424,7 @@ public class AMQSession_0_8 extends AMQS
false,
false,
true);
- AMQFrame queueDeleteFrame = body.generateFrame(_channelId);
+ AMQFrame queueDeleteFrame = body.generateFrame(getChannelId());
getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
}
@@ -454,8 +432,8 @@ public class AMQSession_0_8 extends AMQS
public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
{
ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(!suspend);
- AMQFrame channelFlowFrame = body.generateFrame(_channelId);
- _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
+ AMQFrame channelFlowFrame = body.generateFrame(getChannelId());
+ getAMQConnection().getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
}
public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
@@ -464,18 +442,18 @@ public class AMQSession_0_8 extends AMQS
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
- return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal,
- _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow,
- exclusive, _acknowledgeMode, noConsume, autoClose);
+ return new BasicMessageConsumer_0_8(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
+ getMessageFactoryRegistry(),this, protocolHandler, arguments, prefetchHigh, prefetchLow,
+ exclusive, getAcknowledgeMode(), noConsume, autoClose);
}
- public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, long producerId) throws JMSException
+ public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final Boolean mandatory,
+ final Boolean immediate, long producerId) throws JMSException
{
try
{
- return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId,
+ return new BasicMessageProducer_0_8(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(),
this, getProtocolHandler(), producerId, immediate, mandatory);
}
catch (AMQException e)
@@ -505,7 +483,7 @@ public class AMQSession_0_8 extends AMQS
private void returnBouncedMessage(final ReturnMessage msg)
{
- _connection.performConnectionTask(new Runnable()
+ getAMQConnection().performConnectionTask(new Runnable()
{
public void run()
{
@@ -513,8 +491,8 @@ public class AMQSession_0_8 extends AMQS
{
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage =
- _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(),_queueDestinationCache,_topicDestinationCache);
+ getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(),
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, _topicDestinationCache);
AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
AMQShortString reason = msg.getReplyText();
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
@@ -522,20 +500,17 @@ public class AMQSession_0_8 extends AMQS
// @TODO should this be moved to an exception handler of sorts? Somewhere errors are converted to correct exceptions.
if (errorCode == AMQConstant.NO_CONSUMERS)
{
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
+ getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
+ } else if (errorCode == AMQConstant.NO_ROUTE)
{
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
- }
- else
+ getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
+ } else
{
- _connection.exceptionReceived(
+ getAMQConnection().exceptionReceived(
new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
}
- }
- catch (Exception e)
+ } catch (Exception e)
{
_logger.error(
"Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
@@ -571,7 +546,7 @@ public class AMQSession_0_8 extends AMQS
return null;
}
- }, _connection).execute();
+ }, getAMQConnection()).execute();
}
public DestinationCache<AMQQueue> getQueueDestinationCache()
@@ -607,9 +582,18 @@ public class AMQSession_0_8 extends AMQS
return matches;
}
+ public long getMessageCount()
+ {
+ return _messageCount;
+ }
+
+ public long getConsumerCount()
+ {
+ return _consumerCount;
+ }
}
- protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
+ protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException
{
AMQFrame queueDeclare =
getMethodRegistry().createQueueDeclareBody(getTicket(),
@@ -619,10 +603,10 @@ public class AMQSession_0_8 extends AMQS
amqd.isExclusive(),
amqd.isAutoDelete(),
false,
- null).generateFrame(_channelId);
+ null).generateFrame(getChannelId());
QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
- return okHandler._messageCount;
+ return okHandler.getMessageCount();
}
protected boolean tagLE(long tag1, long tag2)
@@ -647,6 +631,7 @@ public class AMQSession_0_8 extends AMQS
public void handleAddressBasedDestination(AMQDestination dest,
boolean isConsumer,
+ boolean noLocal,
boolean noWait) throws AMQException
{
throw new UnsupportedOperationException("The new addressing based sytanx is "
@@ -683,7 +668,7 @@ public class AMQSession_0_8 extends AMQS
{
// if the Connection has closed then we should throw any exception that
// has occurred that we were not waiting for
- AMQStateManager manager = _connection.getProtocolHandler()
+ AMQStateManager manager = getAMQConnection().getProtocolHandler()
.getStateManager();
Exception e = manager.getLastException();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Sat Mar 10 19:22:10 2012
@@ -20,12 +20,11 @@
*/
package org.apache.qpid.client;
-import java.util.UUID;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
import javax.jms.TemporaryQueue;
-
-import org.apache.qpid.framing.AMQShortString;
+import java.util.UUID;
/** AMQ implementation of a TemporaryQueue. */
final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Sat Mar 10 19:22:10 2012
@@ -20,17 +20,16 @@
*/
package org.apache.qpid.client;
-import java.net.URISyntaxException;
-
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Topic;
-
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.messaging.Address;
import org.apache.qpid.url.BindingURL;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import java.net.URISyntaxException;
+
public class AMQTopic extends AMQDestination implements Topic
{
public AMQTopic(String address) throws URISyntaxException
@@ -175,7 +174,7 @@ public class AMQTopic extends AMQDestina
}
else
{
- return _exchangeName;
+ return super.getExchangeName();
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java Sat Mar 10 19:22:10 2012
@@ -20,182 +20,30 @@
*/
package org.apache.qpid.client;
-import java.io.Serializable;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
+import javax.jms.*;
import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-public class AMQTopicSessionAdaptor implements TopicSession, AMQSessionAdapter
+class AMQTopicSessionAdaptor extends AMQSessionAdapter<TopicSession> implements TopicSession
{
- protected final AMQSession _session;
-
- public AMQTopicSessionAdaptor(Session session)
- {
- _session = (AMQSession) session;
- }
- public Topic createTopic(String string) throws JMSException
+ public AMQTopicSessionAdaptor(TopicSession session)
{
- return _session.createTopic(string);
+ super(session);
}
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
- return _session.createSubscriber(topic);
+ return getSession().createSubscriber(topic);
}
public TopicSubscriber createSubscriber(Topic topic, String string, boolean b) throws JMSException
{
- return _session.createSubscriber(topic, string, b);
- }
-
- public TopicSubscriber createDurableSubscriber(Topic topic, String string) throws JMSException
- {
- return _session.createDurableSubscriber(topic, string);
- }
-
- public TopicSubscriber createDurableSubscriber(Topic topic, String string, String string1, boolean b) throws JMSException
- {
- return _session.createDurableSubscriber(topic, string, string1, b);
+ return getSession().createSubscriber(topic, string, b);
}
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
- return _session.createPublisher(topic);
- }
-
- public TemporaryTopic createTemporaryTopic() throws JMSException
- {
- return _session.createTemporaryTopic();
- }
-
- public void unsubscribe(String string) throws JMSException
- {
- _session.unsubscribe(string);
- }
-
- public BytesMessage createBytesMessage() throws JMSException
- {
- return _session.createBytesMessage();
- }
-
- public MapMessage createMapMessage() throws JMSException
- {
- return _session.createMapMessage();
- }
-
- public Message createMessage() throws JMSException
- {
- return _session.createMessage();
- }
-
- public ObjectMessage createObjectMessage() throws JMSException
- {
- return _session.createObjectMessage();
- }
-
- public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException
- {
- return _session.createObjectMessage(serializable);
- }
-
- public StreamMessage createStreamMessage() throws JMSException
- {
- return _session.createStreamMessage();
- }
-
- public TextMessage createTextMessage() throws JMSException
- {
- return _session.createTextMessage();
- }
-
- public TextMessage createTextMessage(String string) throws JMSException
- {
- return _session.createTextMessage(string);
- }
-
- public boolean getTransacted() throws JMSException
- {
- return _session.getTransacted();
- }
-
- public int getAcknowledgeMode() throws JMSException
- {
- return _session.getAcknowledgeMode();
- }
-
- public void commit() throws JMSException
- {
- _session.commit();
- }
-
- public void rollback() throws JMSException
- {
- _session.rollback();
- }
-
- public void close() throws JMSException
- {
- _session.close();
- }
-
- public void recover() throws JMSException
- {
- _session.recover();
- }
-
- public MessageListener getMessageListener() throws JMSException
- {
- return _session.getMessageListener();
- }
-
- public void setMessageListener(MessageListener messageListener) throws JMSException
- {
- _session.setMessageListener(messageListener);
- }
-
- public void run()
- {
- _session.run();
- }
-
- public MessageProducer createProducer(Destination destination) throws JMSException
- {
- return _session.createProducer(destination);
- }
-
- public MessageConsumer createConsumer(Destination destination) throws JMSException
- {
- return _session.createConsumer(destination);
- }
-
- public MessageConsumer createConsumer(Destination destination, String string) throws JMSException
- {
- return _session.createConsumer(destination, string);
- }
-
- public MessageConsumer createConsumer(Destination destination, String string, boolean b) throws JMSException
- {
- return _session.createConsumer(destination, string, b);
+ return getSession().createPublisher(topic);
}
//The following methods cannot be called from a TopicSession as per JMS spec
@@ -219,8 +67,4 @@ public class AMQTopicSessionAdaptor impl
throw new IllegalStateException("Cannot call createTemporaryQueue from TopicSession");
}
- public AMQSession getSession()
- {
- return _session;
- }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Sat Mar 10 19:22:10 2012
@@ -20,29 +20,35 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.AMQInternalException;
-import org.apache.qpid.filter.JMSSelectorFilter;
-import org.apache.qpid.filter.MessageFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.*;
+import org.apache.qpid.client.filter.MessageFilter;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.CloseConsumerMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.client.filter.JMSSelectorFilter;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
import org.apache.qpid.transport.TransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -54,14 +60,13 @@ public abstract class BasicMessageConsum
{
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
- /** The connection being used by this consumer */
- protected final AMQConnection _connection;
+ private final AMQConnection _connection;
- protected final MessageFilter _messageSelectorFilter;
+ private final MessageFilter _messageSelectorFilter;
private final boolean _noLocal;
- protected AMQDestination _destination;
+ private AMQDestination _destination;
/**
* When true indicates that a blocking receive call is in progress
@@ -72,23 +77,17 @@ public abstract class BasicMessageConsum
*/
private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
- /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
- protected int _consumerTag;
+ private int _consumerTag;
- /** We need to know the channel id when constructing frames */
- protected final int _channelId;
+ private final int _channelId;
- /**
- * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
- * <p/> Argument true indicates we want strict FIFO semantics
- */
- protected final BlockingQueue _synchronousQueue;
+ private final BlockingQueue _synchronousQueue;
- protected final MessageFactoryRegistry _messageFactory;
+ private final MessageFactoryRegistry _messageFactory;
- protected final AMQSession _session;
+ private final AMQSession _session;
- protected final AMQProtocolHandler _protocolHandler;
+ private final AMQProtocolHandler _protocolHandler;
/**
* We need to store the "raw" field table so that we can resubscribe in the event of failover being required
@@ -107,17 +106,9 @@ public abstract class BasicMessageConsum
*/
private final int _prefetchLow;
- /**
- * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
- */
- protected boolean _exclusive;
+ private boolean _exclusive;
- /**
- * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
- * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
- * implementation.
- */
- protected final int _acknowledgeMode;
+ private final int _acknowledgeMode;
/**
* List of tags delivered, the last of which should be acknowledged on commit in transaction mode.
@@ -208,6 +199,10 @@ public abstract class BasicMessageConsum
// possible to determine when querying the broker whether there are no arguments or just a non-matching selector
// argument, as specifying null for the arguments when querying means they should not be checked at all
ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
+ if(noLocal)
+ {
+ ft.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal);
+ }
_arguments = ft;
@@ -232,6 +227,11 @@ public abstract class BasicMessageConsum
return _messageListener.get();
}
+ /**
+ * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
+ * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
+ * implementation.
+ */
public int getAcknowledgeMode()
{
return _acknowledgeMode;
@@ -279,7 +279,10 @@ public abstract class BasicMessageConsum
throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started.");
}
- _logger.debug("Message listener set for destination " + _destination);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Message listener set for destination " + _destination);
+ }
if (messageListener != null)
{
@@ -371,6 +374,9 @@ public abstract class BasicMessageConsum
return _noLocal;
}
+ /**
+ * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
+ */
public boolean isExclusive()
{
return _exclusive;
@@ -537,7 +543,7 @@ public abstract class BasicMessageConsum
}
else if (o instanceof CloseConsumerMessage)
{
- _closed.set(true);
+ setClosed();
deregisterConsumer();
return null;
}
@@ -554,14 +560,14 @@ public abstract class BasicMessageConsum
public void close(boolean sendClose) throws JMSException
{
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.info("Closing consumer:" + debugIdentity());
+ _logger.debug("Closing consumer:" + debugIdentity());
}
- if (!_closed.getAndSet(true))
+ if (!setClosed())
{
- _closing.set(true);
+ setClosing(true);
if (_logger.isDebugEnabled())
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
@@ -607,12 +613,8 @@ public abstract class BasicMessageConsum
}
else
{
- // FIXME: wow this is ugly
- // //fixme this probably is not right
- // if (!isNoConsume())
- { // done in BasicCancelOK Handler but not sending one so just deregister.
- deregisterConsumer();
- }
+ // FIXME?
+ deregisterConsumer();
}
// This will occur if session.close is called closing all consumers we may be blocked waiting for a receive
@@ -641,7 +643,7 @@ public abstract class BasicMessageConsum
{
// synchronized (_closed)
{
- _closed.set(true);
+ setClosed();
if (_logger.isDebugEnabled())
{
@@ -818,7 +820,7 @@ public abstract class BasicMessageConsum
{
// synchronized (_closed)
{
- _closed.set(true);
+ setClosed();
if (_logger.isDebugEnabled())
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
@@ -859,6 +861,7 @@ public abstract class BasicMessageConsum
_session.deregisterConsumer(this);
}
+ /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
public int getConsumerTag()
{
return _consumerTag;
@@ -1002,10 +1005,44 @@ public abstract class BasicMessageConsum
public void failedOverPre()
{
clearReceiveQueue();
- // TGM FIXME: think this should just be removed
- // clearUnackedMessages();
}
public void failedOverPost() {}
+ /** The connection being used by this consumer */
+ protected AMQConnection getConnection()
+ {
+ return _connection;
+ }
+
+ protected void setDestination(AMQDestination destination)
+ {
+ _destination = destination;
+ }
+
+ /** We need to know the channel id when constructing frames */
+ protected int getChannelId()
+ {
+ return _channelId;
+ }
+
+ /**
+ * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
+ * <p/> Argument true indicates we want strict FIFO semantics
+ */
+ protected BlockingQueue getSynchronousQueue()
+ {
+ return _synchronousQueue;
+ }
+
+ protected MessageFactoryRegistry getMessageFactory()
+ {
+ return _messageFactory;
+ }
+
+ protected AMQProtocolHandler getProtocolHandler()
+ {
+ return _protocolHandler;
+ }
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Sat Mar 10 19:22:10 2012
@@ -19,21 +19,32 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.*;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.*;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.Acquired;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.RangeSetFactory;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.TransportException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
-
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,7 +57,7 @@ public class BasicMessageConsumer_0_10 e
/**
* This class logger
*/
- protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ private final Logger _logger = LoggerFactory.getLogger(getClass());
/**
* The underlying QpidSession
@@ -67,7 +78,7 @@ public class BasicMessageConsumer_0_10 e
private final long _capacity;
/** Flag indicating if the server supports message selectors */
- protected final boolean _serverJmsSelectorSupport;
+ private final boolean _serverJmsSelectorSupport;
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -80,11 +91,10 @@ public class BasicMessageConsumer_0_10 e
rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
_0_10session = (AMQSession_0_10) session;
- _preAcquire = evaluatePreAcquire(browseOnly, destination);
-
- _capacity = evaluateCapacity(destination);
_serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+ _preAcquire = evaluatePreAcquire(browseOnly, destination, _serverJmsSelectorSupport);
+ _capacity = evaluateCapacity(destination);
if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
{
@@ -92,8 +102,8 @@ public class BasicMessageConsumer_0_10 e
if (!namedQueue)
{
- _destination = destination.copyDestination();
- _destination.setQueueName(null);
+ setDestination(destination.copyDestination());
+ getDestination().setQueueName(null);
}
}
}
@@ -181,14 +191,14 @@ public class BasicMessageConsumer_0_10 e
{
super.preDeliver(jmsMsg);
- if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
+ if (getAcknowledgeMode() == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
{
//For 0-10 we need to ensure that all messages are indicated processed in some way to
//ensure their AMQP command-id is marked completed, and so we must send a completion
//even for no-ack messages even though there isn't actually an 'acknowledgement' occurring.
//Add message to the unacked message list to ensure we don't lose record of it before
//sending a completion of some sort.
- _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
+ getSession().addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
}
}
@@ -196,7 +206,7 @@ public class BasicMessageConsumer_0_10 e
AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception
{
AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession());
- return _messageFactory.createMessage(msg.getMessageTransfer());
+ return getMessageFactory().createMessage(msg.getMessageTransfer());
}
/**
@@ -211,9 +221,9 @@ public class BasicMessageConsumer_0_10 e
boolean messageOk = true;
try
{
- if (_messageSelectorFilter != null && !_serverJmsSelectorSupport)
+ if (!_serverJmsSelectorSupport && getMessageSelectorFilter() != null)
{
- messageOk = _messageSelectorFilter.matches(message);
+ messageOk = getMessageSelectorFilter().matches(message);
}
}
catch (Exception e)
@@ -274,7 +284,7 @@ public class BasicMessageConsumer_0_10 e
{
_0_10session.messageAcknowledge
(Range.newInstance((int) message.getDeliveryTag()),
- _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
final AMQException amqe = _0_10session.getCurrentException();
if (amqe != null)
@@ -338,20 +348,20 @@ public class BasicMessageConsumer_0_10 e
{
messageFlow();
}
- if (messageListener != null && !_synchronousQueue.isEmpty())
+ if (messageListener != null && !getSynchronousQueue().isEmpty())
{
- Iterator messages=_synchronousQueue.iterator();
+ Iterator messages= getSynchronousQueue().iterator();
while (messages.hasNext())
{
AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
messages.remove();
- _session.rejectMessage(message, true);
+ getSession().rejectMessage(message, true);
}
}
}
catch(TransportException e)
{
- throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e);
+ throw getSession().toJMSException("Exception while setting message listener:" + e.getMessage(), e);
}
}
@@ -378,7 +388,7 @@ public class BasicMessageConsumer_0_10 e
{
_syncReceive.set(true);
}
- if (_0_10session.isStarted() && _capacity == 0 && _synchronousQueue.isEmpty())
+ if (_0_10session.isStarted() && _capacity == 0 && getSynchronousQueue().isEmpty())
{
messageFlow();
}
@@ -415,19 +425,19 @@ public class BasicMessageConsumer_0_10 e
{
super.postDeliver(msg);
- switch (_acknowledgeMode)
+ switch (getAcknowledgeMode())
{
case Session.SESSION_TRANSACTED:
_0_10session.sendTxCompletionsIfNecessary();
break;
case Session.NO_ACKNOWLEDGE:
- if (!_session.isInRecovery())
+ if (!getSession().isInRecovery())
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ getSession().acknowledgeMessage(msg.getDeliveryTag(), false);
}
break;
case Session.AUTO_ACKNOWLEDGE:
- if (!_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
+ if (!getSession().isInRecovery() && getSession().getAMQConnection().getSyncAck())
{
((AMQSession_0_10) getSession()).getQpidSession().sync();
}
@@ -443,10 +453,10 @@ public class BasicMessageConsumer_0_10 e
@Override public void rollbackPendingMessages()
{
- if (_synchronousQueue.size() > 0)
+ if (getSynchronousQueue().size() > 0)
{
RangeSet ranges = RangeSetFactory.createRangeSet();
- Iterator iterator = _synchronousQueue.iterator();
+ Iterator iterator = getSynchronousQueue().iterator();
while (iterator.hasNext())
{
@@ -486,7 +496,7 @@ public class BasicMessageConsumer_0_10 e
}
else
{
- return _exclusive;
+ return super.isExclusive();
}
}
@@ -514,7 +524,7 @@ public class BasicMessageConsumer_0_10 e
return _preAcquire;
}
- private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination)
+ private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination, boolean serverJmsSelectorSupport)
{
boolean preAcquire;
if (browseOnly)
@@ -524,7 +534,7 @@ public class BasicMessageConsumer_0_10 e
else
{
boolean isQueue = (destination instanceof AMQQueue || getDestination().getAddressType() == AMQDestination.QUEUE_TYPE);
- if (isQueue && getMessageSelectorFilter() != null)
+ if (!serverJmsSelectorSupport && isQueue && getMessageSelectorFilter() != null)
{
preAcquire = false;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Sat Mar 10 19:22:10 2012
@@ -20,24 +20,31 @@
*/
package org.apache.qpid.client;
-import javax.jms.JMSException;
-import javax.jms.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.*;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.url.BindingURL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
{
- protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ private final Logger _logger = LoggerFactory.getLogger(getClass());
private AMQSession_0_8.DestinationCache<AMQTopic> _topicDestinationCache;
private AMQSession_0_8.DestinationCache<AMQQueue> _queueDestinationCache;
@@ -88,11 +95,11 @@ public class BasicMessageConsumer_0_8 ex
void sendCancel() throws AMQException, FailoverException
{
- BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(_consumerTag)), false);
+ BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(getConsumerTag())), false);
- final AMQFrame cancelFrame = body.generateFrame(_channelId);
+ final AMQFrame cancelFrame = body.generateFrame(getChannelId());
- _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+ getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
if (_logger.isDebugEnabled())
{
@@ -103,9 +110,9 @@ public class BasicMessageConsumer_0_8 ex
public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
{
- return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
- messageFrame.isRedelivered(), messageFrame.getExchange(),
- messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
+ return getMessageFactory().createMessage(messageFrame.getDeliveryTag(),
+ messageFrame.isRedelivered(), messageFrame.getExchange(),
+ messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
_queueDestinationCache, _topicDestinationCache);
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Sat Mar 10 19:22:10 2012
@@ -22,7 +22,6 @@ package org.apache.qpid.client;
import java.io.UnsupportedEncodingException;
import java.util.UUID;
-
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -33,12 +32,11 @@ import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
-
+import javax.jms.Topic;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
@@ -49,14 +47,11 @@ public abstract class BasicMessageProduc
{
enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
- protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ private final Logger _logger ;
private AMQConnection _connection;
- /**
- * If true, messages will not get a timestamp.
- */
- protected boolean _disableTimestamps;
+ private boolean _disableTimestamps;
/**
* Priority of messages created by this producer.
@@ -73,10 +68,7 @@ public abstract class BasicMessageProduc
*/
private int _deliveryMode = DeliveryMode.PERSISTENT;
- /**
- * The Destination used for this consumer, if specified upon creation.
- */
- protected AMQDestination _destination;
+ private AMQDestination _destination;
/**
* Default encoding used for messages produced by this producer.
@@ -88,14 +80,14 @@ public abstract class BasicMessageProduc
*/
private String _mimeType;
- protected AMQProtocolHandler _protocolHandler;
+ private AMQProtocolHandler _protocolHandler;
/**
* True if this producer was created from a transacted session
*/
private boolean _transacted;
- protected int _channelId;
+ private int _channelId;
/**
* This is an id generated by the session and is used to tie individual producers to the session. This means we
@@ -105,29 +97,49 @@ public abstract class BasicMessageProduc
*/
private long _producerId;
- /**
- * The session used to create this producer
- */
- protected AMQSession _session;
+ private AMQSession _session;
private final boolean _immediate;
- private final boolean _mandatory;
+ private final Boolean _mandatory;
private boolean _disableMessageId;
private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
- protected String _userID; // ref user id used in the connection.
+ private String _userID; // ref user id used in the connection.
- private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
- protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
+ /**
+ * The default value for immediate flag used by this producer is false. That is, a consumer does
+ * not need to be attached to a queue.
+ */
+ private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+
+ /**
+ * The default value for mandatory flag used by this producer is true. That is, server will not
+ * silently drop messages where no queue is connected to the exchange for the message.
+ */
+ private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
- protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
+ /**
+ * The default value for mandatory flag used by this producer when publishing to a Topic is false. That is, server
+ * will silently drop messages where no queue is connected to the exchange for the message.
+ */
+ private final boolean _defaultMandatoryTopicValue =
+ Boolean.parseBoolean(System.getProperty("qpid.default_mandatory_topic",
+ System.getProperties().containsKey("qpid.default_mandatory")
+ ? System.getProperty("qpid.default_mandatory")
+ : "false"));
+
+ private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
+
+ protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
+ Boolean immediate, Boolean mandatory) throws AMQException
{
- _connection = connection;
+ _logger = logger;
+ _connection = connection;
_destination = destination;
_transacted = transacted;
_protocolHandler = protocolHandler;
@@ -139,8 +151,14 @@ public abstract class BasicMessageProduc
declareDestination(destination);
}
- _immediate = immediate;
- _mandatory = mandatory;
+ _immediate = immediate == null ? _defaultImmediateValue : immediate;
+ _mandatory = mandatory == null
+ ? destination == null ? null
+ : destination instanceof Topic
+ ? _defaultMandatoryTopicValue
+ : _defaultMandatoryValue
+ : mandatory;
+
_userID = connection.getUsername();
setPublishMode();
}
@@ -161,7 +179,10 @@ public abstract class BasicMessageProduc
publishMode = PublishMode.SYNC_PUBLISH_ALL;
}
- _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("MessageProducer " + toString() + " using publish mode : " + publishMode);
+ }
}
void resubscribe() throws AMQException
@@ -256,6 +277,14 @@ public abstract class BasicMessageProduc
return _timeToLive;
}
+ protected AMQDestination getAMQDestination()
+ {
+ return _destination;
+ }
+
+ /**
+ * The Destination used for this producer, if specified upon creation.
+ */
public Destination getDestination() throws JMSException
{
checkNotClosed();
@@ -265,7 +294,7 @@ public abstract class BasicMessageProduc
public void close() throws JMSException
{
- _closed.set(true);
+ setClosed();
_session.deregisterProducer(_producerId);
}
@@ -319,7 +348,12 @@ public abstract class BasicMessageProduc
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
+ sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
+ _mandatory == null
+ ? destination instanceof Topic
+ ? _defaultMandatoryTopicValue
+ : _defaultMandatoryValue
+ : _mandatory,
_immediate);
}
}
@@ -332,7 +366,13 @@ public abstract class BasicMessageProduc
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
+ _mandatory == null
+ ? destination instanceof Topic
+ ? _defaultMandatoryTopicValue
+ : _defaultMandatoryValue
+ : _mandatory,
+ _immediate);
}
}
@@ -480,7 +520,10 @@ public abstract class BasicMessageProduc
_logger.debug("Updating original message");
origMessage.setJMSPriority(message.getJMSPriority());
origMessage.setJMSTimestamp(message.getJMSTimestamp());
- _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+ }
origMessage.setJMSExpiration(message.getJMSExpiration());
origMessage.setJMSMessageID(message.getJMSMessageID());
}
@@ -564,6 +607,9 @@ public abstract class BasicMessageProduc
}
+ /**
+ * The session used to create this producer
+ */
public AMQSession getSession()
{
return _session;
@@ -580,4 +626,73 @@ public abstract class BasicMessageProduc
throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e);
}
}
+
+ /**
+ * If true, messages will not get a timestamp.
+ */
+ protected boolean isDisableTimestamps()
+ {
+ return _disableTimestamps;
+ }
+
+ protected void setDisableTimestamps(boolean disableTimestamps)
+ {
+ _disableTimestamps = disableTimestamps;
+ }
+
+ protected void setDestination(AMQDestination destination)
+ {
+ _destination = destination;
+ }
+
+ protected AMQProtocolHandler getProtocolHandler()
+ {
+ return _protocolHandler;
+ }
+
+ protected void setProtocolHandler(AMQProtocolHandler protocolHandler)
+ {
+ _protocolHandler = protocolHandler;
+ }
+
+ protected int getChannelId()
+ {
+ return _channelId;
+ }
+
+ protected void setChannelId(int channelId)
+ {
+ _channelId = channelId;
+ }
+
+ protected void setSession(AMQSession session)
+ {
+ _session = session;
+ }
+
+ protected String getUserID()
+ {
+ return _userID;
+ }
+
+ protected void setUserID(String userID)
+ {
+ _userID = userID;
+ }
+
+ protected PublishMode getPublishMode()
+ {
+ return publishMode;
+ }
+
+ protected void setPublishMode(PublishMode publishMode)
+ {
+ this.publishMode = publishMode;
+ }
+
+ Logger getLogger()
+ {
+ return _logger;
+ }
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Sat Mar 10 19:22:10 2012
@@ -17,18 +17,8 @@
*/
package org.apache.qpid.client;
-import static org.apache.qpid.transport.Option.NONE;
-import static org.apache.qpid.transport.Option.SYNC;
-import static org.apache.qpid.transport.Option.UNRELIABLE;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
@@ -48,8 +38,18 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Strings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+import static org.apache.qpid.transport.Option.UNRELIABLE;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
/**
* This is a 0_10 message producer.
@@ -61,11 +61,11 @@ public class BasicMessageProducer_0_10 e
BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
- boolean immediate, boolean mandatory) throws AMQException
+ Boolean immediate, Boolean mandatory) throws AMQException
{
- super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
+ super(_logger, connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
- userIDBytes = Strings.toUTF8(_userID);
+ userIDBytes = Strings.toUTF8(getUserID());
}
void declareDestination(AMQDestination destination) throws AMQException
@@ -86,7 +86,7 @@ public class BasicMessageProducer_0_10 e
{
try
{
- getSession().handleAddressBasedDestination(destination,false,false);
+ getSession().handleAddressBasedDestination(destination,false,false,false);
}
catch(Exception e)
{
@@ -125,7 +125,7 @@ public class BasicMessageProducer_0_10 e
}
long currentTime = 0;
- if (timeToLive > 0 || !_disableTimestamps)
+ if (timeToLive > 0 || !isDisableTimestamps())
{
currentTime = System.currentTimeMillis();
}
@@ -136,7 +136,7 @@ public class BasicMessageProducer_0_10 e
message.setJMSExpiration(currentTime + timeToLive);
}
- if (!_disableTimestamps)
+ if (!isDisableTimestamps())
{
deliveryProp.setTimestamp(currentTime);
@@ -213,8 +213,8 @@ public class BasicMessageProducer_0_10 e
// if true, we need to sync the delivery of this message
boolean sync = false;
- sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) ||
- (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT &&
+ sync = ( (getPublishMode() == PublishMode.SYNC_PUBLISH_ALL) ||
+ (getPublishMode() == PublishMode.SYNC_PUBLISH_PERSISTENT &&
deliveryMode == DeliveryMode.PERSISTENT)
);
@@ -248,14 +248,14 @@ public class BasicMessageProducer_0_10 e
@Override
public boolean isBound(AMQDestination destination) throws JMSException
{
- return _session.isQueueBound(destination);
+ return getSession().isQueueBound(destination);
}
@Override
public void close() throws JMSException
{
super.close();
- AMQDestination dest = _destination;
+ AMQDestination dest = getAMQDestination();
if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
{
if (dest.getDelete() == AddressOption.ALWAYS ||
@@ -264,7 +264,7 @@ public class BasicMessageProducer_0_10 e
try
{
((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
- _destination.getQueueName());
+ getAMQDestination().getQueueName());
}
catch(TransportException e)
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Sat Mar 10 19:22:10 2012
@@ -20,18 +20,9 @@
*/
package org.apache.qpid.client;
-import java.util.UUID;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Topic;
-import javax.jms.Queue;
-
-import java.nio.ByteBuffer;
-
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
+import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -42,13 +33,24 @@ import org.apache.qpid.framing.ContentHe
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.MethodRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
public class BasicMessageProducer_0_8 extends BasicMessageProducer
{
+ private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_8.class);
BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
{
- super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
+ super(_logger,connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
}
void declareDestination(AMQDestination destination)
@@ -56,7 +58,7 @@ public class BasicMessageProducer_0_8 ex
final MethodRegistry methodRegistry = getSession().getMethodRegistry();
ExchangeDeclareBody body =
- methodRegistry.createExchangeDeclareBody(_session.getTicket(),
+ methodRegistry.createExchangeDeclareBody(getSession().getTicket(),
destination.getExchangeName(),
destination.getExchangeClass(),
destination.getExchangeName().toString().startsWith("amq."),
@@ -68,29 +70,29 @@ public class BasicMessageProducer_0_8 ex
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
- AMQFrame declare = body.generateFrame(_channelId);
+ AMQFrame declare = body.generateFrame(getChannelId());
- _protocolHandler.writeFrame(declare);
+ getProtocolHandler().writeFrame(declare);
}
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
boolean immediate) throws JMSException
{
- BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
+ BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(),
destination.getExchangeName(),
destination.getRoutingKey(),
mandatory,
immediate);
- AMQFrame publishFrame = body.generateFrame(_channelId);
+ AMQFrame publishFrame = body.generateFrame(getChannelId());
message.prepareForSending();
ByteBuffer payload = message.getData();
AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
- contentHeaderProperties.setUserId(_userID);
+ contentHeaderProperties.setUserId(getUserID());
//Set the JMS_QPID_DESTTYPE for 0-8/9 messages
int type;
@@ -110,7 +112,7 @@ public class BasicMessageProducer_0_8 ex
//Set JMS_QPID_DESTTYPE
delegate.getContentHeaderProperties().getHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
- if (!_disableTimestamps)
+ if (!isDisableTimestamps())
{
final long currentTime = System.currentTimeMillis();
contentHeaderProperties.setTimestamp(currentTime);
@@ -134,12 +136,12 @@ public class BasicMessageProducer_0_8 ex
if (payload != null)
{
- createContentBodies(payload, frames, 2, _channelId);
+ createContentBodies(payload, frames, 2, getChannelId());
}
- if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
+ if ((contentBodyFrameCount != 0) && getLogger().isDebugEnabled())
{
- _logger.debug("Sending content body frames to " + destination);
+ getLogger().debug("Sending content body frames to " + destination);
}
@@ -147,11 +149,11 @@ public class BasicMessageProducer_0_8 ex
int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(_channelId,
+ ContentHeaderBody.createAMQFrame(getChannelId(),
classIfForBasic, 0, contentHeaderProperties, size);
- if (_logger.isDebugEnabled())
+ if (getLogger().isDebugEnabled())
{
- _logger.debug("Sending content header frame to " + destination);
+ getLogger().debug("Sending content header frame to " + destination);
}
frames[0] = publishFrame;
@@ -160,7 +162,7 @@ public class BasicMessageProducer_0_8 ex
try
{
- _session.checkFlowControl();
+ getSession().checkFlowControl();
}
catch (InterruptedException e)
{
@@ -170,7 +172,7 @@ public class BasicMessageProducer_0_8 ex
throw jmse;
}
- _protocolHandler.writeFrame(compositeFrame);
+ getProtocolHandler().writeFrame(compositeFrame);
}
/**
@@ -194,7 +196,7 @@ public class BasicMessageProducer_0_8 ex
else
{
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1;
long remaining = payload.remaining();
for (int i = offset; i < frames.length; i++)
{
@@ -224,7 +226,7 @@ public class BasicMessageProducer_0_8 ex
else
{
int dataLength = payload.remaining();
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1;
int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java Sat Mar 10 19:22:10 2012
@@ -22,7 +22,6 @@ package org.apache.qpid.client;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
-
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -49,14 +48,14 @@ public abstract class Closeable
* We use an atomic boolean so that we do not have to synchronize access to this flag. Synchronizing access to this
* flag would mean having a synchronized block in every method.
*/
- protected final AtomicBoolean _closed = new AtomicBoolean(false);
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
/**
* Are we in the process of closing. We have this distinction so we can
* still signal we are in the process of closing so other objects can tell
* the difference and tidy up.
*/
- protected final AtomicBoolean _closing = new AtomicBoolean(false);
+ private final AtomicBoolean _closing = new AtomicBoolean(false);
/**
* Checks if this is closed, and raises a JMSException if it is.
@@ -91,6 +90,15 @@ public abstract class Closeable
return _closing.get();
}
+ protected boolean setClosed()
+ {
+ return _closed.getAndSet(true);
+ }
+
+ protected void setClosing(boolean closing)
+ {
+ _closing.set(closing);
+ }
/**
* Closes this object.
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java Sat Mar 10 19:22:10 2012
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.AMQShortString;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
-import org.apache.qpid.framing.AMQShortString;
-
public enum CustomJMSXProperty
{
JMS_AMQP_NULL,
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java Sat Mar 10 19:22:10 2012
@@ -22,8 +22,8 @@ package org.apache.qpid.client;
public class MessageConsumerPair
{
- BasicMessageConsumer _consumer;
- Object _item;
+ private BasicMessageConsumer _consumer;
+ private Object _item;
public MessageConsumerPair(BasicMessageConsumer consumer, Object item)
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java Sat Mar 10 19:22:10 2012
@@ -20,12 +20,11 @@
*/
package org.apache.qpid.client;
-import java.util.Enumeration;
+import org.apache.qpid.common.QpidProperties;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;
-
-import org.apache.qpid.common.QpidProperties;
+import java.util.Enumeration;
public class QpidConnectionMetaData implements ConnectionMetaData
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org