You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2013/09/13 14:52:54 UTC

svn commit: r1522905 [1/3] - in /qpid/jms/trunk: ./ src/main/java/org/apache/qpid/jms/engine/ src/main/java/org/apache/qpid/jms/impl/ src/test/java/org/apache/qpid/jms/ src/test/java/org/apache/qpid/jms/impl/ src/test/java/org/apache/qpid/jms/test/test...

Author: robbie
Date: Fri Sep 13 12:52:52 2013
New Revision: 1522905

URL: http://svn.apache.org/r1522905
Log:
QPIDJMS-3: got message consumption integration test working

Work by Phil Harvey and myself.

Added:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java
      - copied, changed from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionException.java
      - copied, changed from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsInterruptedException.java
      - copied, changed from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsTimeoutException.java
      - copied, changed from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkException.java
      - copied, changed from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsException.java
      - copied, changed from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageTokenImpl.java
      - copied, changed from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/QpidJmsTestCase.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ImmediateWaitUntil.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/DescriptorMatcher.java
Removed:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceivedMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/JmsTest.java
Modified:
    qpid/jms/trunk/pom.xml
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceivedMessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ConnectionIntegrationTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/FrameMatchingHandler.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithNoPayloadMatchingHandler.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithPayloadMatchingHandler.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/HeaderHandlerImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/ListDescribedType.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/Accepted.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/AttachFrame.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/BeginFrame.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/CloseFrame.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/DetachFrame.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/DispositionFrame.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/EndFrame.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/FlowFrame.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/Modified.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/OpenFrame.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/Rejected.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/Released.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/SaslChallengeFrame.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/SaslInitFrame.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/SaslMechanismsFrame.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/SaslOutcomeFrame.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/SaslResponseFrame.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/Source.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/Target.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/TransferFrame.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/generate-frames.xsl
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/AttachMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/BeginMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/CloseMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/DetachMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/DispositionMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/EndMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/FlowMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/OpenMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/SaslChallengeMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/SaslInitMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/SaslMechanismsMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/SaslOutcomeMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/SaslResponseMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/TransferMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/generate-matchers.xsl
    qpid/jms/trunk/src/test/resources/logging.properties

Modified: qpid/jms/trunk/pom.xml
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/pom.xml?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/pom.xml (original)
+++ qpid/jms/trunk/pom.xml Fri Sep 13 12:52:52 2013
@@ -70,6 +70,12 @@
       <version>${junit-version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>1.9.5</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java Fri Sep 13 12:52:52 2013
@@ -49,16 +49,17 @@ import org.apache.qpid.proton.message.Me
  * Obtain the {@link AmqpConnection} lock first to use them in a thread-safe
  * manner.
  *
- * TODO more tightly define the thread-safety of this class, e.g. should setSasl etc be synchronized?
  */
 @SuppressWarnings("rawtypes")
 public class AmqpConnection
 {
-    /** default timeout in milliseconds */
-    public static final long TIMEOUT = 10_000L;
-
     private static Logger _logger = Logger.getLogger("qpid.jms-client.connection");
 
+    /**
+     * Default timeout in milliseconds.
+     * TODO define a proper way for the timeout to be specified.
+     */
+    public static final long TIMEOUT = Long.getLong("org.apache.qpid.jms.connection.timeout", 10_000L);
 
     private static final ProtonFactoryLoader protonFactoryLoader = new ProtonFactoryLoader();
 
@@ -153,7 +154,7 @@ public class AmqpConnection
     /**
      * @return the username
      */
-    public String getUsername()
+    public synchronized String getUsername()
     {
         return _username;
     }
@@ -161,7 +162,7 @@ public class AmqpConnection
     /**
      * @param username the username to set
      */
-    public void setUsername(String username)
+    public synchronized void setUsername(String username)
     {
         _username = username;
     }
@@ -169,7 +170,7 @@ public class AmqpConnection
     /**
      * @return the password
      */
-    public String getPassword()
+    public synchronized String getPassword()
     {
         return _password;
     }
@@ -177,7 +178,7 @@ public class AmqpConnection
     /**
      * @param password the password to set
      */
-    public void setPassword(String password)
+    public synchronized void setPassword(String password)
     {
         _password = password;
     }
@@ -387,7 +388,7 @@ public class AmqpConnection
         return (MessageFactory) protonFactoryLoader.loadFactory(MessageFactory.class);
     }
 
-    public void setSasl(Sasl sasl)
+    public synchronized void setSasl(Sasl sasl)
     {
         _sasl = sasl;
     }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java Fri Sep 13 12:52:52 2013
@@ -82,12 +82,14 @@ public class AmqpConnectionDriver
         int port = amqpConnection.getPort();
 
         SocketChannel channel = null;
+        String threadName = null;
         try
         {
             channel = SocketChannel.open();
             channel.configureBlocking(true);
             channel.connect(new InetSocketAddress(remoteHost, port));
             channel.configureBlocking(false);
+            threadName = "DriverRunnable-" + channel.getLocalAddress() + "/" + channel.getRemoteAddress();
         }
         catch (IOException e)
         {
@@ -108,7 +110,8 @@ public class AmqpConnectionDriver
         amqpConnection.setSasl(sasl);
 
         _driverRunnable = new DriverRunnable();
-        _driverThread = new Thread(_driverRunnable); // TODO set a sensible thread name
+        _driverThread = new Thread(_driverRunnable);
+        _driverThread.setName(threadName);
         _driverThread.start();
     }
 

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java Fri Sep 13 12:52:52 2013
@@ -1,4 +1,5 @@
 /*
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,25 +16,87 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
+ *
  */
 package org.apache.qpid.jms.engine;
 
 import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
 import org.apache.qpid.proton.message.Message;
 
+/**
+ * Thread-safe (all state is guarded by the corresponding {@link AmqpConnection} monitor)
+ *
+ */
 public class AmqpMessage
 {
-    private Message _message;
 
+    private final AmqpReceiver _amqpReceiver;
+    private final Delivery _delivery;
+    private final Message _message;
+
+    public AmqpMessage(Delivery delivery, Message message, AmqpReceiver amqpReceiver)
+    {
+        _delivery = delivery;
+        _amqpReceiver = amqpReceiver;
+        _message = message;
+    }
+
+    /**
+     * Currently used when creating a message that we intend to send
+     */
     public AmqpMessage()
     {
         _message = Proton.message();
+        _amqpReceiver = null;
+        _delivery = null;
     }
 
-    //TODO: restrict visibility of the Proton Message.
-    public Message getMessage()
+    Message getMessage()
     {
         return _message;
     }
 
+    public void accept(boolean settle)
+    {
+        synchronized (_amqpReceiver.getAmqpConnection())
+        {
+            _delivery.disposition(Accepted.getInstance());
+            if(settle)
+            {
+                _delivery.settle();
+            }
+        }
+    }
+
+    public void settle()
+    {
+        synchronized (_amqpReceiver.getAmqpConnection())
+        {
+            _delivery.settle();
+        }
+    }
+
+    /**
+     * If using proton-j, returns true if locally or remotely settled.
+     * If using proton-c, returns true if remotely settled.
+     * TODO - remove this hack when Proton-J and -C APIs are properly aligned
+     * The C API defines isSettled as being true if the delivery has been settled locally OR remotely
+     */
+    public boolean isSettled()
+    {
+        synchronized (_amqpReceiver.getAmqpConnection())
+        {
+            return _delivery.isSettled() || ((_delivery instanceof DeliveryImpl && ((DeliveryImpl)_delivery).remotelySettled()));
+        }
+    }
+
+    public void setText(String string)
+    {
+        AmqpValue body = new AmqpValue(string);
+        _message.setBody(body);
+    }
 }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java Fri Sep 13 12:52:52 2013
@@ -40,7 +40,7 @@ public class AmqpReceiver extends AmqpLi
         _protonReceiver.flow(credit);
     }
 
-    public AmqpReceivedMessage receiveNoWait()
+    public AmqpMessage receiveNoWait()
     {
         synchronized (getAmqpConnection())
         {
@@ -74,7 +74,7 @@ public class AmqpReceiver extends AmqpLi
                         Message message = getAmqpConnection().getMessageFactory().createMessage();
                         message.decode(_buffer, 0, total);
 
-                        AmqpReceivedMessage amqpMessage = new AmqpReceivedMessage(currentDelivery, message, this);
+                        AmqpMessage amqpMessage = new AmqpMessage(currentDelivery, message, this);
                         currentDelivery.setContext(amqpMessage);
                         _protonReceiver.advance();
                         return amqpMessage;

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java Fri Sep 13 12:52:52 2013
@@ -38,7 +38,7 @@ public class AmqpSender extends AmqpLink
         _protonSender = protonSender;
     }
 
-    public AmqpSentMessage sendMessage(AmqpMessage amqpMessage)
+    public AmqpSentMessageToken sendMessage(AmqpMessage amqpMessage)
     {
         synchronized (getAmqpConnection())
         {
@@ -66,7 +66,7 @@ public class AmqpSender extends AmqpLink
             _protonSender.send(_buffer, 0, encoded);
             _protonSender.advance();
 
-            AmqpSentMessage amqpSentMessage = new AmqpSentMessage(del, this);
+            AmqpSentMessageToken amqpSentMessage = new AmqpSentMessageToken(del, this);
             del.setContext(amqpSentMessage);
 
             return amqpSentMessage;

Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java (from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessage.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessage.java&r1=1519004&r2=1522905&rev=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java Fri Sep 13 12:52:52 2013
@@ -23,13 +23,13 @@ package org.apache.qpid.jms.engine;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.engine.Delivery;
 
-public class AmqpSentMessage
+public class AmqpSentMessageToken
 {
     private Delivery _delivery;
     private AmqpSender _amqpSender;
     private AmqpConnection _amqpConnection;
 
-    public AmqpSentMessage(Delivery delivery, AmqpSender sender)
+    public AmqpSentMessageToken(Delivery delivery, AmqpSender sender)
     {
         _delivery = delivery;
         _amqpSender = sender;

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java Fri Sep 13 12:52:52 2013
@@ -98,12 +98,21 @@ public class AmqpSession
         return amqpSender;
     }
 
-    public AmqpReceiver createAmqpReceiver(String name, String address)
+    public AmqpReceiver createAmqpReceiver(String address)
     {
+        String name = address + "->" + UUID.randomUUID().toString();
         Receiver protonReceiver = _protonSession.receiver(name);
+
         Source source = new Source();
         source.setAddress(address);
         protonReceiver.setSource(source);
+
+        Target target = new Target();
+        protonReceiver.setTarget(target);
+
+        protonReceiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        protonReceiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
         AmqpReceiver amqpReceiver = new AmqpReceiver(this, protonReceiver);
         protonReceiver.setContext(amqpReceiver);
         protonReceiver.open();

Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionException.java (from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionException.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionException.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java&r1=1519004&r2=1522905&rev=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionException.java Fri Sep 13 12:52:52 2013
@@ -18,10 +18,9 @@
  * under the License.
  *
  */
-package org.apache.qpid.jms.engine;
+package org.apache.qpid.jms.impl;
 
-//TODO make me (or wrap me in) a JMSException
-public class ConnectionException extends Exception
+public class ConnectionException extends QpidJmsException
 {
     private static final long serialVersionUID = 419676688719664719L;
 

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java Fri Sep 13 12:52:52 2013
@@ -37,8 +37,6 @@ import javax.jms.Topic;
 import org.apache.qpid.jms.engine.AmqpConnection;
 import org.apache.qpid.jms.engine.AmqpConnectionDriver;
 import org.apache.qpid.jms.engine.AmqpSession;
-import org.apache.qpid.jms.engine.ConnectionException;
-import org.apache.qpid.proton.TimeoutException;
 
 /**
  * A JMS connection.
@@ -48,6 +46,8 @@ import org.apache.qpid.proton.TimeoutExc
  * <li>Other internal classes must use the connection's lock and state-change methods -
  *     see {@link #lock()}/{@link #releaseLock()} and {@link #stateChanged()} for details.</li>
  * </ul>
+ *
+ * TODO wherever we throws JMSException, throw a subclass that has the cause set
  */
 public class ConnectionImpl implements Connection
 {
@@ -60,6 +60,8 @@ public class ConnectionImpl implements C
 
     private ConnectionLock _connectionLock;
 
+    private volatile boolean _isStarted;
+
     /**
      * TODO: accept a client id
      * TODO: defer connection to the broker if client has not been set. Defer it until any other method is called.
@@ -87,22 +89,13 @@ public class ConnectionImpl implements C
 
             connect();
         }
-        catch(InterruptedException e)
+        catch (IOException e)
         {
-            Thread.currentThread().interrupt();
-            JMSException jmse = new JMSException("Interrupted while trying to create connection");
-            jmse.setLinkedException(e);
-            throw jmse;
-        }
-        catch (TimeoutException | IOException | ConnectionException e)
-        {
-            JMSException jmse = new JMSException("Unable to create connection");
-            jmse.setLinkedException(e);
-            throw jmse;
+            throw new QpidJmsException("Unable to create connection", e);
         }
     }
 
-    void waitUntil(Predicate condition, long timeoutMillis) throws TimeoutException, InterruptedException
+    void waitUntil(Predicate condition, long timeoutMillis) throws JmsTimeoutException, JmsInterruptedException
     {
         long deadline = timeoutMillis < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeoutMillis;
 
@@ -122,7 +115,19 @@ public class ConnectionImpl implements C
                 }
                 if (wait && !done && !first)
                 {
-                    _amqpConnection.wait(timeoutMillis < 0 ? 0 : deadline - System.currentTimeMillis());
+                    try
+                    {
+                        _amqpConnection.wait(timeoutMillis < 0 ? 0 : deadline - System.currentTimeMillis());
+                    }
+                    catch (InterruptedException e)
+                    {
+                        //Note we are not setting the interrupted status, as it
+                        //is likely that user code will reenter the client code to
+                        //perform e.g close/rollback/etc and setting the status
+                        //could erroneously make those fail.
+                        throw new JmsInterruptedException("Interrupted while waiting for conditition: "
+                                                            + condition.getCurrentState() , e);
+                    }
                 }
 
                 wait = deadline > System.currentTimeMillis();
@@ -138,12 +143,12 @@ public class ConnectionImpl implements C
 
             if (!done)
             {
-                throw new TimeoutException(timeoutMillis, condition.getCurrentState());
+                throw new JmsTimeoutException(timeoutMillis, condition.getCurrentState());
             }
         }
     }
 
-    private void connect() throws IOException, ConnectionException, TimeoutException, InterruptedException
+    private void connect() throws IOException, ConnectionException, JmsTimeoutException, JmsInterruptedException
     {
         lock();
         try
@@ -157,19 +162,20 @@ public class ConnectionImpl implements C
                 }
             }, AmqpConnection.TIMEOUT);
 
+            //TODO: sort out exception throwing
             if(_amqpConnection.getConnectionError().getCondition() != null)
             {
-                throw new ConnectionException("Connection failed: 1 " + _amqpConnection.getConnectionError());
+                throw new ConnectionException("Connection failed: " + _amqpConnection.getConnectionError());
             }
 
             if(_amqpConnection.isAuthenticationError())
             {
-                throw new ConnectionException("Connection failed: 2");
+                throw new ConnectionException("Connection failed: authentication failure");
             }
 
             if(!_amqpConnection.isConnected())
             {
-                throw new ConnectionException("Connection failed: 3");
+                throw new ConnectionException("Connection failed");
             }
         }
         finally
@@ -204,16 +210,7 @@ public class ConnectionImpl implements C
         }
         catch(InterruptedException e)
         {
-            Thread.currentThread().interrupt();
-            JMSException jmse = new JMSException("Interrupted while trying to close connection");
-            jmse.setLinkedException(e);
-            throw jmse;
-        }
-        catch (TimeoutException | ConnectionException e)
-        {
-            JMSException jmse = new JMSException("Unable to close connection");
-            jmse.setLinkedException(e);
-            throw jmse;
+            throw new JmsInterruptedException("Interrupted while trying to close connection", e);
         }
         finally
         {
@@ -221,9 +218,6 @@ public class ConnectionImpl implements C
         }
     }
 
-    /**
-     * TODO the params are ignored - fix this
-     */
     @Override
     public SessionImpl createSession(boolean transacted, int acknowledgeMode) throws JMSException
     {
@@ -247,19 +241,6 @@ public class ConnectionImpl implements C
 
             return session;
         }
-        catch (TimeoutException e)
-        {
-            JMSException jmse = new JMSException("Unable to create session");
-            jmse.setLinkedException(e);
-            throw jmse;
-        }
-        catch(InterruptedException e)
-        {
-            Thread.currentThread().interrupt();
-            JMSException jmse = new JMSException("Interrupted while trying to create session");
-            jmse.setLinkedException(e);
-            throw jmse;
-        }
         finally
         {
             releaseLock();
@@ -267,14 +248,6 @@ public class ConnectionImpl implements C
     }
 
     /**
-     * TODO add @Override when we start implementing the JMS2 API.
-     */
-    public Session createSession() throws JMSException
-    {
-        return createSession(false, Session.AUTO_ACKNOWLEDGE);
-    }
-
-    /**
      * <p>
      * Acquire the connection lock.
      * </p>
@@ -351,8 +324,7 @@ public class ConnectionImpl implements C
     @Override
     public void start() throws JMSException
     {
-        // PHTODO Auto-generated method stub
-        throw new UnsupportedOperationException("PHTODO");
+        _isStarted = true;
     }
 
     @Override
@@ -376,4 +348,8 @@ public class ConnectionImpl implements C
         throw new UnsupportedOperationException("PHTODO");
     }
 
+    boolean isStarted()
+    {
+        return _isStarted;
+    }
 }

Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsInterruptedException.java (from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsInterruptedException.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsInterruptedException.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java&r1=1519004&r2=1522905&rev=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsInterruptedException.java Fri Sep 13 12:52:52 2013
@@ -18,15 +18,20 @@
  * under the License.
  *
  */
-package org.apache.qpid.jms.engine;
+package org.apache.qpid.jms.impl;
 
-// TODO make me (or wrap me in) a JMSException
-public class LinkException extends Exception
+/**
+ * Used to signal that we have been interrupted.
+ *
+ * When we throw this, the interrupted status is not set on the thread.
+ */
+public class JmsInterruptedException extends QpidJmsException
 {
-    private static final long serialVersionUID = 419676688719664719L;
+    private static final long serialVersionUID = 384180653752426597L;
 
-    public LinkException(String msg)
+    public JmsInterruptedException(String reason, InterruptedException cause)
     {
-        super(msg);
+        super(reason, cause);
     }
+
 }

Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsTimeoutException.java (from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsTimeoutException.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsTimeoutException.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java&r1=1519004&r2=1522905&rev=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsTimeoutException.java Fri Sep 13 12:52:52 2013
@@ -18,15 +18,14 @@
  * under the License.
  *
  */
-package org.apache.qpid.jms.engine;
+package org.apache.qpid.jms.impl;
 
-//TODO make me (or wrap me in) a JMSException
-public class ConnectionException extends Exception
+public class JmsTimeoutException extends QpidJmsException
 {
-    private static final long serialVersionUID = 419676688719664719L;
+    private static final long serialVersionUID = 7486676055343430641L;
 
-    public ConnectionException(String msg)
+    public JmsTimeoutException(long timeoutMillis, String pendingCondition)
     {
-        super(msg);
+        super("Timed out after " + timeoutMillis + " ms waiting for condition: " + pendingCondition);
     }
 }

Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkException.java (from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkException.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkException.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java&r1=1519004&r2=1522905&rev=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkException.java Fri Sep 13 12:52:52 2013
@@ -18,10 +18,9 @@
  * under the License.
  *
  */
-package org.apache.qpid.jms.engine;
+package org.apache.qpid.jms.impl;
 
-// TODO make me (or wrap me in) a JMSException
-public class LinkException extends Exception
+public class LinkException extends QpidJmsException
 {
     private static final long serialVersionUID = 419676688719664719L;
 

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java Fri Sep 13 12:52:52 2013
@@ -24,23 +24,19 @@ import javax.jms.JMSException;
 
 import org.apache.qpid.jms.engine.AmqpConnection;
 import org.apache.qpid.jms.engine.AmqpLink;
-import org.apache.qpid.jms.engine.LinkException;
-import org.apache.qpid.proton.TimeoutException;
 
 public class LinkImpl
 {
-    private SessionImpl _sessionImpl;
     private ConnectionImpl _connectionImpl;
     private AmqpLink _amqpLink;
 
-    public LinkImpl(SessionImpl sessionImpl, AmqpLink amqpLink)
+    public LinkImpl(ConnectionImpl connectionImpl, AmqpLink amqpLink)
     {
-        _sessionImpl = sessionImpl;
-        _connectionImpl = _sessionImpl.getConnectionImpl();
+        _connectionImpl = connectionImpl;
         _amqpLink = amqpLink;
     }
 
-    public void establish() throws TimeoutException, InterruptedException, LinkException
+    public void establish() throws LinkException, JmsTimeoutException, JmsInterruptedException
     {
         _connectionImpl.waitUntil(new SimplePredicate("Link is established or failed", _amqpLink)
         {
@@ -52,7 +48,7 @@ public class LinkImpl
         }, AmqpConnection.TIMEOUT);
         if(!_amqpLink.isEstablished())
         {
-            throw new LinkException("Failed to establish link " + _amqpLink); // TODO make message less verbose
+            throw new LinkException("Failed to establish link " + _amqpLink);
         }
     }
 
@@ -63,40 +59,14 @@ public class LinkImpl
         {
             _amqpLink.close();
             _connectionImpl.stateChanged();
-            while(!_amqpLink.isClosed())
+            _connectionImpl.waitUntil(new SimplePredicate("Link is closed", _amqpLink)
             {
-                try
+                @Override
+                public boolean test()
                 {
-                    _connectionImpl.waitUntil(new SimplePredicate("Link is closed", _amqpLink)
-                    {
-                        @Override
-                        public boolean test()
-                        {
-                            return _amqpLink.isClosed();
-                        }
-                    }, AmqpConnection.TIMEOUT);
+                    return _amqpLink.isClosed();
                 }
-                catch (TimeoutException e)
-                {
-                    JMSException jmse = new JMSException("Unable to close link");
-                    jmse.setLinkedException(e);
-                    throw jmse;
-                }
-                catch(InterruptedException e)
-                {
-                    Thread.currentThread().interrupt();
-                    JMSException jmse = new JMSException("Interrupted while trying to close link");
-                    jmse.setLinkedException(e);
-                    throw jmse;
-                }
-
-            }
-
-            //TODO: link errors? E.g:
-            //            if(_amqpSender.getLinkError().getCondition() != null)
-            //            {
-            //                throw new ConnectionException("Sender close failed: " + _amqpSender.getLinkError());
-            //            }
+            }, AmqpConnection.TIMEOUT);
         }
         finally
         {

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Fri Sep 13 12:52:52 2013
@@ -32,10 +32,14 @@ public abstract class MessageImpl implem
 
     protected MessageImpl()
     {
-        //TODO: move to AmqpSession.createMessage()?
         _amqpMessage = new AmqpMessage();
     }
 
+    AmqpMessage getAmqpMessage()
+    {
+        return _amqpMessage;
+    }
+
     @Override
     public String getJMSMessageID() throws JMSException
     {
@@ -351,9 +355,5 @@ public abstract class MessageImpl implem
         throw new UnsupportedOperationException("PHTODO");
     }
 
-    public AmqpMessage getAmqpMessage()
-    {
-        return _amqpMessage;
-    }
 
 }

Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsException.java (from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsException.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsException.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java&r1=1519004&r2=1522905&rev=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsException.java Fri Sep 13 12:52:52 2013
@@ -1,4 +1,5 @@
 /*
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,25 +16,28 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
+ *
  */
-package org.apache.qpid.jms.engine;
+package org.apache.qpid.jms.impl;
 
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.message.Message;
+import javax.jms.JMSException;
 
-public class AmqpMessage
+public class QpidJmsException extends JMSException
 {
-    private Message _message;
+    private static final long serialVersionUID = 751932967255393054L;
 
-    public AmqpMessage()
+    public QpidJmsException(String reason)
     {
-        _message = Proton.message();
+        super(reason);
     }
 
-    //TODO: restrict visibility of the Proton Message.
-    public Message getMessage()
+    public QpidJmsException(String reason, Exception cause)
     {
-        return _message;
+        super(reason);
+        if (cause != null)
+        {
+            setLinkedException(cause);
+            initCause(cause);
+        }
     }
-
 }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceivedMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceivedMessageImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceivedMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceivedMessageImpl.java Fri Sep 13 12:52:52 2013
@@ -20,76 +20,357 @@
  */
 package org.apache.qpid.jms.impl;
 
-import org.apache.qpid.jms.engine.AmqpConnection;
-import org.apache.qpid.jms.engine.AmqpReceivedMessage;
-import org.apache.qpid.proton.TimeoutException;
-import org.apache.qpid.proton.message.Message;
+import java.util.Enumeration;
 
-public class ReceivedMessageImpl
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.engine.AmqpMessage;
+
+public class ReceivedMessageImpl implements javax.jms.Message // TODO inherit from sendable base class
 {
-    private AmqpReceivedMessage _amqpMessage;
-    private ReceiverImpl _receiverImpl;
+    private final AmqpMessage _amqpMessage;
+    private final SessionImpl _sessionImpl;
+    private final ConnectionImpl _connectionImpl;
 
-    public ReceivedMessageImpl(AmqpReceivedMessage amqpMessage, ReceiverImpl receiverImpl)
+    public ReceivedMessageImpl(AmqpMessage amqpMessage, SessionImpl sessionImpl)
     {
         _amqpMessage = amqpMessage;
-        _receiverImpl = receiverImpl;
+        _sessionImpl = sessionImpl;
+        _connectionImpl = _sessionImpl.getConnectionImpl();
     }
 
     public void accept(boolean settle)
     {
-        _receiverImpl.getConnectionImpl().lock();
+        _connectionImpl.lock();
         try
         {
-            _amqpMessage.accept();
-            if(settle)
-            {
-                _amqpMessage.settle();
-            }
-            _receiverImpl.getConnectionImpl().stateChanged();
+            _amqpMessage.accept(settle);
+            _connectionImpl.stateChanged();
         }
         finally
         {
-            _receiverImpl.getConnectionImpl().releaseLock();
+            _connectionImpl.releaseLock();
         }
     }
 
-    public void settle()
+    AmqpMessage getAmqpMessage()
     {
-        _receiverImpl.getConnectionImpl().lock();
-        try
-        {
-            _amqpMessage.settle();
-            _receiverImpl.getConnectionImpl().stateChanged();
-        }
-        finally
-        {
-            _receiverImpl.getConnectionImpl().releaseLock();
-        }
+        return _amqpMessage;
     }
 
-    public void waitUntilSettled() throws TimeoutException, InterruptedException
+    @Override
+    public String getJMSMessageID() throws JMSException
     {
-        _receiverImpl.getConnectionImpl().lock();
-        try
-        {
-            _receiverImpl.getConnectionImpl().waitUntil(new SimplePredicate("Message is settled", _amqpMessage)
-            {
-                @Override
-                public boolean test()
-                {
-                    return _amqpMessage.isSettled();
-                }
-            }, AmqpConnection.TIMEOUT);
-        }
-        finally
-        {
-            _receiverImpl.getConnectionImpl().releaseLock();
-        }
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setJMSMessageID(String id) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public long getJMSTimestamp() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setJMSTimestamp(long timestamp) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setJMSCorrelationID(String correlationID) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public String getJMSCorrelationID() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public Destination getJMSReplyTo() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setJMSReplyTo(Destination replyTo) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public Destination getJMSDestination() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setJMSDestination(Destination destination) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public int getJMSDeliveryMode() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setJMSDeliveryMode(int deliveryMode) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public boolean getJMSRedelivered() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setJMSRedelivered(boolean redelivered) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public String getJMSType() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setJMSType(String type) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public long getJMSExpiration() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setJMSExpiration(long expiration) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public int getJMSPriority() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setJMSPriority(int priority) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void clearProperties() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public boolean propertyExists(String name) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public boolean getBooleanProperty(String name) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public byte getByteProperty(String name) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public short getShortProperty(String name) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public int getIntProperty(String name) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public long getLongProperty(String name) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public float getFloatProperty(String name) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public double getDoubleProperty(String name) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public String getStringProperty(String name) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public Object getObjectProperty(String name) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public Enumeration getPropertyNames() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setBooleanProperty(String name, boolean value) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setByteProperty(String name, byte value) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setShortProperty(String name, short value) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setIntProperty(String name, int value) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setLongProperty(String name, long value) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setFloatProperty(String name, float value) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setDoubleProperty(String name, double value) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setStringProperty(String name, String value) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setObjectProperty(String name, Object value) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void acknowledge() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
     }
 
-    public Message getMessage()
+    @Override
+    public void clearBody() throws JMSException
     {
-        return _amqpMessage.getMessage();
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
     }
 }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java Fri Sep 13 12:52:52 2013
@@ -20,30 +20,58 @@
  */
 package org.apache.qpid.jms.impl;
 
-import org.apache.qpid.jms.engine.AmqpReceivedMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+import org.apache.qpid.jms.engine.AmqpMessage;
 import org.apache.qpid.jms.engine.AmqpReceiver;
-import org.apache.qpid.proton.TimeoutException;
 
-public class ReceiverImpl extends LinkImpl
+public class ReceiverImpl extends LinkImpl implements MessageConsumer
 {
     private final AmqpReceiver _amqpReceiver;
+    private final SessionImpl _sessionImpl;
 
-    public ReceiverImpl(SessionImpl sessionImpl, AmqpReceiver amqpReceiver)
+    public ReceiverImpl(ConnectionImpl connectionImpl, SessionImpl sessionImpl, AmqpReceiver amqpReceiver)
     {
-        super(sessionImpl, amqpReceiver);
+        super(connectionImpl, amqpReceiver);
+        _sessionImpl = sessionImpl;
         _amqpReceiver = amqpReceiver;
     }
 
-    public ReceivedMessageImpl receive(long timeout) throws TimeoutException, InterruptedException
+    @Override
+    public Message receive() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public Message receive(long timeout) throws JMSException
     {
         getConnectionImpl().lock();
         try
         {
+            if(!getConnectionImpl().isStarted())
+            {
+                return null;
+            }
+
             MessageReceivedPredicate messageReceievedCondition = new MessageReceivedPredicate();
             getConnectionImpl().waitUntil(messageReceievedCondition, timeout);
-            getConnectionImpl().stateChanged();
 
-            return new ReceivedMessageImpl(messageReceievedCondition.getReceivedMessage(), this);
+            //TODO: decide what if any particular message impl class to instantiate
+            //TODO: accepting/settling will be acknowledge-mode dependent
+            ReceivedMessageImpl receivedMessageImpl = new ReceivedMessageImpl(messageReceievedCondition.getReceivedMessage(), _sessionImpl);
+            receivedMessageImpl.accept(true);
+            getConnectionImpl().stateChanged();
+            return receivedMessageImpl;
+        }
+        catch (JmsTimeoutException e)
+        {
+            //No message in allotted time, return null to signal this
+            return null;
         }
         finally
         {
@@ -67,7 +95,7 @@ public class ReceiverImpl extends LinkIm
 
     private final class MessageReceivedPredicate extends SimplePredicate
     {
-        AmqpReceivedMessage _message;
+        AmqpMessage _message;
 
         public MessageReceivedPredicate()
         {
@@ -84,10 +112,38 @@ public class ReceiverImpl extends LinkIm
             return _message != null;
         }
 
-        public AmqpReceivedMessage getReceivedMessage()
+        public AmqpMessage getReceivedMessage()
         {
             return _message;
         }
     }
 
+    @Override
+    public String getMessageSelector() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public MessageListener getMessageListener() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
+    @Override
+    public Message receiveNoWait() throws JMSException
+    {
+        // PHTODO Auto-generated method stub
+        throw new UnsupportedOperationException("PHTODO");
+    }
+
 }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java Fri Sep 13 12:52:52 2013
@@ -27,8 +27,7 @@ import javax.jms.MessageProducer;
 
 import org.apache.qpid.jms.engine.AmqpMessage;
 import org.apache.qpid.jms.engine.AmqpSender;
-import org.apache.qpid.jms.engine.AmqpSentMessage;
-import org.apache.qpid.proton.TimeoutException;
+import org.apache.qpid.jms.engine.AmqpSentMessageToken;
 
 public class SenderImpl extends LinkImpl implements MessageProducer
 {
@@ -36,7 +35,7 @@ public class SenderImpl extends LinkImpl
 
     public SenderImpl(SessionImpl sessionImpl, AmqpSender amqpSender)
     {
-        super(sessionImpl, amqpSender);
+        super(sessionImpl.getConnectionImpl(), amqpSender);
         _amqpSender = amqpSender;
     }
 
@@ -48,28 +47,14 @@ public class SenderImpl extends LinkImpl
         {
             AmqpMessage amqpMessage = getAmqpMessageFromJmsMessage(message);
 
-            AmqpSentMessage sentMessage = _amqpSender.sendMessage(amqpMessage);
+            AmqpSentMessageToken sentMessage = _amqpSender.sendMessage(amqpMessage);
 
             getConnectionImpl().stateChanged();
 
-            SentMessageImpl sentMessageImpl = new SentMessageImpl(sentMessage, this);
+            SentMessageTokenImpl sentMessageImpl = new SentMessageTokenImpl(sentMessage, this);
             sentMessageImpl.waitUntilAccepted();
             sentMessage.settle();
         }
-        catch (InterruptedException e)
-        {
-            Thread.currentThread().interrupt();
-            JMSException jmse = new JMSException("Interrupted while trying to send messages");
-            jmse.setLinkedException(e);
-            throw jmse;
-        }
-        catch (TimeoutException e)
-        {
-            JMSException jmse = new JMSException("Timed out during send");
-            e.printStackTrace();
-            jmse.setLinkedException(e);
-            throw jmse;
-        }
         finally
         {
             getConnectionImpl().releaseLock();

Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageTokenImpl.java (from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageImpl.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageTokenImpl.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageTokenImpl.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageImpl.java&r1=1519004&r2=1522905&rev=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageTokenImpl.java Fri Sep 13 12:52:52 2013
@@ -21,28 +21,31 @@
 package org.apache.qpid.jms.impl;
 
 import org.apache.qpid.jms.engine.AmqpConnection;
-import org.apache.qpid.jms.engine.AmqpSentMessage;
-import org.apache.qpid.proton.TimeoutException;
+import org.apache.qpid.jms.engine.AmqpSentMessageToken;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
 
-public class SentMessageImpl
+public class SentMessageTokenImpl
 {
-    private AmqpSentMessage _sentMessage;
+    private AmqpSentMessageToken _sentMessage;
     private SenderImpl _sender;
 
-    public SentMessageImpl(AmqpSentMessage sentMessage, SenderImpl sender)
+    public SentMessageTokenImpl(AmqpSentMessageToken sentMessage, SenderImpl sender)
     {
         _sentMessage = sentMessage;
         _sender = sender;
     }
 
-    public void waitUntilAccepted() throws TimeoutException, InterruptedException
+    public void waitUntilAccepted() throws JmsTimeoutException, JmsInterruptedException
     {
         _sender.getConnectionImpl().waitUntil(new SimplePredicate("Remote delivery state exists", _sentMessage)
         {
             @Override
             public boolean test()
             {
-                return _sentMessage.getRemoteDeliveryState() != null; //TODO: should we actually check it is *accepted*?
+                DeliveryState remoteDeliveryState = _sentMessage.getRemoteDeliveryState();
+                return remoteDeliveryState != null && Accepted.getInstance().equals(remoteDeliveryState);
+                //TODO: throw an exception if it isn't accepted.
             }
         }, AmqpConnection.TIMEOUT);
     }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Fri Sep 13 12:52:52 2013
@@ -21,10 +21,10 @@
 package org.apache.qpid.jms.impl;
 
 import java.io.Serializable;
-import java.util.UUID;
 
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
+import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.Message;
@@ -46,12 +46,10 @@ import org.apache.qpid.jms.engine.AmqpCo
 import org.apache.qpid.jms.engine.AmqpReceiver;
 import org.apache.qpid.jms.engine.AmqpSender;
 import org.apache.qpid.jms.engine.AmqpSession;
-import org.apache.qpid.jms.engine.ConnectionException;
-import org.apache.qpid.jms.engine.LinkException;
-import org.apache.qpid.proton.TimeoutException;
 
 public class SessionImpl implements Session
 {
+    private static final int INITIAL_RECEIVER_CREDIT = 1;
     private AmqpSession _amqpSession;
     private ConnectionImpl _connectionImpl;
 
@@ -61,7 +59,7 @@ public class SessionImpl implements Sess
         _connectionImpl = connectionImpl;
     }
 
-    public void establish() throws TimeoutException, InterruptedException
+    public void establish() throws JmsTimeoutException, JmsInterruptedException
     {
         _connectionImpl.waitUntil(new SimplePredicate("Session established", _amqpSession)
         {
@@ -98,19 +96,6 @@ public class SessionImpl implements Sess
                 throw new ConnectionException("Session close failed: " + _amqpSession.getSessionError());
             }
         }
-        catch(InterruptedException e)
-        {
-            Thread.currentThread().interrupt();
-            JMSException jmse = new JMSException("Interrupted while trying to close session");
-            jmse.setLinkedException(e);
-            throw jmse;
-        }
-        catch (TimeoutException | ConnectionException e)
-        {
-            JMSException jmse = new JMSException("Unable to close session");
-            jmse.setLinkedException(e);
-            throw jmse;
-        }
         finally
         {
             _connectionImpl.releaseLock();
@@ -133,8 +118,7 @@ public class SessionImpl implements Sess
         else if (destination instanceof Queue)
         {
             Queue queue = (Queue) destination;
-            String senderName = "producer-" + queue.getQueueName() + "-" + UUID.randomUUID();
-            return createSender(senderName, queue.getQueueName());
+            return createSender(queue.getQueueName());
         }
         else if(destination instanceof Topic)
         {
@@ -147,7 +131,7 @@ public class SessionImpl implements Sess
 
     }
 
-    private SenderImpl createSender(String senderName, String address) throws JMSException
+    private SenderImpl createSender(String address) throws JMSException
     {
         _connectionImpl.lock();
         try
@@ -158,34 +142,51 @@ public class SessionImpl implements Sess
             sender.establish();
             return sender;
         }
-        catch (InterruptedException e)
-        {
-            Thread.currentThread().interrupt();
-            JMSException jmse = new JMSException("Interrupted while trying to create sender");
-            jmse.setLinkedException(e);
-            throw jmse;
-        }
-        catch (TimeoutException | LinkException e)
-        {
-            JMSException jmse = new JMSException("Unable to create sender: " + e.getMessage());
-            jmse.setLinkedException(e);
-            throw jmse;
-        }
         finally
         {
             _connectionImpl.releaseLock();
         }
     }
 
-    public ReceiverImpl createReceiver(String name, String address) throws TimeoutException, InterruptedException, LinkException
+    @Override
+    public MessageConsumer createConsumer(Destination destination) throws JMSException
+    {
+        if(destination == null)
+        {
+            throw new InvalidDestinationException("Null destination provided");
+        }
+        else if (destination instanceof Queue)
+        {
+            Queue queue = (Queue) destination;
+            return createReceiver(queue.getQueueName());
+        }
+        else if(destination instanceof Topic)
+        {
+            throw new UnsupportedOperationException("Topics are not yet supported");
+        }
+        else
+        {
+            throw new IllegalArgumentException("Destination expected to be a Queue or a Topic but was: " + destination.getClass());
+        }
+    }
+
+    private ReceiverImpl createReceiver(String address) throws JMSException
     {
         _connectionImpl.lock();
         try
         {
-            AmqpReceiver amqpReceiver = _amqpSession.createAmqpReceiver(name, address);
-            ReceiverImpl receiver = new ReceiverImpl(this, amqpReceiver);
+            AmqpReceiver amqpReceiver = _amqpSession.createAmqpReceiver(address);
+            ReceiverImpl receiver = new ReceiverImpl(_connectionImpl, this, amqpReceiver);
             _connectionImpl.stateChanged();
             receiver.establish();
+
+            if(_connectionImpl.isStarted())
+            {
+                //issue initial flow for the consumer
+                amqpReceiver.credit(INITIAL_RECEIVER_CREDIT);
+                _connectionImpl.stateChanged();
+            }
+
             return receiver;
         }
         finally
@@ -305,13 +306,6 @@ public class SessionImpl implements Sess
     }
 
     @Override
-    public MessageConsumer createConsumer(Destination destination) throws JMSException
-    {
-        // PHTODO Auto-generated method stub
-        throw new UnsupportedOperationException("PHTODO");
-    }
-
-    @Override
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
     {
         // PHTODO Auto-generated method stub

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java Fri Sep 13 12:52:52 2013
@@ -21,8 +21,6 @@ package org.apache.qpid.jms.impl;
 import javax.jms.JMSException;
 import javax.jms.TextMessage;
 
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-
 public class TextMessageImpl extends MessageImpl implements TextMessage
 {
     public TextMessageImpl() throws JMSException
@@ -45,10 +43,7 @@ public class TextMessageImpl extends Mes
     @Override
     public void setText(String string) throws JMSException
     {
-        AmqpValue body = new AmqpValue(string);
-
-        //TODO: stop accessing the Proton Message directly
-        getAmqpMessage().getMessage().setBody(body);
+        getAmqpMessage().setText(string);
     }
 
 }

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ConnectionIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ConnectionIntegrationTest.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ConnectionIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ConnectionIntegrationTest.java Fri Sep 13 12:52:52 2013
@@ -29,8 +29,7 @@ import org.apache.qpid.jms.test.testpeer
 import org.junit.Test;
 
 // TODO find a way to make the test abort immediately if the TestAmqpPeer throws an exception
-// TODO add tests such as testBrokerDoesNotRespond and testBrokerSendsWrongFrame
-public class ConnectionIntegrationTest
+public class ConnectionIntegrationTest extends QpidJmsTestCase
 {
     private final IntegrationTestFixture _testFixture = new IntegrationTestFixture();
 

Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/QpidJmsTestCase.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/QpidJmsTestCase.java?rev=1522905&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/QpidJmsTestCase.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/QpidJmsTestCase.java Fri Sep 13 12:52:52 2013
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+public class QpidJmsTestCase
+{
+    private final Logger _logger = Logger.getLogger(getClass().getName());
+
+    private final Map<String, String> _propertiesSetForTest = new HashMap<String, String>();
+
+    @Rule public TestName _testName = new TestName();
+
+    /**
+     * Set a System property for duration of this test only. The tearDown will
+     * guarantee to reset the property to its previous value after the test
+     * completes.
+     *
+     * @param property The property to set
+     * @param value the value to set it to, if null, the property will be cleared
+     */
+    protected void setTestSystemProperty(final String property, final String value)
+    {
+        if (!_propertiesSetForTest.containsKey(property))
+        {
+            // Record the current value so we can revert it later.
+            _propertiesSetForTest.put(property, System.getProperty(property));
+        }
+
+        if (value == null)
+        {
+            System.clearProperty(property);
+            _logger.info("Set system property '" + property + "' to be cleared");
+        }
+        else
+        {
+            System.setProperty(property, value);
+            _logger.info("Set system property '" + property + "' to: '" + value + "'");
+        }
+
+    }
+
+    /**
+     * Restore the System property values that were set by this test run.
+     */
+    protected void revertTestSystemProperties()
+    {
+        if(!_propertiesSetForTest.isEmpty())
+        {
+            for (String key : _propertiesSetForTest.keySet())
+            {
+                String value = _propertiesSetForTest.get(key);
+                if (value != null)
+                {
+                    System.setProperty(key, value);
+                    _logger.info("Reverted system property '" + key + "' to: '" + value + "'");
+                }
+                else
+                {
+                    System.clearProperty(key);
+                    _logger.info("Reverted system property '" + key + "' to be cleared");
+                }
+            }
+
+            _propertiesSetForTest.clear();
+        }
+    }
+
+    @After
+    public void tearDown() throws java.lang.Exception
+    {
+        _logger.info("========== tearDown " + getTestName() + " ==========");
+        revertTestSystemProperties();
+    }
+
+    @Before
+    public void setUp() throws Exception
+    {
+        _logger.info("========== start " + getTestName() + " ==========");
+    }
+
+    protected String getTestName()
+    {
+        return getClass().getSimpleName() + "." +_testName.getMethodName();
+    }
+}

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java Fri Sep 13 12:52:52 2013
@@ -18,16 +18,20 @@
  */
 package org.apache.qpid.jms;
 
+import static org.junit.Assert.assertNotNull;
+
 import javax.jms.Connection;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 
+import org.apache.qpid.jms.impl.ReceivedMessageImpl;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.junit.Test;
 
-public class SessionIntegrationTest
+public class SessionIntegrationTest extends QpidJmsTestCase
 {
     private final IntegrationTestFixture _testFixture = new IntegrationTestFixture();
 
@@ -67,4 +71,31 @@ public class SessionIntegrationTest
             producer.send(message);
         }
     }
+
+    @Test
+    public void testSendReceiveTextMessageWithContent() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer();
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+
+            // TODO check that it's a TextMessage with expected content: String expectedText = "myMessage";
+            ReceivedMessageImpl receivedMessage = (ReceivedMessageImpl) messageConsumer.receive(1000);
+            assertNotNull(receivedMessage);
+
+            testPeer.waitForAllHandlersToComplete();
+        }
+    }
 }

Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ImmediateWaitUntil.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ImmediateWaitUntil.java?rev=1522905&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ImmediateWaitUntil.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ImmediateWaitUntil.java Fri Sep 13 12:52:52 2013
@@ -0,0 +1,41 @@
+package org.apache.qpid.jms.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Assert;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class ImmediateWaitUntil implements Answer<Void>
+{
+    @Override
+    public Void answer(InvocationOnMock invocation) throws Throwable
+    {
+        //verify the arg types / expected values
+        Object[] args = invocation.getArguments();
+        assertEquals(2, args.length);
+        assertTrue(args[0] instanceof Predicate);
+        assertTrue(args[1] instanceof Long);
+
+        Predicate predicate = (Predicate)args[0];
+
+        if(predicate.test())
+        {
+            return null;
+        }
+        else
+        {
+            throw new JmsTimeoutException(0, "ImmediateWaitUntil predicate test returned false: " + predicate);
+        }
+    }
+
+    public static void mockWaitUntil(ConnectionImpl connectionImpl) throws JmsTimeoutException, JmsInterruptedException
+    {
+        Assert.assertFalse("This method cant mock the method on a real object", connectionImpl.getClass() == ConnectionImpl.class);
+
+        Mockito.doAnswer(new ImmediateWaitUntil()).when(connectionImpl).waitUntil(Matchers.isA(Predicate.class), Matchers.anyLong());
+    }
+}
\ No newline at end of file

Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java?rev=1522905&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java Fri Sep 13 12:52:52 2013
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.jms.engine.AmqpMessage;
+import org.apache.qpid.jms.engine.AmqpReceiver;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ReceiverImplTest extends QpidJmsTestCase
+{
+    private final ConnectionImpl _mockConnection = Mockito.mock(ConnectionImpl.class);
+    private final AmqpReceiver _mockAmqpReceiver = Mockito.mock(AmqpReceiver.class);
+    private final SessionImpl _mockSession = Mockito.mock(SessionImpl.class);
+    private final AmqpMessage _mockAmqpMessage = Mockito.mock(AmqpMessage.class);
+
+    @Test
+    public void testNoMessageReceivedWhenConnectionNotStarted() throws Exception
+    {
+        Mockito.when(_mockConnection.isStarted()).thenReturn(false);
+        Mockito.when(_mockAmqpReceiver.receiveNoWait()).thenReturn(_mockAmqpMessage);
+
+        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+        ReceiverImpl receiver = new ReceiverImpl(_mockConnection, _mockSession, _mockAmqpReceiver);
+
+        assertNull("Should not receive a message when connection is not started", receiver.receive(1));
+    }
+
+    @Test
+    public void testMessageReceivedWhenConnectionIsStarted() throws Exception
+    {
+        Mockito.when(_mockConnection.isStarted()).thenReturn(true);
+        Mockito.when(_mockAmqpReceiver.receiveNoWait()).thenReturn(_mockAmqpMessage);
+        Mockito.when(_mockSession.getConnectionImpl()).thenReturn(_mockConnection);
+
+        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+        ReceiverImpl receiver = new ReceiverImpl(_mockConnection, _mockSession, _mockAmqpReceiver);
+
+        ReceivedMessageImpl messageImpl = (ReceivedMessageImpl) receiver.receive(1);
+        assertNotNull("Should not receive a message when connection is not started", messageImpl);
+        assertEquals("Underlying AmqpMessage should be the one provided", _mockAmqpMessage, messageImpl.getAmqpMessage());
+    }
+}

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java Fri Sep 13 12:52:52 2013
@@ -23,6 +23,7 @@ import static org.hamcrest.MatcherAssert
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.logging.Logger;
 
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -31,60 +32,80 @@ import org.hamcrest.Matcher;
 
 public abstract class AbstractFrameFieldAndPayloadMatchingHandler extends FrameMatchingHandler
 {
-    private final Map<Integer, Matcher<?>> _matchers;
-    private Map<Integer, Object> _receivedFields;
+    private final Logger _logger = Logger.getLogger(getClass().getName());
 
+    private final Map<Enum<?>, Matcher<?>> _fieldMatchers;
+    private Map<Enum<?>, Object> _receivedFields;
+
+    /**
+     * @param fieldMatchers a map of field matchers, keyed by enums representing the fields
+     * (the enums just need to have an ordinal number matching the AMQP spec field order,
+     * and preferably a sensible name)
+     */
     protected AbstractFrameFieldAndPayloadMatchingHandler(FrameType frameType,
                                                 int channel,
                                                 UnsignedLong numericDescriptor,
                                                 Symbol symbolicDescriptor,
-                                                Map<Integer, Matcher<?>> matchers,
+                                                Map<Enum<?>, Matcher<?>> fieldMatchers,
                                                 Runnable onSuccess)
     {
         super(frameType, channel, numericDescriptor, symbolicDescriptor, onSuccess);
-        _matchers = matchers;
+        _fieldMatchers = fieldMatchers;
     }
 
-    protected Map<Integer, Matcher<?>> getMatchers()
+    protected Map<Enum<?>, Matcher<?>> getMatchers()
     {
-        return _matchers;
+        return _fieldMatchers;
     }
 
+    /**
+     * Returns the received values, keyed by enums representing the fields
+     * (the enums have an ordinal number matching the AMQP spec field order,
+     * and a sensible name)
+     */
     @Override
-    protected Map<Integer, Object> getReceivedFields()
+    protected Map<Enum<?>, Object> getReceivedFields()
     {
         return _receivedFields;
     }
 
     @Override
-    protected void frame(List<Object> described, Binary payload)
+    protected void verifyFrame(List<Object> described, Binary payload)
     {
         verifyPayload(payload);
         verifyFields(described);
-
-        succeeded();
     }
 
     protected void verifyFields(List<Object> described)
     {
-        int i = 0;
-        HashMap<Integer, Object> valueMap = new HashMap<>();
+        int fieldNumber = 0;
+        HashMap<Enum<?>, Object> valueMap = new HashMap<>();
         for(Object value : described)
         {
-            valueMap.put(i++, value);
+            valueMap.put(getField(fieldNumber++), value);
         }
 
         _receivedFields = valueMap;
 
-        for(Map.Entry<Integer, Matcher<?>> entry : _matchers.entrySet())
+        _logger.fine("About to check the fields of the described type."
+                + "\n  Received:" + valueMap
+                + "\n  Expectations: " + _fieldMatchers);
+        for(Map.Entry<Enum<?>, Matcher<?>> entry : _fieldMatchers.entrySet())
         {
             @SuppressWarnings("unchecked")
             Matcher<Object> matcher = (Matcher<Object>) entry.getValue();
-            Integer field = entry.getKey();
-
-            assertThat("Field value should match", valueMap.get(field), matcher);
+            Enum<?> field = entry.getKey();
+            assertThat("Field " + field + " value should match", valueMap.get(field), matcher);
         }
     }
 
+    /**
+     * Intended to be overridden in most cases (but not necessarily all - hence not marked as abstract)
+     */
+    protected Enum<?> getField(int fieldIndex)
+    {
+        throw new UnsupportedOperationException("getFieldName is expected to be overridden by subclass if it is required");
+    }
+
     protected abstract void verifyPayload(Binary payload);
 }
\ No newline at end of file

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java Fri Sep 13 12:52:52 2013
@@ -31,7 +31,7 @@ import org.apache.qpid.proton.codec.Data
  */
 public class AmqpDataFramer
 {
-    private static final int CAPACITY = 1024;
+    private static final int CAPACITY = 2024;
     private static final byte FRAME_PREAMBLE_SIZE_IN_FOUR_BYTE_WORDS = 2;
 
     public static byte[] encodeFrame(FrameType type, int channel, DescribedType describedType, Binary payload)
@@ -43,7 +43,10 @@ public class AmqpDataFramer
         Data frameBody = Proton.data(CAPACITY);
         frameBody.putDescribedType(describedType);
         frameBody.encode(buffer);
-        //TODO: cope with payload
+        if(payload != null)
+        {
+            buffer.put(payload.asByteBuffer());
+        }
 
         int frameSize = buffer.position();
         buffer.rewind();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org