You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/11 13:11:49 UTC

svn commit: r620468 [3/4] - in /incubator/qpid/branches/thegreatmerge: ./ qpid/bin/ qpid/cpp/ qpid/cpp/examples/ qpid/cpp/examples/examples/direct/ qpid/cpp/examples/examples/fanout/ qpid/cpp/examples/examples/pub-sub/ qpid/cpp/examples/examples/reques...

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.start
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.start?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.start (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.start Mon Feb 11 04:11:03 2008
@@ -1,21 +1,21 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-exec qpid-server -run:debug "$@"
\ No newline at end of file
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+qpid-server -run:debug "$@"

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.start
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.start
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.stop
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.stop
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.stopall
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.stopall
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/runAll
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/runAll
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java Mon Feb 11 04:11:03 2008
@@ -25,6 +25,8 @@
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -79,6 +81,14 @@
             ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory");
             // create the connection
             Connection connection = conFac.createConnection();
+
+            connection.setExceptionListener(new ExceptionListener()
+            {
+                public void onException(JMSException e)
+                {
+                    e.printStackTrace();
+                }
+            });
 
             // Create a session on the connection
             // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session.

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Feb 11 04:11:03 2008
@@ -116,7 +116,7 @@
     private String _virtualHost;
    
 
-    private ExceptionListener _exceptionListener;
+    protected ExceptionListener _exceptionListener;
 
     private ConnectionListener _connectionListener;
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Mon Feb 11 04:11:03 2008
@@ -11,11 +11,13 @@
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.Session;
 import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.ClosedListener;
+import org.apache.qpidity.ErrorCode;
 import org.apache.qpidity.QpidException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate
+public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ClosedListener
 {
     /**
      * This class logger.
@@ -109,6 +111,7 @@
             }
             _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
                                     _conn.getUsername(), _conn.getPassword());
+            _qpidConnection.setClosedListener(this);
         }
         catch (QpidException e)
         {
@@ -137,5 +140,23 @@
             throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot close connection", e);
         }
 
+    }
+
+    public void onClosed(ErrorCode errorCode, String reason, Throwable t)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Received a connection close from the broker: Error code : " + errorCode.getCode());
+        }
+        if (_conn._exceptionListener != null)
+        {
+            JMSException ex = new JMSException(reason,String.valueOf(errorCode.getCode()));
+            if (t != null)
+            {
+                ex.initCause(t);
+            }
+
+            _conn._exceptionListener.onException(ex);
+        }
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Feb 11 04:11:03 2008
@@ -20,6 +20,43 @@
  */
 package org.apache.qpid.client;
 
+import java.io.Serializable;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
+
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidArgumentException;
@@ -44,7 +81,6 @@
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
-import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -53,44 +89,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.jms.TransactionRolledBackException;
-import java.io.Serializable;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  *
  * <p/><table id="crc"><caption>CRC Card</caption>
@@ -185,14 +183,14 @@
      * keeps a record of subscriptions which have been created in the current instance. It does not remember
      * subscriptions between executions of the client.
      */
-    private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
+    protected final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
             new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
 
     /**
      * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
      * up in the {@link #_subscriptions} map.
      */
-    private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
+    protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
             new ConcurrentHashMap<BasicMessageConsumer, String>();
 
     /**
@@ -200,7 +198,7 @@
      *
      * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
      */
-    private final FlowControllingBlockingQueue _queue;
+    protected final FlowControllingBlockingQueue _queue;
 
     /**
      * Holds the highest received delivery tag.
@@ -279,10 +277,10 @@
     protected final boolean _immediatePrefetch;
 
     /** Indicates that warnings should be generated on violations of the strict AMQP. */
-    private final boolean _strictAMQP;
+    protected final boolean _strictAMQP;
 
     /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */
-    private final boolean _strictAMQPFATAL;
+    protected final boolean _strictAMQPFATAL;
     private final Object _messageDeliveryLock = new Object();
 
     /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
@@ -518,8 +516,8 @@
         if (_logger.isInfoEnabled())
         {
             StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
-            _logger.info("Closing session: " + this + ":"
-                         + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+            _logger.info("Closing session: " + this); // + ":"
+                         // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
         }
 
         synchronized (_connection.getFailoverMutex())
@@ -781,6 +779,14 @@
                                   false, false);
     }
 
+    public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException
+    {
+        checkValidDestination(destination);
+
+        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
+                                  false, false);
+    }
+
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
     {
         checkValidDestination(destination);
@@ -831,70 +837,7 @@
                                   false);
     }
 
-    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
-    {
-
-        checkNotClosed();
-        AMQTopic origTopic = checkValidTopic(topic);
-        AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
-        TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
-        if (subscriber != null)
-        {
-            if (subscriber.getTopic().equals(topic))
-            {
-                throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
-                                                + name);
-            }
-            else
-            {
-                unsubscribe(name);
-            }
-        }
-        else
-        {
-            AMQShortString topicName;
-            if (topic instanceof AMQTopic)
-            {
-                topicName = ((AMQTopic) topic).getRoutingKey();
-            }
-            else
-            {
-                topicName = new AMQShortString(topic.getTopicName());
-            }
-
-            if (_strictAMQP)
-            {
-                if (_strictAMQPFATAL)
-                {
-                    throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
-                }
-                else
-                {
-                    _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
-                                 + "for creation durableSubscriber. Requesting queue deletion regardless.");
-                }
-
-                deleteQueue(dest.getAMQQueueName());
-            }
-            else
-            {
-                // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
-                // says we must trash the subscription.
-                if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
-                    && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
-                {
-                    deleteQueue(dest.getAMQQueueName());
-                }
-            }
-        }
-
-        subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
-
-        _subscriptions.put(name, subscriber);
-        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
-
-        return subscriber;
-    }
+    public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
 
     /** Note, currently this does not handle reuse of the same name with different topics correctly. */
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
@@ -1387,7 +1330,7 @@
             {
                 // in Qpid the 0-8 spec was hacked to have a recover-ok method... this is bad
                 // in 0-9 we used the cleaner addition of a new sync recover method with its own ok
-                if(getProtocolVersion().equals(ProtocolVersion.v8_0))
+                if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
                 {
                     BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
                     _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
@@ -1985,7 +1928,7 @@
     /*
      * I could have combined the last 3 methods, but this way it improves readability
      */
-    private AMQTopic checkValidTopic(Topic topic) throws JMSException
+    protected AMQTopic checkValidTopic(Topic topic) throws JMSException
     {
         if (topic == null)
         {
@@ -2353,7 +2296,7 @@
      *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
-    private void deleteQueue(final AMQShortString queueName) throws JMSException
+    protected void deleteQueue(final AMQShortString queueName) throws JMSException
     {
         try
         {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Feb 11 04:11:03 2008
@@ -27,6 +27,7 @@
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpidity.nclient.Session;
 import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
 import org.apache.qpidity.ErrorCode;
@@ -38,13 +39,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.JMSException;
-import javax.jms.Destination;
-import javax.jms.TemporaryQueue;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.UUID;
 import java.util.Map;
-import java.util.HashMap;
+import java.util.Iterator;
 
 /**
  * This is a 0.10 Session
@@ -146,6 +146,25 @@
 
     //------- overwritten methods of class AMQSession
 
+     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+            throws JMSException
+    {
+        checkNotClosed();
+        checkValidTopic(topic);
+        if( _subscriptions.containsKey(name))
+        {
+            _subscriptions.get(name).close();
+        }
+        AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
+        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
+        TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+        
+        _subscriptions.put(name, subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+        return subscriber;
+    }
+
     /**
      * Acknowledge one or many messages.
      *
@@ -223,6 +242,25 @@
     }
 
     /**
+     * We need to release messages that may be pre-fetched in the local queue
+     *
+     * @throws JMSException
+     */
+    public void close() throws JMSException
+    {
+        super.close();
+        // We need to release pre-fetched messages
+        Iterator messages=_queue.iterator();
+        while (messages.hasNext())
+        {
+            UnprocessedMessage message=(UnprocessedMessage) messages.next();
+            messages.remove();
+            rejectMessage(message, true);
+        }
+    }
+
+
+    /**
      * Commit the receipt and the delivery of all messages exchanged by this session resources.
      */
     public void sendCommit() throws AMQException, FailoverException
@@ -359,9 +397,17 @@
                                           consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
                                           consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
 
-        getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+        getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
         getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
         // We need to sync so that we get notify of an error.
+        if(consumer.isStrated())
+        {
+            // set the flow
+            getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+                    org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                    AMQSession_0_10.MAX_PREFETCH);
+
+        }
         getQpidSession().sync();
         getCurrentException();
     }
@@ -462,11 +508,11 @@
                 //only set if msg list is null
                 try
                 {
-                    if (consumer.getMessageListener() != null)
-                    {
+                 //   if (consumer.getMessageListener() != null)
+                 //   {
                         getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE,
                                                      MAX_PREFETCH);
-                    }
+                  //  }
                     getQpidSession()
                     .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
                 }
@@ -546,7 +592,7 @@
      */
     private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener
     {
-        public void onClosed(ErrorCode errorCode, String reason)
+        public void onClosed(ErrorCode errorCode, String reason, Throwable t)
         {
             synchronized (this)
             {
@@ -579,8 +625,7 @@
 
     void start() throws AMQException
     {
-
-        super.suspendChannel(false);
+        suspendChannel(false);
         for(BasicMessageConsumer  c:  _consumers.values())
         {
               c.start();
@@ -592,16 +637,19 @@
         }
     }
 
-     void stop() throws AMQException
+
+
+
+    void stop() throws AMQException
     {
         super.stop();
-           for(BasicMessageConsumer  c:  _consumers.values())
+        for(BasicMessageConsumer  c:  _consumers.values())
         {
               c.stop();
         }
     }
 
-      synchronized void startDistpatcherIfNecessary()
+   synchronized void startDistpatcherIfNecessary()
     {
         // If IMMEDIATE_PREFETCH is not set then we need to start fetching
         if (!_immediatePrefetch)
@@ -621,5 +669,72 @@
         }
 
         startDistpatcherIfNecessary(false);
+    }
+
+
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+    {
+
+        checkNotClosed();
+        AMQTopic origTopic=checkValidTopic(topic);
+        AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
+
+        TopicSubscriberAdaptor subscriber=_subscriptions.get(name);
+        if (subscriber != null)
+        {
+            if (subscriber.getTopic().equals(topic))
+            {
+                throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
+                        + name);
+            }
+            else
+            {
+                unsubscribe(name);
+            }
+        }
+        else
+        {
+            AMQShortString topicName;
+            if (topic instanceof AMQTopic)
+            {
+                topicName=((AMQTopic) topic).getRoutingKey();
+            }
+            else
+            {
+                topicName=new AMQShortString(topic.getTopicName());
+            }
+
+            if (_strictAMQP)
+            {
+                if (_strictAMQPFATAL)
+                {
+                    throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
+                }
+                else
+                {
+                    _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
+                            + "for creation durableSubscriber. Requesting queue deletion regardless.");
+                }
+
+                deleteQueue(dest.getAMQQueueName());
+            }
+            else
+            {
+                // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+                // says we must trash the subscription.
+                if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
+                        && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+                {
+                    deleteQueue(dest.getAMQQueueName());
+                }
+            }
+        }
+
+        subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+
+        _subscriptions.put(name, subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+        return subscriber;
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Mon Feb 11 04:11:03 2008
@@ -21,9 +21,8 @@
 package org.apache.qpid.client;
 
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.TemporaryQueue;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
@@ -132,7 +131,7 @@
         {
             // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
 
-            BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
+            BasicRecoverBody body = getProtocolHandler().getMethodRegistry().createBasicRecoverBody(false);
             _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
             _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
         }
@@ -142,12 +141,12 @@
             // in 0-9 we used the cleaner addition of a new sync recover method with its own ok
             if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
             {
-                BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
+                BasicRecoverBody body = getProtocolHandler().getMethodRegistry().createBasicRecoverBody(false);
                 _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
             }
             else if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v0_9))
             {
-                BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
+                BasicRecoverSyncBody body = ((MethodRegistry_0_9)getProtocolHandler().getMethodRegistry()).createBasicRecoverSyncBody(false);
                 _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
             }
             else
@@ -166,7 +165,7 @@
                 _logger.debug("Rejecting delivery tag:" + deliveryTag);
             }
 
-            AMQFrame basicRejectBody = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue).generateFrame(_channelId);
+            AMQFrame basicRejectBody = getProtocolHandler().getMethodRegistry().createBasicRejectBody(deliveryTag, requeue).generateFrame(_channelId);
 
             _connection.getProtocolHandler().writeFrame(basicRejectBody);
         }
@@ -182,7 +181,7 @@
                     {
                         public AMQMethodEvent execute() throws AMQException, FailoverException
                         {
-                            AMQFrame boundFrame = getMethodRegistry().createExchangeBoundBody
+                            AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
                                                     (exchangeName, routingKey, queueName).generateFrame(_channelId);
 
                             return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
@@ -225,7 +224,7 @@
         // we must register the consumer in the map before we actually start listening
         _consumers.put(tag, consumer);
         // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame jmsConsume = getMethodRegistry().createBasicConsumeBody(getTicket(),
+        AMQFrame jmsConsume = getProtocolHandler().getMethodRegistry().createBasicConsumeBody(getTicket(),
                 queueName,
                 tag,
                 consumer.isNoLocal(),
@@ -247,7 +246,7 @@
     public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
             final boolean nowait) throws AMQException, FailoverException
     {
-        AMQFrame exchangeDeclare = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null).
+        AMQFrame exchangeDeclare = getProtocolHandler().getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null).
                                             generateFrame(_channelId);
 
         protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
@@ -255,14 +254,14 @@
 
     public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
     {
-        AMQFrame queueDeclare = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null).generateFrame(_channelId);
+        AMQFrame queueDeclare = getProtocolHandler().getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null).generateFrame(_channelId);
 
         protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
     }
 
     public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
     {
-        QueueDeleteBody body = getMethodRegistry().createQueueDeleteBody(getTicket(),
+        QueueDeleteBody body = getProtocolHandler().getMethodRegistry().createQueueDeleteBody(getTicket(),
                 queueName,
                 false,
                 false,
@@ -311,4 +310,70 @@
 
         return new AMQTemporaryQueue(this);
     }
+
+    public  TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+       {
+
+           checkNotClosed();
+           AMQTopic origTopic = checkValidTopic(topic);
+           AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
+           TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+           if (subscriber != null)
+           {
+               if (subscriber.getTopic().equals(topic))
+               {
+                   throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
+                                                   + name);
+               }
+               else
+               {
+                   unsubscribe(name);
+               }
+           }
+           else
+           {
+               AMQShortString topicName;
+               if (topic instanceof AMQTopic)
+               {
+                   topicName = ((AMQTopic) topic).getRoutingKey();
+               }
+               else
+               {
+                   topicName = new AMQShortString(topic.getTopicName());
+               }
+
+               if (_strictAMQP)
+               {
+                   if (_strictAMQPFATAL)
+                   {
+                       throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
+                   }
+                   else
+                   {
+                       _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
+                                    + "for creation durableSubscriber. Requesting queue deletion regardless.");
+                   }
+
+                   deleteQueue(dest.getAMQQueueName());
+               }
+               else
+               {
+                   // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+                   // says we must trash the subscription.
+                   if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
+                       && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+                   {
+                       deleteQueue(dest.getAMQQueueName());
+                   }
+               }
+           }
+
+           subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+
+           _subscriptions.put(name, subscriber);
+           _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+           return subscriber;
+       }
+
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Mon Feb 11 04:11:03 2008
@@ -71,12 +71,26 @@
               queueName, isDurable);
     }
 
+    protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+                               boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
+    {
+        super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable );
+    }
+
+
     public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
             throws JMSException
     {
         return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false,
                             getDurableTopicQueueName(subscriptionName, connection),
                             true);
+    }
+
+    public static AMQTopic createDurable010Topic(AMQTopic topic, String subscriptionName, AMQConnection connection)
+            throws JMSException
+    {
+        return new AMQTopic(topic.getExchangeName(), ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getRoutingKey(), true, false,
+              getDurableTopicQueueName(subscriptionName, connection), false);
     }
 
     public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Mon Feb 11 04:11:03 2008
@@ -85,7 +85,7 @@
 
     protected MessageFactoryRegistry _messageFactory;
 
-    private final AMQSession _session;
+    protected final AMQSession _session;
 
     protected AMQProtocolHandler _protocolHandler;
 
@@ -434,7 +434,23 @@
         }
     }
 
-    public abstract Object getMessageFromQueue(long l) throws InterruptedException;
+    public  Object getMessageFromQueue(long l) throws InterruptedException
+    {
+         Object o;
+         if (l > 0)
+         {
+             o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+         }
+         else if (l < 0)
+         {
+             o = _synchronousQueue.poll();
+         }
+         else
+         {
+             o = _synchronousQueue.take();
+         }
+         return o;
+     }
 
     private boolean closeOnAutoClose() throws JMSException
     {
@@ -1105,6 +1121,12 @@
     public void stop()
     {
         // do nothing as this is a 0_10 feature
+    }
+
+    public boolean isStrated()
+    {
+        // always reports false here: tracking the started state is a 0_10 feature
+        return false;
     }
 
     public AMQShortString getQueuename()

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Mon Feb 11 04:11:03 2008
@@ -19,10 +19,7 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQShortString;
@@ -41,8 +38,8 @@
 import javax.jms.MessageListener;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.Iterator;
 
 /**
  * This is a 0.10 message consumer.
@@ -50,15 +47,6 @@
 public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
         implements org.apache.qpidity.nclient.util.MessageListener
 {
-    /**
-     * A counter for keeping the number of available messages for this consumer
-     */
-    private final AtomicLong _messageCounter = new AtomicLong(0);
-
-    /**
-     * Number of received message so far
-     */
-    private final AtomicLong _messagesReceived = new AtomicLong(0);
 
     /**
      * This class logger
@@ -117,6 +105,11 @@
     // ----- Interface org.apache.qpidity.client.util.MessageListener
 
     /**
+     *
+     * This is invoked by the session thread when emptying the session message queue.
+     * We first check if the message is valid (matches the selector) and then deliver it to the
+     * message listener or to the sync consumer queue.
+     *
      * @param jmsMessage this message has already been processed so can't redo preDeliver
      * @param channelId
      */
@@ -136,12 +129,6 @@
             }
             catch (Exception e1)
             {
-                // the receiver may be waiting for a message
-                if (_messageCounter.get() >= 0)
-                {
-                    _messageCounter.decrementAndGet();
-                    _synchronousQueue.add(new NullTocken());
-                }
                 // we should silently log thie exception as it only hanppens when the connection is closed
                 _logger.error("Exception when receiving message", e1);
             }
@@ -152,20 +139,15 @@
         }
     }
 
+
+
+    /**
+     * This method is invoked by the transport layer when a message is delivered for this
+     * consumer. The message is transformed and passed to the session.
+     * @param message a 0.10 message
+     */
     public void onMessage(Message message)
     {
-        if (isMessageListenerSet())
-        {
-            _messagesReceived.incrementAndGet();
-            if (_messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH)
-            {
-                // require more credit
-                _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-                                                          org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
-                                                          AMQSession_0_10.MAX_PREFETCH);
-                _messagesReceived.set(0);
-            }
-        }
         int channelId = getSession().getChannelId();
         long deliveryId = message.getMessageTransferId();
         String consumerTag = getConsumerTag().toString();
@@ -207,8 +189,6 @@
             newMessage.setReplyToURL(replyToUrl);
         }
         newMessage.setContentHeader(headers);
-        // increase the counter of messages
-        _messageCounter.incrementAndGet();
         getSession().messageReceived(newMessage);
         // else ignore this message
     }
@@ -246,6 +226,8 @@
         //{
             super.postDeliver(msg);
         //}
+
+
     }
 
     void notifyMessage(UnprocessedMessage messageFrame, int channelId)
@@ -351,50 +333,9 @@
             }
             messageOk = acquireMessage(message);
         }
-        if (!messageOk)
-        {
-            requestCreditIfCreditMode();
-        }
         return messageOk;
     }
 
-    private void requestCreditIfCreditMode()
-    {
-        try
-        {
-            // the current message received is not good, so we need to get a message.
-            if (getMessageListener() == null)
-            {
-                int oldval = _messageCounter.intValue();
-                _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-                                                          org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
-                                                          1);
-                _0_10session.getQpidSession()
-                        .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
-                _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
-                _0_10session.getQpidSession().sync();
-                _0_10session.getQpidSession()
-                        .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
-                if (_messageCounter.intValue() <= oldval)
-                {
-                    // we haven't received a message so tell the receiver to return null
-                    _synchronousQueue.add(new NullTocken());
-                }
-                else
-                {
-                    _messageCounter.decrementAndGet();
-                }
-            }
-            // we now need to check if we have received a message
-
-        }
-        catch (Exception e)
-        {
-            _logger.error(
-                    "Error getting message listener, couldn't request credit after releasing a message that failed the selector test",
-                    e);
-        }
-    }
 
     /**
      * Acknowledge a message
@@ -469,16 +410,18 @@
         super.setMessageListener(messageListener);
         if (messageListener == null)
         {
-            _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
+           /* _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
             _0_10session.getQpidSession()
                     .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
             _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
                                                       org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
                                                       0xFFFFFFFF);
             _0_10session.getQpidSession().sync();
+            */
         }
         else
         {
+            //TODO: empty the list of sync messages.
             if (_connection.started())
             {
                 _0_10session.getQpidSession()
@@ -490,66 +433,13 @@
                                                           org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
                                                           0xFFFFFFFF);
                 _0_10session.getQpidSession().sync();
-                _messagesReceived.set(0);
-                ;
-            }
-        }
-    }
-
-    public Object getMessageFromQueue(long l) throws InterruptedException
-    {
-        if (!_isStarted)
-        {
-            return null;
-        }
-        Object o;
-        _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-                                                  org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-
-        if (l == 0)
-        {
-            o = _synchronousQueue.take();
-        }
-        else
-        {
-            if (l > 0)
-            {
-                o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
-            }
-            else
-            {
-                o = _synchronousQueue.poll();
-            }
-            if (o == null)
-            {
-                _logger.debug("Message Didn't arrive in time, checking if one is inflight");
-                // checking if one is inflight
-                _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
-                _0_10session.getQpidSession().sync();
-                _0_10session.getQpidSession()
-                        .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
-                if (_messageCounter.get() > 0)
-                {
-                    o = _synchronousQueue.take();
-                }
             }
         }
-        if (o instanceof NullTocken)
-        {
-            o = null;
-        }
-        return o;
     }
 
-    protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+    public boolean isStrated()
     {
-        _messageCounter.decrementAndGet();
-        super.preApplicationProcessing(jmsMsg);
-    }
-
-    private class NullTocken
-    {
-
+        return _isStarted;
     }
 
     public void start()
@@ -560,5 +450,18 @@
     public void stop()
     {
         _isStarted = false;
+    }
+
+    public void close() throws JMSException
+    {
+        super.close();
+        // release any messages that may still be staged in the synchronous queue
+        Iterator messages=_synchronousQueue.iterator();
+        while (messages.hasNext())
+        {
+            AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+            messages.remove();
+            _session.rejectMessage(message, true);
+        }
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Mon Feb 11 04:11:03 2008
@@ -86,22 +86,5 @@
             messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
 
     }
-
-     public Object getMessageFromQueue(long l) throws InterruptedException
-     {
-         Object o;
-         if (l > 0)
-         {
-             o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
-         }
-         else if (l < 0)
-         {
-             o = _synchronousQueue.poll();
-         }
-         else
-         {
-             o = _synchronousQueue.take();
-         }
-         return o;
-     }
+     
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Mon Feb 11 04:11:03 2008
@@ -739,4 +739,9 @@
         _consumer = basicMessageConsumer;
     }
 
+    public void receivedFromServer()
+    {
+        _changedData = false;
+    }
+
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Mon Feb 11 04:11:03 2008
@@ -140,7 +140,9 @@
         props.setType(mprop.getType());
         props.setUserId(mprop.getUserId());
         props.setHeaders(FiledTableSupport.convertToFieldTable(mprop.getApplicationHeaders()));        
-        return createMessage(messageNbr, data, exchange, routingKey, props);
+        AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props);
+        message.receivedFromServer();
+        return message;
     }
 
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java Mon Feb 11 04:11:03 2008
@@ -49,11 +49,33 @@
 
         ConnectionDelegate connectionDelegate = new ConnectionDelegate()
         {
+            private boolean receivedClose = false;
+
             public SessionDelegate getSessionDelegate()
             {
                 return new ClientSessionDelegate();
             }
 
+            public void exception(Throwable t)
+            {
+                if (_closedListner != null)
+                {
+                    _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),t);
+                }
+                else
+                {
+                    throw new RuntimeException("connection closed",t);
+                }
+            }
+
+            public void closed()
+            {
+                if (_closedListner != null && !this.receivedClose)
+                {
+                    _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),null);
+                }
+            }
+
             @Override public void connectionClose(Channel context, ConnectionClose connectionClose)
             {
                 ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode());
@@ -67,8 +89,10 @@
                 }
                 else
                 {
-                    _closedListner.onClosed(errorCode, connectionClose.getReplyText());
+                    _closedListner.onClosed(errorCode, connectionClose.getReplyText(),null);
                 }
+
+                this.receivedClose = true;
             }
         };
 
@@ -79,6 +103,7 @@
 
         if (System.getProperty("transport","mina").equalsIgnoreCase("nio"))
         {
+            System.out.println("Using NIO");
             if( _logger.isDebugEnabled())
             {
                 _logger.debug("using NIO");
@@ -180,6 +205,7 @@
 
     public void setClosedListener(ClosedListener closedListner)
     {
+
         _closedListner = closedListner;
     }
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java Mon Feb 11 04:11:03 2008
@@ -32,8 +32,8 @@
      * informs the connection's ExceptionListener
      * @param errorCode TODO
      * @param reason TODO
-     *
+     * @param t TODO
      * @see Connection
      */
-    public void onClosed(ErrorCode errorCode, String reason);
+    public void onClosed(ErrorCode errorCode, String reason, Throwable t);
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java Mon Feb 11 04:11:03 2008
@@ -1,5 +1,7 @@
  package org.apache.qpidity.nclient;
 
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 
@@ -16,17 +18,17 @@
 
         try
         {
-            javax.jms.Connection con = new AMQConnection("qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672");
+            javax.jms.Connection con = new AMQConnection("qpid:password=pass;username=name@tcp:localhost:5672");
             con.start();
 
             javax.jms.Session ssn = con.createSession(false, 1);
 
             javax.jms.Destination dest = new AMQQueue(new AMQShortString("direct"),"test");
             javax.jms.MessageConsumer cons = ssn.createConsumer(dest);
-            javax.jms.MessageProducer prod = ssn.createProducer(dest);
+            //javax.jms.MessageProducer prod = ssn.createProducer(dest);
 
-            //javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receive();
-         /*   cons.setMessageListener(new MessageListener()
+            javax.jms.TextMessage m = null; // (javax.jms.TextMessage)cons.receive();
+           cons.setMessageListener(new MessageListener()
             {
                 public void onMessage(Message m)
                 {
@@ -41,9 +43,25 @@
                     }
                 }
 
-            });*/
+            });
 
-            javax.jms.TextMessage msg = ssn.createTextMessage();
+           con.setExceptionListener(new ExceptionListener()
+           {
+               public void onException(JMSException e)
+               {
+                   e.printStackTrace();
+               }
+           });
+
+           System.out.println("Waiting");
+           while (m == null)
+           {
+
+           }
+
+           System.out.println("Exiting");
+
+            /*javax.jms.TextMessage msg = ssn.createTextMessage();
             msg.setText("This is a test message");
             msg.setBooleanProperty("targetMessage", false);
             prod.send(msg);
@@ -60,7 +78,7 @@
             else
             {
                System.out.println("message is not null"  + m);
-            }
+            }*/
 
         }
         catch(Exception e)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Mon Feb 11 04:11:03 2008
@@ -189,7 +189,7 @@
 
     void notifyException(QpidException ex)
     {
-        _exceptionListner.onClosed(null, null);
+        _exceptionListner.onClosed(null, null, null);
     }
 
     Map<String,MessagePartListener> getMessageListerners()

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java Mon Feb 11 04:11:03 2008
@@ -40,7 +40,7 @@
         Session ssn = conn.createSession(50000);
         ssn.setClosedListener(new ClosedListener()
                 {
-                     public void onClosed(ErrorCode errorCode, String reason)
+                     public void onClosed(ErrorCode errorCode, String reason, Throwable t)
                      {
                          System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
                      }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java Mon Feb 11 04:11:03 2008
@@ -43,7 +43,7 @@
         Session ssn = conn.createSession(50000);
         ssn.setClosedListener(new ClosedListener()
                 {
-                     public void onClosed(ErrorCode errorCode, String reason)
+                     public void onClosed(ErrorCode errorCode, String reason, Throwable t)
                      {
                          System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
                      }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java Mon Feb 11 04:11:03 2008
@@ -124,7 +124,7 @@
         session.sync();
     }
 
-    public void onClosed(ErrorCode errorCode, String reason)
+    public void onClosed(ErrorCode errorCode, String reason, Throwable t)
     {
         System.out.println("------- Broker Notified an error --------");
         System.out.println("------- " + errorCode + " --------");

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java Mon Feb 11 04:11:03 2008
@@ -1163,7 +1163,7 @@
      */
     private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener
     {
-        public void onClosed(ErrorCode errorCode, String reason)
+        public void onClosed(ErrorCode errorCode, String reason, Throwable t)
         {
             synchronized (this)
             {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java Mon Feb 11 04:11:03 2008
@@ -82,8 +82,9 @@
     protected void tearDown() throws Exception
     {
         _connection.close();
+        super.tearDown();
     }
-                
+
     public void testSimpleReceiveConnection()
     {
         try

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java Mon Feb 11 04:11:03 2008
@@ -98,7 +98,7 @@
 
         if (_count < _expected)
         {
-            wait(1000000000);
+            wait(60000);
         }
 
         if (_count < _expected)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Mon Feb 11 04:11:03 2008
@@ -20,13 +20,12 @@
  */
 package org.apache.qpid.test.unit.close;
 
-import junit.framework.TestCase;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.testutil.QpidClientConnection;
+import org.apache.qpid.testutil.QpidTestCase;
 import org.apache.qpid.url.URLSyntaxException;
 
 import org.slf4j.Logger;
@@ -41,7 +40,7 @@
 
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class MessageRequeueTest extends TestCase
+public class MessageRequeueTest extends QpidTestCase
 {
     private static final Logger _logger = LoggerFactory.getLogger(MessageRequeueTest.class);
 
@@ -64,7 +63,8 @@
     protected void setUp() throws Exception
     {
         super.setUp();
-         conn = new QpidClientConnection(BROKER);
+
+        conn = new QpidClientConnection(BROKER);
 
         conn.connect();
         // clear queue
@@ -78,7 +78,6 @@
 
     protected void tearDown() throws Exception
     {
-        super.tearDown();
 
         if (!passed) // clean up
         {
@@ -91,6 +90,7 @@
             conn.disconnect();
         }
 
+        super.tearDown();
     }
 
     /**
@@ -125,7 +125,7 @@
             if (messageLog[msgindex] != 0)
             {
                 _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag()
-                    + ") more than once.");
+                              + ") more than once.");
             }
 
             if (_logger.isInfoEnabled())
@@ -144,16 +144,18 @@
             msg = consumer.receive(1000);
         }
 
-         _logger.info("consuming done.");
+        _logger.info("consuming done.");
         conn.getSession().commit();
         consumer.close();
-        assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
 
         int index = 0;
         StringBuilder list = new StringBuilder();
         list.append("Failed to receive:");
         int failed = 0;
 
+        _logger.info("consumed: " + messagesReceived);
+
+        assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
         // wit 0_10 we can have a delivery tag of 0
         if (conn.isBroker08())
         {
@@ -174,7 +176,7 @@
 
             assertEquals(list.toString(), 0, failed);
         }
-        _logger.info("consumed: " + messagesReceived);
+
         conn.disconnect();
         passed = true;
     }
@@ -208,7 +210,7 @@
         }
         catch (InterruptedException e)
         {
-            fail("Uanble to join to Consumer theads");
+            fail("Unable to join to Consumer theads");
         }
 
         _logger.info("consumer 1 count is " + c1.getCount());

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Mon Feb 11 04:11:03 2008
@@ -106,7 +106,7 @@
         AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown));
         AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown));
 
-        TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+        TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
         TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
         TopicPublisher publisher = session1.createPublisher(null);
 
@@ -144,11 +144,11 @@
         AMQConnection con1 = (AMQConnection) getConnection("guest", "guest");
         AMQTopic topic = new AMQTopic(con1, "MyTopic3");
 
-        TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+        TopicSession session1 = con1.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
         TopicPublisher publisher = session1.createPublisher(topic);
 
         AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
-        TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+        TopicSession session2 = con2.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
         TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0");
 
         con2.start();

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Mon Feb 11 04:11:03 2008
@@ -467,6 +467,18 @@
         }
 
         result = _consumer.receive(1000);
+
+        if (isBroker08())
+        {
+            assertNotNull("test message was consumed and rolled back, but is gone", result);
+           // assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+        }
+        else
+        {
+            assertNull("test message was consumed and not rolled back, but is redelivered", result);
+        }
+
+        result = _consumer.receive(1000);
         assertNull("test message should be null:" + result, result);
 
         _session.commit();

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Mon Feb 11 04:11:03 2008
@@ -89,7 +89,7 @@
             prepCon = (AMQConnection) getConnection("guest", "guest");
 
             _logger.info("Create prep session");
-            prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+            prepSession = prepCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
 
             _logger.info("Create prep producer to Q1");
             prepProducer1 = prepSession.createProducer(queue1);
@@ -100,7 +100,7 @@
             _logger.info("Create test connection");
             testCon = (AMQConnection) getConnection("guest", "guest");
             _logger.info("Create test session");
-            testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+            testSession = testCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
             _logger.info("Create test consumer of q2");
             testConsumer2 = testSession.createConsumer(queue2);
         }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java Mon Feb 11 04:11:03 2008
@@ -81,11 +81,8 @@
             String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
             try
             {
-                AMQConnectionFactory factory = new AMQConnectionFactory(brokerUrl);
                 _logger.info("connecting to Qpid :" + brokerUrl);
-                //connection = factory.createConnection();
-                setUp();
-                 connection = getConnection("guest", "guest") ;
+                connection = getConnection("guest", "guest") ;
                 // register exception listener
                 connection.setExceptionListener(this);
 
@@ -112,7 +109,6 @@
             connection.close();
             connected = false;
             _logger.info("disconnected");
-            tearDown();
         }
     }
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java Mon Feb 11 04:11:03 2008
@@ -21,8 +21,8 @@
 
 import javax.jms.Connection;
 import javax.naming.InitialContext;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
+import java.io.InputStream;
+import java.io.IOException;
 
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.client.AMQConnection;
@@ -39,117 +39,101 @@
 public class QpidTestCase extends TestCase
 {
 
-    /* this clas logger */
     private static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class);
 
-    /* Test properties */
-    private static final String SHEL = "broker_shel";
-    private static final String BROKER_PATH = "broker_path";
-    private static final String BROKER_PARAM = "broker_param";
-    private static final String BROKER_VERSION  = "broker_version";
-    public static final String BROKER_08 = "08";
-    private static final String BROKER_VM = "vm";
-    private static final String EXT_BROKER = "ext" ;
-    /**
-     * The process where the remote broker is running.
-     */
-    private Process _brokerProcess;
+    // system properties
+    private static final String BROKER = "broker";
+    private static final String BROKER_VERSION  = "broker.version";
+
+    // values
+    private static final String VM = "vm";
+    private static final String EXTERNAL = "external";
+    private static final String VERSION_08 = "0-8";
+    private static final String VERSION_010 = "0-10";
 
-    /* The test property values */
-    // The default broker is an in-VM one
-    private String _shel = BROKER_VM;
-    private String _brokerPath = "";
-    private String _brokerParams = "";
-    private String _brokerVersion = "08" ;
+    private String _broker = System.getProperty(BROKER, VM);
+    private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
+
+    private Process _brokerProcess;
 
-    /* The broker communication objects */
     private InitialContext _initialContext;
     private AMQConnectionFactory _connectionFactory;
 
-    //--------- JUnit support
-
     protected void setUp() throws Exception
     {
         super.setUp();
-        // get the propeties if they are set
-         if (System.getProperties().containsKey(BROKER_VERSION ))
-        {
-            _brokerVersion = System.getProperties().getProperty(BROKER_VERSION );
-        }
-        if (System.getProperties().containsKey(SHEL))
-        {
-            _shel = System.getProperties().getProperty(SHEL);
-        }
-        if (System.getProperties().containsKey(BROKER_PATH))
-        {
-            _brokerPath = System.getProperties().getProperty(BROKER_PATH);
-        }
-        if (System.getProperties().containsKey(BROKER_PARAM))
-        {
-            _brokerParams = System.getProperties().getProperty(BROKER_PARAM);
-        }
-        if (!_shel.equals(BROKER_VM) && ! _shel.equals(EXT_BROKER) )
-        {
-            // start a new broker
-            startBroker();
-        }
-        else if ( ! _shel.equals(EXT_BROKER) )
-        {
-            // create an in_VM broker
-            TransportConnection.createVMBroker(1);
-        }
-        _logger.info("=========================================");
-        _logger.info("broker version " + _brokerVersion + " ==== " + _shel + " " + _brokerPath + " " + _brokerParams);
+        startBroker();
     }
 
-    /**
-     * This method _is invoked after each test case.
-     *
-     * @throws Exception
-     */
     protected void tearDown() throws Exception
     {
-          killBroker();
-         super.tearDown();
+        stopBroker();
+        super.tearDown();
     }
 
-    public void killBroker()
+    public void startBroker() throws Exception
     {
-        _logger.info("Kill broker");
-        if (_brokerProcess != null)
+        if (_broker.equals(VM))
         {
-            // destroy the currently running broker
-            _brokerProcess.destroy();
-            _brokerProcess = null;
+            // create an in_VM broker
+            TransportConnection.createVMBroker(1);
         }
-        else   if ( ! _shel.equals(EXT_BROKER))
+        else if (!_broker.equals(EXTERNAL))
         {
-            TransportConnection.killAllVMBrokers();
+            _logger.info("starting broker: " + _broker);
+            ProcessBuilder pb = new ProcessBuilder(_broker.split("\\s+"));
+            pb.redirectErrorStream(true);
+            _brokerProcess = pb.start();
+
+            new Thread()
+            {
+                private InputStream in = _brokerProcess.getInputStream();
+
+                public void run()
+                {
+                    try
+                    {
+                        byte[] buf = new byte[4*1024];
+                        int n;
+                        while ((n = in.read(buf)) != -1)
+                        {
+                            System.out.write(buf, 0, n);
+                        }
+                    }
+                    catch (IOException e)
+                    {
+                        _logger.info("redirector", e);
+                    }
+                }
+            }.start();
+
+            Thread.sleep(1000);
+
+            try
+            {
+                int exit = _brokerProcess.exitValue();
+                throw new RuntimeException("broker aborted: " + exit);
+            }
+            catch (IllegalThreadStateException e)
+            {
+                // this is expected if the broker started successfully
+            }
         }
     }
 
-    //--------- Util method
-
-    /**
-     * This method starts a remote server by spawning an external process.
-     *
-     * @throws Exception If the broker cannot be started
-     */
-    public void startBroker() throws Exception
+    public void stopBroker() throws Exception
     {
-        _logger.info("Starting broker: " + _shel + " " + _brokerPath + "  " + _brokerParams + "");
-        Runtime rt = Runtime.getRuntime();
-        _brokerProcess = rt.exec(_shel + " " + _brokerPath + "  " + _brokerParams + "");
-        BufferedReader reader = new BufferedReader(new InputStreamReader(_brokerProcess.getInputStream()));
-        if (reader.ready())
+        _logger.info("stopping broker: " + _broker);
+        if (_brokerProcess != null)
         {
-            //bad, we had an error starting the broker
-            throw new Exception("Problem when starting the broker: " + reader.readLine());
+            _brokerProcess.destroy();
+            _brokerProcess.waitFor();
+            _logger.info("broker exited: " + _brokerProcess.exitValue());
+            _brokerProcess = null;
         }
-        // We need to wait for th ebroker to start ideally we would need to ping it
-        synchronized(this)
+        else if (_broker.equals(VM))
         {
-            this.wait(1000);
+            TransportConnection.killAllVMBrokers();
         }
     }
 
@@ -159,28 +143,18 @@
      */
     public boolean isBroker08()
     {
-        return _brokerVersion.equals(BROKER_08);
+        return _brokerVersion.equals(VERSION_08);
     }
 
-    /**
-     * Stop the currently running broker.
-     */
-    public void stopBroker()
+    public boolean isBroker010()
     {
-        _logger.info("Stopping broker");
-        // stooping the broker
-        if (_brokerProcess != null)
-        {
-            _brokerProcess.destroy();
-        }
-        _initialContext = null;
-        _connectionFactory = null;
+        return _brokerVersion.equals(VERSION_010);
     }
 
-     public void shutdownServer() throws Exception
+    public void shutdownServer() throws Exception
     {
-        killBroker();
-        setUp();
+        stopBroker();
+        startBroker();
     }
     /**
      * we assume that the environment is correctly set
@@ -228,7 +202,7 @@
     {
         _logger.info("get Connection");
         Connection con;
-        if (_shel.equals(BROKER_VM))
+        if (_broker.equals(VM))
         {
             con = new AMQConnection("vm://:1", username, password, "Test", "test");
         }
@@ -243,7 +217,7 @@
     {
         _logger.info("get Connection");
         Connection con;
-        if (_shel.equals(BROKER_VM))
+        if (_broker.equals(VM))
         {
             con = new AMQConnection("vm://:1", username, password, id, "test");
         }
@@ -254,8 +228,4 @@
         return con;
     }
 
-    public void testfoo()
-    {
-        //do nothing, just to avoid maven to report an error  
-    }
 }

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/common/bin/qpid-run
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/common/bin/qpid-run
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/BrokerDetailsImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/BrokerDetailsImpl.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/BrokerDetailsImpl.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/BrokerDetailsImpl.java Mon Feb 11 04:11:03 2008
@@ -116,7 +116,7 @@
         {
             if (getProtocol().equals(BrokerDetails.PROTOCOL_TCP))
             {
-                _port = 1234;
+                _port = 5672;
             }
             else if (getProtocol().equals(BrokerDetails.PROTOCOL_TLS))
             {