You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC
svn commit: r686136 [12/17] - in /incubator/qpid/branches/qpid.0-10/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/bin/ broker/etc/ broker...
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Aug 14 20:40:49 2008
@@ -25,10 +25,12 @@
import javax.jms.IllegalStateException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.*;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
@@ -40,7 +42,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class AMQSession_0_8 extends AMQSession
+import java.util.Map;
+
+public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
/** Used for debugging. */
@@ -125,10 +129,19 @@
handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
}
- public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException,
+ public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException,
FailoverException
{
- QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null);
+ FieldTable table = null;
+ if(arguments != null && !arguments.isEmpty())
+ {
+ table = new FieldTable();
+ for(Map.Entry<String, Object> entry : arguments.entrySet())
+ {
+ table.setObject(entry.getKey(), entry.getValue());
+ }
+ }
+ QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
AMQFrame queueDeclare = body.generateFrame(_channelId);
getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
}
@@ -206,6 +219,7 @@
return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName());
}
+
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
throws JMSException
{
@@ -233,10 +247,14 @@
{
throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
}
- }
+ }
- public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait,
- String messageSelector, AMQShortString tag) throws AMQException, FailoverException
+ @Override public void sendConsume(BasicMessageConsumer_0_8 consumer,
+ AMQShortString queueName,
+ AMQProtocolHandler protocolHandler,
+ boolean nowait,
+ String messageSelector,
+ int tag) throws AMQException, FailoverException
{
FieldTable arguments = FieldTableFactory.newFieldTable();
if ((messageSelector != null) && !messageSelector.equals(""))
@@ -256,7 +274,7 @@
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
- tag,
+ new AMQShortString(String.valueOf(tag)),
consumer.isNoLocal(),
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
consumer.isExclusive(),
@@ -314,18 +332,18 @@
}
public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
- final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft,
+ final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments,
final boolean noConsume, final boolean autoClose) throws JMSException
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal,
- _messageFactoryRegistry,this, protocolHandler, ft, prefetchHigh, prefetchLow,
+ _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow,
exclusive, _acknowledgeMode, noConsume, autoClose);
}
- public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+ public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent, long producerId)
{
@@ -333,6 +351,66 @@
this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
}
+
+ @Override public void messageReceived(UnprocessedMessage message)
+ {
+
+ if (message instanceof ReturnMessage)
+ {
+ // Return of the bounced message.
+ returnBouncedMessage((ReturnMessage) message);
+ }
+ else
+ {
+ super.messageReceived(message);
+ }
+ }
+
+ private void returnBouncedMessage(final ReturnMessage msg)
+ {
+ _connection.performConnectionTask(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // Bounced message is processed here, away from the mina thread
+ AbstractJMSMessage bouncedMessage =
+ _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+ AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+ AMQShortString reason = msg.getReplyText();
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
+ }
+ else
+ {
+ _connection.exceptionReceived(
+ new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
+ }
+
+ }
+ catch (Exception e)
+ {
+ _logger.error(
+ "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
+ e);
+ }
+ }
+ });
+ }
+
+
+
+
public void sendRollback() throws AMQException, FailoverException
{
TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
@@ -353,7 +431,7 @@
checkNotClosed();
AMQTopic origTopic = checkValidTopic(topic);
AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
- TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ TopicSubscriberAdaptor<BasicMessageConsumer_0_8> subscriber = _subscriptions.get(name);
if (subscriber != null)
{
if (subscriber.getTopic().equals(topic))
@@ -412,6 +490,28 @@
return subscriber;
}
+
+
+
+ public void setPrefecthLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException
+ {
+ new FailoverRetrySupport<Object, AMQException>(
+ new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+
+ BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+
+ // todo send low water mark when protocol allows.
+ // todo Be aware of possible changes to parameter order as versions change.
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+
+ return null;
+ }
+ }, _connection).execute();
+ }
+
class QueueDeclareOkHandler extends SpecificMethodFrameListener
{
@@ -437,7 +537,7 @@
}
- Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
+ protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
{
AMQFrame queueDeclare =
getMethodRegistry().createQueueDeclareBody(getTicket(),
@@ -449,18 +549,23 @@
false,
null).generateFrame(_channelId);
QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
- getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+ getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
return okHandler._messageCount;
}
- final boolean tagLE(long tag1, long tag2)
+ protected final boolean tagLE(long tag1, long tag2)
{
return tag1 <= tag2;
}
- final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
{
return false;
}
+ public AMQMessageDelegateFactory getMessageDelegateFactory()
+ {
+ return AMQMessageDelegateFactory.FACTORY_0_8;
+ }
+
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Aug 14 20:40:49 2008
@@ -22,9 +22,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
@@ -48,7 +46,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-public abstract class BasicMessageConsumer<H, B> extends Closeable implements MessageConsumer
+public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
{
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
@@ -71,7 +69,7 @@
private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
/** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
- protected AMQShortString _consumerTag;
+ protected int _consumerTag;
/** We need to know the channel id when constructing frames */
protected final int _channelId;
@@ -91,7 +89,7 @@
/**
* We need to store the "raw" field table so that we can resubscribe in the event of failover being required
*/
- private final FieldTable _rawSelectorFieldTable;
+ private final FieldTable _arguments;
/**
* We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
@@ -168,7 +166,7 @@
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
- FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ FieldTable arguments, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
@@ -179,7 +177,7 @@
_messageFactory = messageFactory;
_session = session;
_protocolHandler = protocolHandler;
- _rawSelectorFieldTable = rawSelectorFieldTable;
+ _arguments = arguments;
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
@@ -277,6 +275,14 @@
_messageListener.set(messageListener);
_session.setHasMessageListeners();
_session.startDispatcherIfNecessary();
+
+ // If we already have messages on the queue, deliver them to the listener
+ Object o = _synchronousQueue.poll();
+ while (o != null)
+ {
+ messageListener.onMessage((Message) o);
+ o = _synchronousQueue.poll();
+ }
}
}
}
@@ -335,9 +341,9 @@
_receivingThread = null;
}
- public FieldTable getRawSelectorFieldTable()
+ public FieldTable getArguments()
{
- return _rawSelectorFieldTable;
+ return _arguments;
}
public int getPrefetch()
@@ -508,6 +514,12 @@
throw e;
}
+ else if (o instanceof CloseConsumerMessage)
+ {
+ _closed.set(true);
+ deregisterConsumer();
+ return null;
+ }
else
{
return (AbstractJMSMessage) o;
@@ -562,6 +574,7 @@
}
else
{
+ // FIXME: wow this is ugly
// //fixme this probably is not right
// if (!isNoConsume())
{ // done in BasicCancelOK Handler but not sending one so just deregister.
@@ -606,7 +619,8 @@
}
else
{
- _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
}
}
}
@@ -615,25 +629,56 @@
}
/**
+ * @param closeMessage
+ * this message signals that we should close the browser
+ */
+ public void notifyCloseMessage(CloseConsumerMessage closeMessage)
+ {
+ if (isMessageListenerSet())
+ {
+ // Currently only possible to get this msg type with a browser.
+ // If we get the message here then we should probably just close
+ // this consumer.
+ // Though an AutoClose consumer with message listener is quite odd..
+ // Just log out the fact so we know where we are
+ _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
+ }
+ else
+ {
+ try
+ {
+ _synchronousQueue.put(closeMessage);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing,"
+ + "but we shouldn't have close yet");
+ }
+ }
+ }
+
+
+ /**
* Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case of a
* message listener or a synchronous receive() caller.
*
* @param messageFrame the raw unprocessed mesage
*/
- void notifyMessage(UnprocessedMessage messageFrame)
+ void notifyMessage(U messageFrame)
{
- final boolean debug = _logger.isDebugEnabled();
-
- if (debug)
+ if (messageFrame instanceof CloseConsumerMessage)
{
- _logger.debug("notifyMessage called with message number " + messageFrame.getDeliveryTag());
+ notifyCloseMessage((CloseConsumerMessage) messageFrame);
+ return;
}
+
+
try
{
- AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(messageFrame);
+ AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), messageFrame);
- if (debug)
+ if (_logger.isDebugEnabled())
{
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
}
@@ -668,7 +713,7 @@
}
}
- public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<H, B> messageFrame)
+ public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, U messageFrame)
throws Exception;
/** @param jmsMessage this message has already been processed so can't redo preDeliver */
@@ -678,18 +723,9 @@
{
if (isMessageListenerSet())
{
- // we do not need a lock around the test above, and the dispatch below as it is invalid
- // for an application to alter an installed listener while the session is started
- // synchronized (_closed)
- {
- // if (!_closed.get())
- {
-
- preApplicationProcessing(jmsMessage);
- getMessageListener().onMessage(jmsMessage);
- postDeliver(jmsMessage);
- }
- }
+ preApplicationProcessing(jmsMessage);
+ getMessageListener().onMessage(jmsMessage);
+ postDeliver(jmsMessage);
}
else
{
@@ -750,7 +786,7 @@
{
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
}
-
+ _session.markDirty();
break;
case Session.DUPS_OK_ACKNOWLEDGE:
@@ -892,12 +928,12 @@
_session.deregisterConsumer(this);
}
- public AMQShortString getConsumerTag()
+ public int getConsumerTag()
{
return _consumerTag;
}
- public void setConsumerTag(AMQShortString consumerTag)
+ public void setConsumerTag(int consumerTag)
{
_consumerTag = consumerTag;
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Aug 14 20:40:49 2008
@@ -22,31 +22,24 @@
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
-import org.apache.qpid.jms.*;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpidity.api.Message;
-import org.apache.qpidity.transport.*;
-import org.apache.qpidity.transport.Session;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.filter.MessageFilter;
-import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.QpidException;
+import org.apache.qpid.filter.MessageFilter;
+import org.apache.qpid.filter.JMSSelectorFilter;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This is a 0.10 message consumer.
*/
-public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
- implements org.apache.qpidity.nclient.util.MessageListener
+public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10>
+ implements org.apache.qpid.nclient.MessagePartListener
{
/**
@@ -78,17 +71,18 @@
* Specify whether this consumer is performing a sync receive
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
+ private String _consumerTagString;
//--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
- FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ FieldTable arguments, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
throws JMSException
{
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
- rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
+ arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
_0_10session = (AMQSession_0_10) session;
if (messageSelector != null && !messageSelector.equals(""))
{
@@ -108,7 +102,20 @@
_isStarted = connection.started();
}
- // ----- Interface org.apache.qpidity.client.util.MessageListener
+
+ @Override public void setConsumerTag(int consumerTag)
+ {
+ super.setConsumerTag(consumerTag);
+ _consumerTagString = String.valueOf(consumerTag);
+ }
+
+ public String getConsumerTagString()
+ {
+ return _consumerTagString;
+ }
+
+
+ // ----- Interface org.apache.qpid.client.util.MessageListener
/**
*
@@ -144,7 +151,7 @@
{
if (isMessageListenerSet() && ! getSession().prefetch())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
_logger.debug("messageOk, trying to notify");
@@ -157,52 +164,19 @@
/**
* This method is invoked by the transport layer when a message is delivered for this
* consumer. The message is transformed and pass to the session.
- * @param message an 0.10 message
+ * @param xfr an 0.10 message transfer
*/
- public void onMessage(Message message)
+ public void messageTransfer(MessageTransfer xfr)
+
+ //public void onMessage(Message message)
{
int channelId = getSession().getChannelId();
- long deliveryId = message.getMessageTransferId();
- AMQShortString consumerTag = getConsumerTag();
- AMQShortString exchange;
- AMQShortString routingKey;
- boolean redelivered = false;
- Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
- if (headers[0] == null) {
- headers[0] = new MessageProperties();
- }
- if( message.getDeliveryProperties() != null )
- {
- exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
- routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
- redelivered = message.getDeliveryProperties().getRedelivered();
- }
- else
- {
- exchange = new AMQShortString("");
- routingKey = new AMQShortString("");
- headers[1] = new DeliveryProperties();
- }
+ int consumerTag = getConsumerTag();
+
UnprocessedMessage_0_10 newMessage =
- new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
- try
- {
- newMessage.receiveBody(message.readData());
- }
- catch (IOException e)
- {
- getSession().getAMQConnection().exceptionReceived(e);
- }
- // if there is a replyto destination then we need to request the exchange info
- ReplyTo replyTo = ((MessageProperties) headers[0]).getReplyTo();
- if (replyTo != null && replyTo.getExchange() != null && !replyTo.getExchange().equals(""))
- {
- // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- // the exchnage class will be set later from within the sesion thread
- String replyToUrl = replyTo.getExchange() + "/" + replyTo.getRoutingKey() + "/" + replyTo.getRoutingKey();
- newMessage.setReplyToURL(replyToUrl);
- }
- newMessage.setContentHeader(headers);
+ new UnprocessedMessage_0_10(consumerTag, xfr);
+
+
getSession().messageReceived(newMessage);
// else ignore this message
}
@@ -215,47 +189,16 @@
*/
@Override void sendCancel() throws AMQException
{
- ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTag().toString());
+ ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString());
((AMQSession_0_10) getSession()).getQpidSession().sync();
// confirm cancel
getSession().confirmConsumerCancelled(getConsumerTag());
((AMQSession_0_10) getSession()).getCurrentException();
}
- @Override void notifyMessage(UnprocessedMessage messageFrame)
+ @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)
{
- // if there is a replyto destination then we need to request the exchange info
- String replyToURL = messageFrame.getReplyToURL();
- if (replyToURL != null && !replyToURL.equals(""))
- {
- AMQShortString shortExchangeName = new AMQShortString( replyToURL.substring(0, replyToURL.indexOf('/')));
- String replyToUrl = "://" + replyToURL;
- if (shortExchangeName.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.TOPIC_EXCHANGE_CLASS + replyToUrl;
- }
- else if (shortExchangeName.equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + replyToUrl;
- }
- else if (shortExchangeName.equals(ExchangeDefaults.HEADERS_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.HEADERS_EXCHANGE_CLASS + replyToUrl;
- }
- else if (shortExchangeName.equals(ExchangeDefaults.FANOUT_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.FANOUT_EXCHANGE_CLASS + replyToUrl;
- }
- else
- {
- Future<ExchangeQueryResult> future =
- ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(shortExchangeName.toString());
- ExchangeQueryResult res = future.get();
- // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- replyToUrl = res.getType() + replyToUrl;
- }
- ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl);
- }
+
super.notifyMessage(messageFrame);
}
@@ -269,12 +212,10 @@
}
@Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
- UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
+ AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception
{
- return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
- messageFrame.getExchange(), messageFrame.getRoutingKey(),
- messageFrame.getContentHeader(), messageFrame.getBodies(),
- messageFrame.getReplyToURL());
+ AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession());
+ return _messageFactory.createMessage(msg.getMessageTransfer());
}
// private methods
@@ -330,7 +271,7 @@
// and messages are not prefetched we then need to request another one
if(! getSession().prefetch())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
}
@@ -418,7 +359,7 @@
super.setMessageListener(messageListener);
if (messageListener != null && ! getSession().prefetch())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
if (messageListener != null && !_synchronousQueue.isEmpty())
@@ -443,7 +384,7 @@
_isStarted = true;
if (_syncReceive.get())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
}
@@ -466,7 +407,7 @@
{
if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
if (! getSession().prefetch())
@@ -489,4 +430,5 @@
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
}
}
+
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Aug 14 20:40:49 2008
@@ -20,40 +20,48 @@
*/
package org.apache.qpid.client;
-import java.util.concurrent.TimeUnit;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.QpidException;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.framing.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeaderBody,ContentBody>
+public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
{
protected final Logger _logger = LoggerFactory.getLogger(getClass());
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
{
super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
- protocolHandler, rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive,
+ protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
acknowledgeMode, noConsume, autoClose);
+ try
+ {
+
+ if (messageSelector != null && messageSelector.length() > 0)
+ {
+ JMSSelectorFilter _filter = new JMSSelectorFilter(messageSelector);
+ }
+ }
+ catch (QpidException e)
+ {
+ throw new InvalidSelectorException("cannot create consumer because of selector issue");
+ }
}
void sendCancel() throws AMQException, FailoverException
{
- BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false);
+ BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(_consumerTag)), false);
final AMQFrame cancelFrame = body.generateFrame(_channelId);
@@ -65,7 +73,7 @@
}
}
- public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
+ public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
{
return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
@@ -74,4 +82,4 @@
}
-}
\ No newline at end of file
+}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Aug 14 20:40:49 2008
@@ -31,23 +31,16 @@
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
-import javax.jms.Queue;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
-import javax.jms.Topic;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.CompositeAMQDataBlock;
import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.util.UUIDGen;
+import org.apache.qpid.util.UUIDs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,6 +115,8 @@
private boolean _disableMessageId;
+ private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
+
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
@@ -365,27 +360,27 @@
if (message instanceof BytesMessage)
{
- newMessage = new MessageConverter((BytesMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (BytesMessage) message).getConvertedMessage();
}
else if (message instanceof MapMessage)
{
- newMessage = new MessageConverter((MapMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (MapMessage) message).getConvertedMessage();
}
else if (message instanceof ObjectMessage)
{
- newMessage = new MessageConverter((ObjectMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (ObjectMessage) message).getConvertedMessage();
}
else if (message instanceof TextMessage)
{
- newMessage = new MessageConverter((TextMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (TextMessage) message).getConvertedMessage();
}
else if (message instanceof StreamMessage)
{
- newMessage = new MessageConverter((StreamMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (StreamMessage) message).getConvertedMessage();
}
else
{
- newMessage = new MessageConverter(message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, message).getConvertedMessage();
}
if (newMessage != null)
@@ -453,19 +448,18 @@
}
}
+ UUID messageId = null;
if (_disableMessageId)
{
- message.setJMSMessageID(null);
+ message.setJMSMessageID((UUID)null);
}
else
{
- StringBuilder b = new StringBuilder(39);
- b.append("ID:");
- b.append(UUID.randomUUID());
- message.setJMSMessageID(b.toString());
+ messageId = _messageIdGenerator.generate();
+ message.setJMSMessageID(messageId);
}
- sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
+ sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
if (message != origMessage)
{
@@ -484,8 +478,8 @@
}
abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
- int deliveryMode, int priority, long timeToLive, boolean mandatory,
- boolean immediate, boolean wait)throws JMSException;
+ UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
+ boolean immediate, boolean wait) throws JMSException;
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Aug 14 20:40:49 2008
@@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.nio.ByteBuffer;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -29,17 +30,15 @@
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpidity.nclient.util.ByteBufferMessage;
-import org.apache.qpidity.njms.ExceptionHelper;
-import org.apache.qpidity.transport.DeliveryProperties;
-import org.apache.qpidity.transport.MessageDeliveryMode;
-import org.apache.qpidity.transport.MessageDeliveryPriority;
-import org.apache.qpidity.transport.MessageProperties;
-import org.apache.qpidity.transport.ReplyTo;
+import org.apache.qpid.nclient.util.ByteBufferMessage;
+import org.apache.qpid.njms.ExceptionHelper;
+import org.apache.qpid.transport.*;
+import static org.apache.qpid.transport.Option.*;
/**
* This is a 0_10 message producer.
@@ -68,23 +67,25 @@
* Sends a message to a given destination
*/
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
- int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
- boolean wait) throws JMSException
+ UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
+ boolean immediate, boolean wait) throws JMSException
{
message.prepareForSending();
- if (message.get010Message() == null)
+
+ AMQMessageDelegate_0_10 delegate = (AMQMessageDelegate_0_10) message.getDelegate();
+
+ DeliveryProperties deliveryProp = delegate.getDeliveryProperties();
+ MessageProperties messageProps = delegate.getMessageProperties();
+
+ if (messageId != null)
{
- message.set010Message(new ByteBufferMessage());
+ messageProps.setMessageId(messageId);
}
- // force a rebuild of the 0-10 message if data has changed
- if (message.getData() == null)
+ else if (messageProps.hasMessageId())
{
- message.dataChanged();
+ messageProps.clearMessageId();
}
- DeliveryProperties deliveryProp = message.get010Message().getDeliveryProperties();
- MessageProperties messageProps = message.get010Message().getMessageProperties();
- // set the delivery properties
if (!_disableTimestamps)
{
final long currentTime = System.currentTimeMillis();
@@ -124,10 +125,10 @@
deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority));
message.setJMSPriority(priority);
}
- String excahngeName = destination.getExchangeName().toString();
- if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(excahngeName))
+ String exchangeName = destination.getExchangeName().toString();
+ if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName))
{
- deliveryProp.setExchange(excahngeName);
+ deliveryProp.setExchange(exchangeName);
}
String routingKey = destination.getRoutingKey().toString();
if (deliveryProp.getRoutingKey() == null || ! deliveryProp.getRoutingKey().equals(routingKey))
@@ -135,105 +136,29 @@
deliveryProp.setRoutingKey(routingKey);
}
- BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
- if (contentHeaderProperties.reset())
- {
- // set the application properties
- messageProps.setContentType(contentHeaderProperties.getContentType().toString());
- messageProps.setContentLength(message.getContentLength());
-
- // XXX: fixme
- String mid = message.getJMSMessageID();
- if( mid != null )
- {
- messageProps.setMessageId(UUID.fromString(mid.substring(3)));
- }
-
- AMQShortString correlationID = contentHeaderProperties.getCorrelationId();
- if (correlationID != null)
- {
- messageProps.setCorrelationId(correlationID.getBytes());
- }
-
- String replyToURL = contentHeaderProperties.getReplyToAsString();
- if (replyToURL != null)
- {
- if(_logger.isDebugEnabled())
- {
- StringBuffer b = new StringBuffer();
- b.append("\n==========================");
- b.append("\nReplyTo : " + replyToURL);
- b.append("\n==========================");
- _logger.debug(b.toString());
- }
- AMQBindingURL dest;
- try
- {
- dest = new AMQBindingURL(replyToURL);
- }
- catch (URISyntaxException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- messageProps.setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString()));
- }
-
- Map<String,Object> map = null;
-
- if (contentHeaderProperties.getHeaders() != null)
- {
- //JMS_QPID_DESTTYPE is always set but useles so this is a temporary fix
- contentHeaderProperties.getHeaders().remove(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
- map = FiledTableSupport.convertToMap(contentHeaderProperties.getHeaders());
- }
-
- AMQShortString type = contentHeaderProperties.getType();
- if (type != null)
- {
- if (map == null)
- {
- map = new HashMap<String,Object>();
- }
- map.put(AbstractJMSMessage.JMS_TYPE, type.toString());
- }
-
- if (map != null)
- {
- messageProps.setApplicationHeaders(map);
- }
- }
+ messageProps.setContentLength(message.getContentLength());
// send the message
try
{
- org.apache.qpidity.nclient.Session ssn = ((AMQSession_0_10) getSession()).getQpidSession();
+ org.apache.qpid.transport.Session ssn = (org.apache.qpid.transport.Session)
+ ((AMQSession_0_10) getSession()).getQpidSession();
// if true, we need to sync the delivery of this message
boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
getSession().getAMQConnection().getSyncPersistence());
+ org.apache.mina.common.ByteBuffer data = message.getData();
+ ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
+
+ ssn.messageTransfer(destination.getExchangeName().toString(), MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ new Header(deliveryProp, messageProps),
+ buffer, sync ? SYNC : NONE);
if (sync)
{
- ssn.setAutoSync(true);
- }
- try
- {
- ssn.messageTransfer(destination.getExchangeName().toString(),
- message.get010Message(),
- ssn.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- ssn.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+ ssn.sync();
}
- finally
- {
- if (sync)
- {
- ssn.setAutoSync(false);
- }
- }
- }
- catch (IOException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
catch (RuntimeException rte)
{
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Thu Aug 14 20:40:49 2008
@@ -20,11 +20,15 @@
*/
package org.apache.qpid.client;
+import java.util.UUID;
+
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.AMQMessageDelegate;
+import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicConsumeBody;
@@ -65,9 +69,9 @@
_protocolHandler.writeFrame(declare);
}
- void sendMessage(AMQDestination destination, Message origMessage,AbstractJMSMessage message,
- int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate,
- boolean wait) throws JMSException
+ void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
+ UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
+ boolean immediate, boolean wait) throws JMSException
{
BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
destination.getExchangeName(),
@@ -79,7 +83,8 @@
message.prepareForSending();
ByteBuffer payload = message.getData();
- BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
+ AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
+ BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
if (!_disableTimestamps)
{
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java Thu Aug 14 20:40:49 2008
@@ -32,20 +32,20 @@
* Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
*
*/
-class TopicSubscriberAdaptor implements TopicSubscriber
+class TopicSubscriberAdaptor<C extends BasicMessageConsumer> implements TopicSubscriber
{
private final Topic _topic;
- private final BasicMessageConsumer _consumer;
+ private final C _consumer;
private final boolean _noLocal;
- TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer, boolean noLocal)
+ TopicSubscriberAdaptor(Topic topic, C consumer, boolean noLocal)
{
_topic = topic;
_consumer = consumer;
_noLocal = noLocal;
}
- TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer)
+ TopicSubscriberAdaptor(Topic topic, C consumer)
{
this(topic, consumer, consumer.isNoLocal());
}
@@ -103,7 +103,7 @@
}
private void checkPreConditions() throws javax.jms.IllegalStateException{
- BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer;
+ C msgConsumer = _consumer;
if (msgConsumer.isClosed() ){
throw new javax.jms.IllegalStateException("Consumer is closed");
@@ -120,7 +120,7 @@
}
}
- BasicMessageConsumer getMessageConsumer()
+ C getMessageConsumer()
{
return _consumer;
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Thu Aug 14 20:40:49 2008
@@ -21,9 +21,9 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.dtx.XidImpl;
-import org.apache.qpidity.transport.*;
+import org.apache.qpid.QpidException;
+import org.apache.qpid.dtx.XidImpl;
+import org.apache.qpid.transport.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +76,7 @@
_logger.debug("commit tx branch with xid: ", xid);
}
Future<XaResult> future =
- _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NO_OPTION);
+ _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE);
// now wait on the future for the result
XaResult result = null;
@@ -86,7 +86,7 @@
}
catch (SessionException e)
{
- // we need to restore the qpidity session that has been closed
+ // we need to restore the qpid session that has been closed
_xaSession.createSession();
// we should get a single exception
convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
@@ -127,10 +127,11 @@
default:
throw new XAException(XAException.XAER_INVAL);
}
+ _xaSession.flushAcknowledgments();
Future<XaResult> future = _xaSession.getQpidSession()
.dtxEnd(convertXid(xid),
- flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
- flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
+ flag == XAResource.TMFAIL ? Option.FAIL : Option.NONE,
+ flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NONE);
// now wait on the future for the result
XaResult result = null;
try
@@ -139,7 +140,7 @@
}
catch (SessionException e)
{
- // we need to restore the qpidity session that has been closed
+ // we need to restore the qpid session that has been closed
_xaSession.createSession();
// we should get a single exception
convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
@@ -168,7 +169,7 @@
}
catch (SessionException e)
{
- // we need to restore the qpidity session that has been closed
+ // we need to restore the qpid session that has been closed
_xaSession.createSession();
// we should get a single exception
convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
@@ -198,7 +199,7 @@
}
catch (SessionException e)
{
- // we need to restore the qpidity session that has been closed
+ // we need to restore the qpid session that has been closed
_xaSession.createSession();
// we should get a single exception
convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
@@ -245,7 +246,7 @@
}
catch (SessionException e)
{
- // we need to restore the qpidity session that has been closed
+ // we need to restore the qpid session that has been closed
_xaSession.createSession();
// we should get a single exception
convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
@@ -288,7 +289,7 @@
}
catch (SessionException e)
{
- // we need to restore the qpidity session that has been closed
+ // we need to restore the qpid session that has been closed
_xaSession.createSession();
// we should get a single exception
convertExecutionErrorToXAErr( e.getExceptions().get(0).getErrorCode());
@@ -297,7 +298,7 @@
int i = 0;
for (Object obj : res.getInDoubt())
{
- org.apache.qpidity.transport.Xid xid = (org.apache.qpidity.transport.Xid) obj;
+ org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj;
result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId());
i++;
}
@@ -326,7 +327,7 @@
}
catch (SessionException e)
{
- // we need to restore the qpidity session that has been closed
+ // we need to restore the qpid session that has been closed
_xaSession.createSession();
// we should get a single exception
convertExecutionErrorToXAErr( e.getExceptions().get(0).getErrorCode());
@@ -400,8 +401,8 @@
}
Future<XaResult> future = _xaSession.getQpidSession()
.dtxStart(convertXid(xid),
- flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
- flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+ flag == XAResource.TMJOIN ? Option.JOIN : Option.NONE,
+ flag == XAResource.TMRESUME ? Option.RESUME : Option.NONE);
// now wait on the future for the result
XaResult result = null;
try
@@ -410,7 +411,7 @@
}
catch (SessionException e)
{
- // we need to restore the qpidity session that has been closed
+ // we need to restore the qpid session that has been closed
_xaSession.createSession();
// we should get a single exception
convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
@@ -511,7 +512,7 @@
* @return the qpid formated xid
* @throws XAException when xid is null or when it cannot be converted.
*/
- private org.apache.qpidity.transport.Xid convertXid(Xid xid) throws XAException
+ private org.apache.qpid.transport.Xid convertXid(Xid xid) throws XAException
{
if (xid == null)
{
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Thu Aug 14 20:40:49 2008
@@ -17,7 +17,7 @@
*/
package org.apache.qpid.client;
-import org.apache.qpidity.nclient.DtxSession;
+import org.apache.qpid.nclient.DtxSession;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import javax.jms.*;
@@ -48,7 +48,7 @@
/**
* Create a JMS XASession
*/
- public XASessionImpl(org.apache.qpidity.nclient.Connection qpidConnection, AMQConnection con, int channelId,
+ public XASessionImpl(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId,
int defaultPrefetchHigh, int defaultPrefetchLow)
{
super(qpidConnection, con, channelId, false, // this is not a transacted session
@@ -61,7 +61,7 @@
//-- public methods
/**
- * Create a qpidity session.
+ * Create a qpid session.
*/
public void createSession()
{
@@ -126,7 +126,7 @@
*
* @return The associated Qpid Session.
*/
- protected org.apache.qpidity.nclient.DtxSession getQpidSession()
+ protected org.apache.qpid.nclient.DtxSession getQpidSession()
{
return _qpidDtxSession;
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Thu Aug 14 20:40:49 2008
@@ -139,11 +139,15 @@
// have a state waiter waiting until the connection is closed for some reason. Or in future we may have
// a slightly more complex state model therefore I felt it was worthwhile doing this.
AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
- _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession()));
+
+ _amqProtocolHandler.setStateManager(new AMQStateManager());
+
+
if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
{
_logger.info("Failover process veto-ed by client");
+ //Restore Existing State Manager
_amqProtocolHandler.setStateManager(existingStateManager);
//todo: ritchiem these exceptions are useless... Would be better to attempt to propogate exception that
@@ -181,13 +185,19 @@
if (!failoverSucceeded)
{
+ //Restore Existing State Manager
_amqProtocolHandler.setStateManager(existingStateManager);
+
_amqProtocolHandler.getConnection().exceptionReceived(
new AMQDisconnectedException("Server closed connection and no failover " +
"was successful", null));
}
else
{
+ // Set the new Protocol Session in the StateManager.
+ existingStateManager.setProtocolSession(_amqProtocolHandler.getProtocolSession());
+
+ //Restore Existing State Manager
_amqProtocolHandler.setStateManager(existingStateManager);
try
{
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -5,14 +5,8 @@
import org.apache.qpid.framing.*;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQChannelClosedException;
-import org.apache.qpid.protocol.AMQConstant;
public class AccessRequestOkMethodHandler implements StateAwareMethodListener<AccessRequestOkBody>
{
@@ -25,11 +19,10 @@
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId)
+ public void methodReceived(AMQProtocolSession session, AccessRequestOkBody method, int channelId)
throws AMQException
{
_logger.debug("AccessRequestOk method received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
session.setTicket(method.getTicket(), channelId);
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -22,11 +22,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,13 +42,9 @@
private BasicCancelOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, BasicCancelOkBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, BasicCancelOkBody body, int channelId)
throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
-
-
-
if (_logger.isInfoEnabled())
{
_logger.info("New BasicCancelOk method received for consumer:" + body.getConsumerTag());
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -21,10 +21,8 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicDeliverBody;
import org.slf4j.Logger;
@@ -41,18 +39,16 @@
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, BasicDeliverBody body, int channelId)
- throws AMQException
+ public void methodReceived(AMQProtocolSession session, BasicDeliverBody body, int channelId)
+ throws AMQException
{
- final AMQProtocolSession session = stateManager.getProtocolSession();
final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
- channelId,
body.getDeliveryTag(),
- body.getConsumerTag(),
+ body.getConsumerTag().toIntValue(),
body.getExchange(),
body.getRoutingKey(),
body.getRedelivered());
- _logger.debug("New JmsDeliver method received");
- session.unprocessedMessageReceived(msg);
+ _logger.debug("New JmsDeliver method received:" + session);
+ session.unprocessedMessageReceived(channelId, msg);
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -22,13 +22,9 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.ReturnMessage;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,19 +41,18 @@
}
- public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, BasicReturnBody body, int channelId)
throws AMQException
{
_logger.debug("New JmsBounce method received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
- final ReturnMessage msg = new ReturnMessage(channelId,
+ final ReturnMessage msg = new ReturnMessage(
body.getExchange(),
body.getRoutingKey(),
body.getReplyText(),
body.getReplyCode()
);
- session.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(channelId, msg);
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -26,14 +26,12 @@
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,12 +47,10 @@
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId)
+ public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId)
throws AMQException
{
_logger.debug("ChannelClose method received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
-
AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
AMQShortString reason = method.getReplyText();
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -23,9 +23,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,12 +39,11 @@
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, ChannelCloseOkBody method, int channelId)
+ public void methodReceived(AMQProtocolSession session, ChannelCloseOkBody method, int channelId)
throws AMQException
{
_logger.info("Received channel-close-ok for channel-id " + channelId);
- final AMQProtocolSession session = stateManager.getProtocolSession();
// todo this should do the local closure
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -2,7 +2,6 @@
import org.apache.qpid.framing.ChannelFlowBody;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
import org.slf4j.Logger;
@@ -42,11 +41,9 @@
private ChannelFlowMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, ChannelFlowBody body, int channelId)
throws AMQException
{
-
- final AMQProtocolSession session = stateManager.getProtocolSession();
session.setFlowControl(channelId, body.getActive());
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -22,10 +22,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +41,7 @@
private ChannelFlowOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, ChannelFlowOkBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, ChannelFlowOkBody body, int channelId)
throws AMQException
{