You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/10 14:35:28 UTC
svn commit: r574221 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
AMQSession_0_10.java BasicMessageConsumer_0_10.java
BasicMessageProducer_0_10.java
Author: arnaudsimon
Date: Mon Sep 10 05:35:27 2007
New Revision: 574221
URL: http://svn.apache.org/viewvc?rev=574221&view=rev
Log: (empty)
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (with props)
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=574221&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Sep 10 05:35:27 2007
@@ -0,0 +1,445 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpidity.client.Session;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Destination;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This is a 0.10 Session
+ */
+public class AMQSession_0_10 extends AMQSession
+{
+
+ /**
+ * This class logger
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
+
+ /**
+ * The maximum number of pre-fetched messages per destination
+ */
+ private static final long MAX_PREFETCH = 100;
+
+ /**
+ * The underlying QpidSession
+ */
+ private Session _qpidSession;
+
+ /**
+ * The latest qpid Exception that has been reaised.
+ */
+ private QpidException _currentException;
+
+ /**
+ * All the not yet acknoledged message tags
+ */
+ private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
+
+ //--- constructors
+
+ /**
+ * Creates a new session on a connection.
+ *
+ * @param con The connection on which to create the session.
+ * @param channelId The unique identifier for the session.
+ * @param transacted Indicates whether or not the session is transactional.
+ * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param messageFactoryRegistry The message factory factory for the session.
+ * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
+ * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
+ */
+ AMQSession_0_10(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+ MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark,
+ int defaultPrefetchLowMark)
+ {
+
+ super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
+ defaultPrefetchLowMark);
+ // create the qpid session with an expiry <= 0 so that the session does not expire
+ _qpidSession = null; // todo when the connection is finalized _connection.getQpidConnection().createSession(0);
+ // set the exception listnere for this session
+ _qpidSession.setExceptionListener(new QpidSessionExceptionListener());
+ // set transacted if required
+ if (_transacted)
+ {
+ _qpidSession.txSelect();
+ }
+ }
+
+ /**
+ * Creates a new session on a connection with the default 0-10 message factory.
+ *
+ * @param con The connection on which to create the session.
+ * @param channelId The unique identifier for the session.
+ * @param transacted Indicates whether or not the session is transactional.
+ * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
+ * @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
+ */
+ AMQSession_0_10(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
+ int defaultPrefetchLow)
+ {
+
+ this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefault010Registry(),
+ defaultPrefetchHigh, defaultPrefetchLow);
+ }
+
+ //------- 0-10 specific methods
+
+ /**
+ * Add a message tag to be acknowledged
+ * This is used for client ack mode
+ *
+ * @param tag The id of the message to be acknowledged
+ */
+ void addMessageTag(long tag)
+ {
+ _unacknowledgedMessageTags.add(tag);
+ }
+
+ //------- overwritten methods of class AMQSession
+
+ /**
+ * Acknowledge one or many messages.
+ *
+ * @param deliveryTag The tag of the last message to be acknowledged.
+ * @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the
+ * delivery tag, <tt>false</tt> to just acknowledge that message.
+ */
+ public void acknowledgeMessage(long deliveryTag, boolean multiple)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId);
+ }
+ // acknowledge this message
+ RangeSet ranges = new RangeSet();
+ if (multiple)
+ {
+ for (Long messageTag : _unacknowledgedMessageTags)
+ {
+ ranges.add(messageTag);
+ }
+ //empty the list of unack messages
+ _unacknowledgedMessageTags.clear();
+ }
+ else
+ {
+ ranges.add(deliveryTag);
+ _unacknowledgedMessageTags.remove(deliveryTag);
+ }
+ getQpidSession().messageAcknowledge(ranges);
+ }
+
+ /**
+ * Bind a queue with an exchange.
+ *
+ * @param queueName Specifies the name of the queue to bind. If the queue name is empty,
+ * refers to the current
+ * queue for the session, which is the last declared queue.
+ * @param exchangeName The exchange name.
+ * @param routingKey Specifies the routing key for the binding.
+ * @param arguments 0_8 specific
+ */
+ public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
+ final FieldTable arguments, final AMQShortString exchangeName)
+ throws AMQException, FailoverException
+ {
+ getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), routingKey.toString(), null);
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
+
+
+ /**
+ * Close this session.
+ *
+ * @param timeout no used / 0_8 specific
+ * @throws AMQException
+ * @throws FailoverException
+ */
+ public void sendClose(long timeout) throws AMQException, FailoverException
+ {
+ getQpidSession().sessionClose();
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
+
+ /**
+ * Commit the receipt and the delivery of all messages exchanged by this session resources.
+ */
+ public void sendCommit() throws AMQException, FailoverException
+ {
+ getQpidSession().txCommit();
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
+
+ /**
+ * Create a queue with a given name.
+ *
+ * @param name The queue name
+ * @param autoDelete If this field is set and the exclusive field is also set,
+ * then the queue is deleted when the connection closes.
+ * @param durable If set when creating a new queue,
+ * the queue will be marked as durable.
+ * @param exclusive Exclusive queues can only be used from one connection at a time.
+ * @throws AMQException
+ * @throws FailoverException
+ */
+ public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
+ final boolean exclusive) throws AMQException, FailoverException
+ {
+ getQpidSession().queueDeclare(name.toString(), null, null, durable ? Option.DURABLE : Option.NO_OPTION,
+ autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION,
+ exclusive ? Option.EXCLUSIVE : Option.NO_OPTION);
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
+
+ /**
+ * This method asks the broker to redeliver all unacknowledged messages
+ *
+ * @throws AMQException
+ * @throws FailoverException
+ */
+ public void sendRecover() throws AMQException, FailoverException
+ {
+ // release all unack messages
+ RangeSet ranges = new RangeSet();
+ for (long messageTag : _unacknowledgedMessageTags)
+ {
+ // release this message
+ ranges.add(messageTag);
+ }
+ getQpidSession().messageRelease(ranges);
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
+
+ /**
+ * Release (0_8 notion of Reject) an acquired message
+ *
+ * @param deliveryTag the message ID
+ * @param requeue always true
+ */
+ public void rejectMessage(long deliveryTag, boolean requeue)
+ {
+ // The value of requeue is always true
+ RangeSet ranges = new RangeSet();
+ ranges.add(deliveryTag);
+ getQpidSession().messageRelease(ranges);
+ //I don't think we need to sync
+ }
+
+ /**
+ * Create an 0_10 message consumer
+ */
+ public BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+ final int prefetchLow, final boolean noLocal,
+ final boolean exclusive, String messageSelector,
+ final FieldTable ft, final boolean noConsume,
+ final boolean autoClose)
+ {
+
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
+ return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal,
+ _messageFactoryRegistry, this, protocolHandler, ft, prefetchHigh,
+ prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose);
+ }
+
+ /**
+ * Bind a queue with an exchange.
+ */
+ public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName,
+ final AMQShortString routingKey) throws JMSException
+ {
+ getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), routingKey.toString(), null);
+ // we asume that a binding is always successful
+ return true;
+ }
+
+ /**
+ * This method is invoked when a consumer is creted
+ * Registers the consumer with the broker
+ */
+ public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
+ boolean nowait, String messageSelector, AMQShortString tag)
+ throws AMQException, FailoverException
+ {
+ getQpidSession().messageSubscribe(queueName.toString(), tag.toString(), Session.TRANSFER_CONFIRM_MODE_REQUIRED,
+ Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+ new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer),
+ null, consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
+ consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
+
+ /**
+ * Create an 0_10 message producer
+ */
+ public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+ final boolean immediate, final boolean waitUntilSent,
+ long producerId)
+ {
+ return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this,
+ getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+
+ }
+
+ /**
+ * creates an exchange if it does not already exist
+ */
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type,
+ final AMQProtocolHandler protocolHandler, final boolean nowait)
+ throws AMQException, FailoverException
+ {
+ getQpidSession().exchangeDeclare(name.toString(), type.toString(), null, null);
+ // autoDelete --> false
+ // durable --> false
+ // passive -- false
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
+
+ /**
+ * Declare a queue with the given queueName
+ */
+ public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
+ throws AMQException, FailoverException
+ {
+ getQpidSession().queueDeclare(amqd.getAMQQueueName().toString(), null, null,
+ amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NO_OPTION,
+ amqd.isDurable() ? Option.DURABLE : Option.NO_OPTION,
+ amqd.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+ // passive --> false
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
+
+ /**
+ * deletes a queue
+ */
+ public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
+ {
+ getQpidSession().queueDelete(queueName.toString());
+ // ifEmpty --> false
+ // ifUnused --> false
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
+
+ /**
+ * Activate/deactivate the message flow for all the consumers of this session.
+ */
+ public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
+ {
+ if (suspend)
+ {
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ getQpidSession().messageStop(consumer.getConsumerTag().toString());
+ }
+ }
+ else
+ {
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ MAX_PREFETCH);
+ }
+ }
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
+
+
+ //------ Private methods
+ /**
+ * Access to the underlying Qpid Session
+ *
+ * @return The associated Qpid Session.
+ */
+ protected org.apache.qpidity.client.Session getQpidSession()
+ {
+ return _qpidSession;
+ }
+
+
+ /**
+ * Get the latest thrown exception.
+ *
+ * @throws org.apache.qpid.AMQException get the latest thrown error.
+ */
+ public synchronized void getCurrentException() throws AMQException
+ {
+ if (_currentException != null)
+ {
+ QpidException toBeTrhown = _currentException;
+ _currentException = null;
+ throw new AMQException(AMQConstant.getConstant(toBeTrhown.getErrorCode().getCode()),
+ toBeTrhown.getMessage(), toBeTrhown);
+ }
+ }
+
+ //------ Inner classes
+ /**
+ * Lstener for qpid protocol exceptions
+ */
+ private class QpidSessionExceptionListener implements org.apache.qpidity.client.ExceptionListener
+ {
+ public void onException(QpidException exception)
+ {
+ synchronized (this)
+ {
+ //todo check the error code for finding out if we need to notify the
+ // JMS connection exception listener
+ _currentException = exception;
+ }
+ }
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=574221&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Mon Sep 10 05:35:27 2007
@@ -0,0 +1,112 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.Struct;
+
+import javax.jms.JMSException;
+import java.io.IOException;
+
+/**
+ * This is a 0.10 message consumer.
+ */
+public class BasicMessageConsumer_0_10 extends BasicMessageConsumer
+ implements org.apache.qpidity.client.util.MessageListener
+{
+ /**
+ * This class logger
+ */
+ protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
+ protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
+ AMQSession session, AMQProtocolHandler protocolHandler,
+ FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ {
+ super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
+ rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
+
+ }
+
+ // ----- Interface org.apache.qpidity.client.util.MessageListener
+ public void onMessage(Message message)
+ {
+ int channelId = getSession().getChannelId();
+ long deliveryId = message.getMessageTransferId();
+ String consumerTag = getConsumerTag().toString();
+ AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
+ AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
+ boolean redelivered = message.getDeliveryProperties().getRedelivered();
+ UnprocessedMessage_0_10 newMessage =
+ new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
+ try
+ {
+ newMessage.receiveBody(message.readData());
+ }
+ catch (IOException e)
+ {
+ getSession().getAMQConnection().exceptionReceived(e);
+ }
+ Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
+ newMessage.setContentHeader(headers);
+ getSession().messageReceived(newMessage);
+ }
+
+ //----- overwritten methods
+
+ /**
+ * This method is invoked when this consumer is stopped.
+ * It tells the broker to stop delivering messages to this consumer.
+ */
+ public void sendCancel() throws JMSAMQException
+ {
+ ((AMQSession_0_10) getSession()).getQpidSession().messageStop(getConsumerTag().toString());
+ ((AMQSession_0_10) getSession()).getQpidSession().sync();
+ try
+ {
+ ((AMQSession_0_10) getSession()).getCurrentException();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Problem when stopping consumer", e);
+ }
+ }
+
+ /**
+ * This is invoked just before a message is delivered to the jms consumer
+ */
+ void postDeliver(AbstractJMSMessage msg) throws JMSException
+ {
+ // notify the session
+ ((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag());
+ super.postDeliver(msg);
+ }
+
+}
\ No newline at end of file
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=574221&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Mon Sep 10 05:35:27 2007
@@ -0,0 +1,150 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpidity.jms.message.MessageImpl;
+import org.apache.qpidity.jms.message.MessageHelper;
+import org.apache.qpidity.jms.ExceptionHelper;
+import org.apache.qpidity.QpidException;
+
+import javax.jms.Message;
+import javax.jms.JMSException;
+import java.util.UUID;
+import java.io.IOException;
+
+/**
+ *
+ * This is a 0_10 message producer.
+ */
+public class BasicMessageProducer_0_10 extends BasicMessageProducer
+{
+
+ BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
+ boolean immediate, boolean mandatory, boolean waitUntilSent)
+ {
+ super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate,
+ mandatory, waitUntilSent);
+ }
+
+ public void declareDestination(AMQDestination destination)
+ {
+ // Declare the exchange
+ // Note that the durable and internal arguments are ignored since passive is set to false
+ AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), null,
+ // arguments
+ false, // autoDelete
+ false, // durable
+ destination.getExchangeName(), // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ _session.getTicket(), // ticket
+ destination.getExchangeClass()); // type
+ _protocolHandler.writeFrame(declare);
+ }
+
+ //--- Overwritten methods
+
+
+ /**
+ * Sends a message to a given destination
+ * We will always convert the received message
+ */
+ public void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
+ int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
+ boolean wait) throws JMSException
+ {
+ // Only get current time if required
+ long currentTime = Long.MIN_VALUE;
+ if (!((timeToLive == 0) && _disableTimestamps))
+ {
+ currentTime = System.currentTimeMillis();
+ }
+ // the messae UID
+ String uid = (getDisableMessageID()) ? "MSG_ID_DISABLED" : UUID.randomUUID().toString();
+ MessageImpl qpidMessage;
+ // check that the message is not a foreign one
+ try
+ {
+ qpidMessage = (MessageImpl) origMessage;
+ }
+ catch (ClassCastException cce)
+ {
+ // this is a foreign message
+ qpidMessage = MessageHelper.transformMessage(origMessage);
+ // set message's properties in case they are queried after send.
+ origMessage.setJMSDestination(destination);
+ origMessage.setJMSDeliveryMode(deliveryMode);
+ origMessage.setJMSPriority(priority);
+ origMessage.setJMSMessageID(uid);
+ if (timeToLive != 0)
+ {
+ origMessage.setJMSExpiration(timeToLive + currentTime);
+ _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+ }
+ else
+ {
+ origMessage.setJMSExpiration(timeToLive);
+ }
+ origMessage.setJMSTimestamp(currentTime);
+ }
+ // set the message properties
+ qpidMessage.setJMSDestination(destination);
+ qpidMessage.setJMSMessageID(uid);
+ qpidMessage.setJMSDeliveryMode(deliveryMode);
+ qpidMessage.setJMSPriority(priority);
+ if (timeToLive != 0)
+ {
+ qpidMessage.setJMSExpiration(timeToLive + currentTime);
+ }
+ else
+ {
+ qpidMessage.setJMSExpiration(timeToLive);
+ }
+ qpidMessage.setJMSTimestamp(currentTime);
+ qpidMessage.setRoutingKey(destination.getDestinationName().toString());
+ qpidMessage.setExchangeName(destination.getExchangeName().toString());
+ // call beforeMessageDispatch
+ try
+ {
+ qpidMessage.beforeMessageDispatch();
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ try
+ {
+ ((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(qpidMessage.getExchangeName(),
+ qpidMessage.getQpidityMessage(),
+ org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+ org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+ }
+ catch (IOException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+}
+
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
------------------------------------------------------------------------------
svn:eol-style = native