You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/10 14:35:28 UTC

svn commit: r574221 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQSession_0_10.java BasicMessageConsumer_0_10.java BasicMessageProducer_0_10.java

Author: arnaudsimon
Date: Mon Sep 10 05:35:27 2007
New Revision: 574221

URL: http://svn.apache.org/viewvc?rev=574221&view=rev
Log: (empty)

Added:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java   (with props)

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=574221&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Sep 10 05:35:27 2007
@@ -0,0 +1,445 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpidity.client.Session;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Destination;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This is a 0.10 Session
+ */
+public class AMQSession_0_10 extends AMQSession
+{
+
+    /**
+     * This class logger
+     */
+    private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
+
+    /**
+     * The maximum number of pre-fetched messages per destination
+     */
+    private static final long MAX_PREFETCH = 100;
+
+    /**
+     * The underlying QpidSession
+     */
+    private Session _qpidSession;
+
+    /**
+     * The latest qpid Exception that has been reaised.
+     */
+    private QpidException _currentException;
+
+    /**
+     * All the not yet acknoledged message tags
+     */
+    private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
+
+    //--- constructors
+
+    /**
+     * Creates a new session on a connection.
+     *
+     * @param con                     The connection on which to create the session.
+     * @param channelId               The unique identifier for the session.
+     * @param transacted              Indicates whether or not the session is transactional.
+     * @param acknowledgeMode         The acknoledgement mode for the session.
+     * @param messageFactoryRegistry  The message factory factory for the session.
+     * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
+     * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
+     */
+    AMQSession_0_10(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+                    MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark,
+                    int defaultPrefetchLowMark)
+    {
+
+        super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
+              defaultPrefetchLowMark);
+        // create the qpid session with an expiry  <= 0 so that the session does not expire
+        _qpidSession = null; // todo when the connection is finalized _connection.getQpidConnection().createSession(0);
+        // set the exception listnere for this session
+        _qpidSession.setExceptionListener(new QpidSessionExceptionListener());
+        // set transacted if required
+        if (_transacted)
+        {
+            _qpidSession.txSelect();
+        }
+    }
+
+    /**
+     * Creates a new session on a connection with the default 0-10 message factory.
+     *
+     * @param con                 The connection on which to create the session.
+     * @param channelId           The unique identifier for the session.
+     * @param transacted          Indicates whether or not the session is transactional.
+     * @param acknowledgeMode     The acknoledgement mode for the session.
+     * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
+     * @param defaultPrefetchLow  The number of prefetched messages at which to resume the session.
+     */
+    AMQSession_0_10(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
+                    int defaultPrefetchLow)
+    {
+
+        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefault010Registry(),
+             defaultPrefetchHigh, defaultPrefetchLow);
+    }
+
+    //------- 0-10 specific methods
+
+    /**
+     * Add a message tag to be acknowledged
+     * This is used for client ack mode
+     *
+     * @param tag The id of the message to be acknowledged
+     */
+    void addMessageTag(long tag)
+    {
+        _unacknowledgedMessageTags.add(tag);
+    }
+
+    //------- overwritten methods of class AMQSession
+
+    /**
+     * Acknowledge one or many messages.
+     *
+     * @param deliveryTag The tag of the last message to be acknowledged.
+     * @param multiple    <tt>true</tt> to acknowledge all messages up to and including the one specified by the
+     *                    delivery tag, <tt>false</tt> to just acknowledge that message.
+     */
+    public void acknowledgeMessage(long deliveryTag, boolean multiple)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId);
+        }
+        // acknowledge this message
+        RangeSet ranges = new RangeSet();
+        if (multiple)
+        {
+            for (Long messageTag : _unacknowledgedMessageTags)
+            {
+                ranges.add(messageTag);
+            }
+            //empty the list of unack messages
+            _unacknowledgedMessageTags.clear();
+        }
+        else
+        {
+            ranges.add(deliveryTag);
+            _unacknowledgedMessageTags.remove(deliveryTag);
+        }
+        getQpidSession().messageAcknowledge(ranges);
+    }
+
+    /**
+     * Bind a queue with an exchange.
+     *
+     * @param queueName    Specifies the name of the queue to bind. If the queue name is empty,
+     *                     refers to the current
+     *                     queue for the session, which is the last declared queue.
+     * @param exchangeName The exchange name.
+     * @param routingKey   Specifies the routing key for the binding.
+     * @param arguments    0_8 specific
+     */
+    public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
+                              final FieldTable arguments, final AMQShortString exchangeName)
+            throws AMQException, FailoverException
+    {
+        getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), routingKey.toString(), null);
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+
+    /**
+     * Close this session.
+     *
+     * @param timeout no used / 0_8 specific
+     * @throws AMQException
+     * @throws FailoverException
+     */
+    public void sendClose(long timeout) throws AMQException, FailoverException
+    {
+        getQpidSession().sessionClose();
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * Commit the receipt and the delivery of all messages exchanged by this session resources.
+     */
+    public void sendCommit() throws AMQException, FailoverException
+    {
+        getQpidSession().txCommit();
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * Create a queue with a given name.
+     *
+     * @param name       The queue name
+     * @param autoDelete If this field is set and the exclusive field is also set,
+     *                   then the queue is deleted when the connection closes.
+     * @param durable    If set when creating a new queue,
+     *                   the queue will be marked as durable.
+     * @param exclusive  Exclusive queues can only be used from one connection at a time.
+     * @throws AMQException
+     * @throws FailoverException
+     */
+    public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
+                                final boolean exclusive) throws AMQException, FailoverException
+    {
+        getQpidSession().queueDeclare(name.toString(), null, null, durable ? Option.DURABLE : Option.NO_OPTION,
+                                      autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION,
+                                      exclusive ? Option.EXCLUSIVE : Option.NO_OPTION);
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * This method asks the broker to redeliver all unacknowledged messages
+     *
+     * @throws AMQException
+     * @throws FailoverException
+     */
+    public void sendRecover() throws AMQException, FailoverException
+    {
+        // release all unack messages
+        RangeSet ranges = new RangeSet();
+        for (long messageTag : _unacknowledgedMessageTags)
+        {
+            // release this message           
+            ranges.add(messageTag);
+        }
+        getQpidSession().messageRelease(ranges);
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * Release (0_8 notion of Reject) an acquired message
+     *
+     * @param deliveryTag the message ID
+     * @param requeue     always true
+     */
+    public void rejectMessage(long deliveryTag, boolean requeue)
+    {
+        // The value of requeue is always true
+        RangeSet ranges = new RangeSet();
+        ranges.add(deliveryTag);
+        getQpidSession().messageRelease(ranges);
+        //I don't think we need to sync
+    }
+
+    /**
+     * Create an 0_10 message consumer
+     */
+    public BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+                                                      final int prefetchLow, final boolean noLocal,
+                                                      final boolean exclusive, String messageSelector,
+                                                      final FieldTable ft, final boolean noConsume,
+                                                      final boolean autoClose)
+    {
+
+        final AMQProtocolHandler protocolHandler = getProtocolHandler();
+        return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal,
+                                             _messageFactoryRegistry, this, protocolHandler, ft, prefetchHigh,
+                                             prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose);
+    }
+
+    /**
+     * Bind a queue with an exchange.
+     */
+    public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName,
+                                final AMQShortString routingKey) throws JMSException
+    {
+        getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), routingKey.toString(), null);
+        // we asume that a binding is always successful
+        return true;
+    }
+
+    /**
+     * This method is invoked when a consumer is creted
+     * Registers the consumer with the broker
+     */
+    public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
+                            boolean nowait, String messageSelector, AMQShortString tag)
+            throws AMQException, FailoverException
+    {
+        getQpidSession().messageSubscribe(queueName.toString(), tag.toString(), Session.TRANSFER_CONFIRM_MODE_REQUIRED,
+                                          Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+                                          new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer),
+                                          null, consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
+                                          consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * Create an 0_10 message producer
+     */
+    public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+                                                      final boolean immediate, final boolean waitUntilSent,
+                                                      long producerId)
+    {
+        return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this,
+                                             getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+
+    }
+
+    /**
+     * creates an exchange if it does not already exist
+     */
+    public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type,
+                                    final AMQProtocolHandler protocolHandler, final boolean nowait)
+            throws AMQException, FailoverException
+    {
+        getQpidSession().exchangeDeclare(name.toString(), type.toString(), null, null);
+        // autoDelete --> false
+        // durable --> false
+        // passive -- false
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * Declare a queue with the given queueName
+     */
+    public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
+            throws AMQException, FailoverException
+    {
+        getQpidSession().queueDeclare(amqd.getAMQQueueName().toString(), null, null,
+                                      amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NO_OPTION,
+                                      amqd.isDurable() ? Option.DURABLE : Option.NO_OPTION,
+                                      amqd.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+        // passive --> false
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * deletes a queue
+     */
+    public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
+    {
+        getQpidSession().queueDelete(queueName.toString());
+        // ifEmpty --> false
+        // ifUnused --> false
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * Activate/deactivate the message flow for all the consumers of this session.
+     */
+    public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
+    {
+        if (suspend)
+        {
+            for (BasicMessageConsumer consumer : _consumers.values())
+            {
+                getQpidSession().messageStop(consumer.getConsumerTag().toString());
+            }
+        }
+        else
+        {
+            for (BasicMessageConsumer consumer : _consumers.values())
+            {
+                getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                                             MAX_PREFETCH);
+            }
+        }
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+
+    //------ Private methods
+    /**
+     * Access to the underlying Qpid Session
+     *
+     * @return The associated Qpid Session.
+     */
+    protected org.apache.qpidity.client.Session getQpidSession()
+    {
+        return _qpidSession;
+    }
+
+
+    /**
+     * Get the latest thrown exception.
+     *
+     * @throws org.apache.qpid.AMQException get the latest thrown error.
+     */
+    public synchronized void getCurrentException() throws AMQException
+    {
+        if (_currentException != null)
+        {
+            QpidException toBeTrhown = _currentException;
+            _currentException = null;
+            throw new AMQException(AMQConstant.getConstant(toBeTrhown.getErrorCode().getCode()),
+                                   toBeTrhown.getMessage(), toBeTrhown);
+        }
+    }
+
+    //------ Inner classes
+    /**
+     * Lstener for qpid protocol exceptions
+     */
+    private class QpidSessionExceptionListener implements org.apache.qpidity.client.ExceptionListener
+    {
+        public void onException(QpidException exception)
+        {
+            synchronized (this)
+            {
+                //todo check the error code for finding out if we need to notify the
+                // JMS connection exception listener
+                _currentException = exception;
+            }
+        }
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=574221&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Mon Sep 10 05:35:27 2007
@@ -0,0 +1,112 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.Struct;
+
+import javax.jms.JMSException;
+import java.io.IOException;
+
+/**
+ * This is a 0.10 message consumer.
+ */
+public class BasicMessageConsumer_0_10 extends BasicMessageConsumer
+        implements org.apache.qpidity.client.util.MessageListener
+{
+    /**
+     * This class logger
+     */
+    protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
+    protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
+                                        String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
+                                        AMQSession session, AMQProtocolHandler protocolHandler,
+                                        FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+                                        boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+    {
+        super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
+              rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
+
+    }
+
+    // ----- Interface org.apache.qpidity.client.util.MessageListener
+    public void onMessage(Message message)
+    {
+        int channelId = getSession().getChannelId();
+        long deliveryId = message.getMessageTransferId();
+        String consumerTag = getConsumerTag().toString();
+        AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
+        AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
+        boolean redelivered = message.getDeliveryProperties().getRedelivered();
+        UnprocessedMessage_0_10 newMessage =
+                new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
+        try
+        {
+            newMessage.receiveBody(message.readData());
+        }
+        catch (IOException e)
+        {
+            getSession().getAMQConnection().exceptionReceived(e);
+        }
+        Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
+        newMessage.setContentHeader(headers);
+        getSession().messageReceived(newMessage);
+    }
+
+    //----- overwritten methods
+
+    /**
+     * This method is invoked when this consumer is stopped.
+     * It tells the broker to stop delivering messages to this consumer.
+     */
+    public void sendCancel() throws JMSAMQException
+    {
+        ((AMQSession_0_10) getSession()).getQpidSession().messageStop(getConsumerTag().toString());
+        ((AMQSession_0_10) getSession()).getQpidSession().sync();
+        try
+        {
+            ((AMQSession_0_10) getSession()).getCurrentException();
+        }
+        catch (AMQException e)
+        {
+            throw new JMSAMQException("Problem when stopping consumer", e);
+        }
+    }
+
+    /**
+     * This is invoked just before a message is delivered to the jms consumer
+     */
+    void postDeliver(AbstractJMSMessage msg) throws JMSException
+    {
+        // notify the session
+        ((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag());
+        super.postDeliver(msg);
+    }
+  
+}
\ No newline at end of file

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=574221&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Mon Sep 10 05:35:27 2007
@@ -0,0 +1,150 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpidity.jms.message.MessageImpl;
+import org.apache.qpidity.jms.message.MessageHelper;
+import org.apache.qpidity.jms.ExceptionHelper;
+import org.apache.qpidity.QpidException;
+
+import javax.jms.Message;
+import javax.jms.JMSException;
+import java.util.UUID;
+import java.io.IOException;
+
+/**
+ *
+ *  This is a 0_10 message producer. 
+ */
+public class BasicMessageProducer_0_10 extends BasicMessageProducer
+{
+
+    BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
+                              AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
+                              boolean immediate, boolean mandatory, boolean waitUntilSent)
+    {
+        super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate,
+              mandatory, waitUntilSent);
+    }
+
+    public void declareDestination(AMQDestination destination)
+    {
+        // Declare the exchange
+        // Note that the durable and internal arguments are ignored since passive is set to false
+        AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+                                                              _protocolHandler.getProtocolMinorVersion(), null,
+                                                              // arguments
+                                                              false, // autoDelete
+                                                              false, // durable
+                                                              destination.getExchangeName(), // exchange
+                                                              false, // internal
+                                                              true, // nowait
+                                                              false, // passive
+                                                              _session.getTicket(), // ticket
+                                                              destination.getExchangeClass()); // type
+        _protocolHandler.writeFrame(declare);
+    }
+
+    //--- Overwritten methods
+
+
+    /**
+     * Sends a message to a given destination
+     * We will always convert the received message
+     */
+    public void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
+                            int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
+                            boolean wait) throws JMSException
+    {
+        // Only get current time if required
+        long currentTime = Long.MIN_VALUE;
+        if (!((timeToLive == 0) && _disableTimestamps))
+        {
+            currentTime = System.currentTimeMillis();
+        }
+        // the messae UID
+        String uid = (getDisableMessageID()) ? "MSG_ID_DISABLED" : UUID.randomUUID().toString();
+        MessageImpl qpidMessage;
+        // check that the message is not a foreign one
+        try
+        {
+            qpidMessage = (MessageImpl) origMessage;
+        }
+        catch (ClassCastException cce)
+        {
+            // this is a foreign message
+            qpidMessage = MessageHelper.transformMessage(origMessage);
+            // set message's properties in case they are queried after send.
+            origMessage.setJMSDestination(destination);
+            origMessage.setJMSDeliveryMode(deliveryMode);
+            origMessage.setJMSPriority(priority);
+            origMessage.setJMSMessageID(uid);
+            if (timeToLive != 0)
+            {
+                origMessage.setJMSExpiration(timeToLive + currentTime);
+                _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+            }
+            else
+            {
+                origMessage.setJMSExpiration(timeToLive);
+            }
+            origMessage.setJMSTimestamp(currentTime);
+        }
+        // set the message properties
+        qpidMessage.setJMSDestination(destination);
+        qpidMessage.setJMSMessageID(uid);
+        qpidMessage.setJMSDeliveryMode(deliveryMode);
+        qpidMessage.setJMSPriority(priority);
+        if (timeToLive != 0)
+        {
+            qpidMessage.setJMSExpiration(timeToLive + currentTime);
+        }
+        else
+        {
+            qpidMessage.setJMSExpiration(timeToLive);
+        }
+        qpidMessage.setJMSTimestamp(currentTime);
+        qpidMessage.setRoutingKey(destination.getDestinationName().toString());
+        qpidMessage.setExchangeName(destination.getExchangeName().toString());
+        // call beforeMessageDispatch
+        try
+        {
+            qpidMessage.beforeMessageDispatch();
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        try
+        {
+            ((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(qpidMessage.getExchangeName(),
+                                                                              qpidMessage.getQpidityMessage(),
+                                                                              org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+                                                                              org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+        }
+        catch (IOException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }       
+    }
+}
+

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
------------------------------------------------------------------------------
    svn:eol-style = native