You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2013/09/13 14:52:54 UTC
svn commit: r1522905 [1/3] - in /qpid/jms/trunk: ./
src/main/java/org/apache/qpid/jms/engine/
src/main/java/org/apache/qpid/jms/impl/ src/test/java/org/apache/qpid/jms/
src/test/java/org/apache/qpid/jms/impl/
src/test/java/org/apache/qpid/jms/test/test...
Author: robbie
Date: Fri Sep 13 12:52:52 2013
New Revision: 1522905
URL: http://svn.apache.org/r1522905
Log:
QPIDJMS-3: got message consumption integration test working
Work by Phil Harvey and myself.
Added:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java
- copied, changed from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionException.java
- copied, changed from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsInterruptedException.java
- copied, changed from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsTimeoutException.java
- copied, changed from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkException.java
- copied, changed from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsException.java
- copied, changed from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageTokenImpl.java
- copied, changed from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/QpidJmsTestCase.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ImmediateWaitUntil.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/DescriptorMatcher.java
Removed:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceivedMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/JmsTest.java
Modified:
qpid/jms/trunk/pom.xml
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceivedMessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ConnectionIntegrationTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/FrameMatchingHandler.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithNoPayloadMatchingHandler.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithPayloadMatchingHandler.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/HeaderHandlerImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/ListDescribedType.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/Accepted.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/AttachFrame.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/BeginFrame.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/CloseFrame.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/DetachFrame.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/DispositionFrame.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/EndFrame.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/FlowFrame.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/Modified.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/OpenFrame.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/Rejected.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/Released.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/SaslChallengeFrame.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/SaslInitFrame.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/SaslMechanismsFrame.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/SaslOutcomeFrame.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/SaslResponseFrame.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/Source.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/Target.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/TransferFrame.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/generate-frames.xsl
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/AttachMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/BeginMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/CloseMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/DetachMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/DispositionMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/EndMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/FlowMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/OpenMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/SaslChallengeMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/SaslInitMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/SaslMechanismsMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/SaslOutcomeMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/SaslResponseMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/TransferMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/generate-matchers.xsl
qpid/jms/trunk/src/test/resources/logging.properties
Modified: qpid/jms/trunk/pom.xml
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/pom.xml?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/pom.xml (original)
+++ qpid/jms/trunk/pom.xml Fri Sep 13 12:52:52 2013
@@ -70,6 +70,12 @@
<version>${junit-version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>1.9.5</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java Fri Sep 13 12:52:52 2013
@@ -49,16 +49,17 @@ import org.apache.qpid.proton.message.Me
* Obtain the {@link AmqpConnection} lock first to use them in a thread-safe
* manner.
*
- * TODO more tightly define the thread-safety of this class, e.g. should setSasl etc be synchronized?
*/
@SuppressWarnings("rawtypes")
public class AmqpConnection
{
- /** default timeout in milliseconds */
- public static final long TIMEOUT = 10_000L;
-
private static Logger _logger = Logger.getLogger("qpid.jms-client.connection");
+ /**
+ * Default timeout in milliseconds.
+ * TODO define a proper way for the timeout to be specified.
+ */
+ public static final long TIMEOUT = Long.getLong("org.apache.qpid.jms.connection.timeout", 10_000L);
private static final ProtonFactoryLoader protonFactoryLoader = new ProtonFactoryLoader();
@@ -153,7 +154,7 @@ public class AmqpConnection
/**
* @return the username
*/
- public String getUsername()
+ public synchronized String getUsername()
{
return _username;
}
@@ -161,7 +162,7 @@ public class AmqpConnection
/**
* @param username the username to set
*/
- public void setUsername(String username)
+ public synchronized void setUsername(String username)
{
_username = username;
}
@@ -169,7 +170,7 @@ public class AmqpConnection
/**
* @return the password
*/
- public String getPassword()
+ public synchronized String getPassword()
{
return _password;
}
@@ -177,7 +178,7 @@ public class AmqpConnection
/**
* @param password the password to set
*/
- public void setPassword(String password)
+ public synchronized void setPassword(String password)
{
_password = password;
}
@@ -387,7 +388,7 @@ public class AmqpConnection
return (MessageFactory) protonFactoryLoader.loadFactory(MessageFactory.class);
}
- public void setSasl(Sasl sasl)
+ public synchronized void setSasl(Sasl sasl)
{
_sasl = sasl;
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java Fri Sep 13 12:52:52 2013
@@ -82,12 +82,14 @@ public class AmqpConnectionDriver
int port = amqpConnection.getPort();
SocketChannel channel = null;
+ String threadName = null;
try
{
channel = SocketChannel.open();
channel.configureBlocking(true);
channel.connect(new InetSocketAddress(remoteHost, port));
channel.configureBlocking(false);
+ threadName = "DriverRunnable-" + channel.getLocalAddress() + "/" + channel.getRemoteAddress();
}
catch (IOException e)
{
@@ -108,7 +110,8 @@ public class AmqpConnectionDriver
amqpConnection.setSasl(sasl);
_driverRunnable = new DriverRunnable();
- _driverThread = new Thread(_driverRunnable); // TODO set a sensible thread name
+ _driverThread = new Thread(_driverRunnable);
+ _driverThread.setName(threadName);
_driverThread.start();
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java Fri Sep 13 12:52:52 2013
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,25 +16,87 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
+ *
*/
package org.apache.qpid.jms.engine;
import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.message.Message;
+/**
+ * Thread-safe (all state is guarded by the corresponding {@link AmqpConnection} monitor)
+ *
+ */
public class AmqpMessage
{
- private Message _message;
+ private final AmqpReceiver _amqpReceiver;
+ private final Delivery _delivery;
+ private final Message _message;
+
+ public AmqpMessage(Delivery delivery, Message message, AmqpReceiver amqpReceiver)
+ {
+ _delivery = delivery;
+ _amqpReceiver = amqpReceiver;
+ _message = message;
+ }
+
+ /**
+ * Currently used when creating a message that we intend to send
+ */
public AmqpMessage()
{
_message = Proton.message();
+ _amqpReceiver = null;
+ _delivery = null;
}
- //TODO: restrict visibility of the Proton Message.
- public Message getMessage()
+ Message getMessage()
{
return _message;
}
+ public void accept(boolean settle)
+ {
+ synchronized (_amqpReceiver.getAmqpConnection())
+ {
+ _delivery.disposition(Accepted.getInstance());
+ if(settle)
+ {
+ _delivery.settle();
+ }
+ }
+ }
+
+ public void settle()
+ {
+ synchronized (_amqpReceiver.getAmqpConnection())
+ {
+ _delivery.settle();
+ }
+ }
+
+ /**
+ * If using proton-j, returns true if locally or remotely settled.
+ * If using proton-c, returns true if remotely settled.
+ * TODO - remove this hack when Proton-J and -C APIs are properly aligned
+ * The C API defines isSettled as being true if the delivery has been settled locally OR remotely
+ */
+ public boolean isSettled()
+ {
+ synchronized (_amqpReceiver.getAmqpConnection())
+ {
+ return _delivery.isSettled() || ((_delivery instanceof DeliveryImpl && ((DeliveryImpl)_delivery).remotelySettled()));
+ }
+ }
+
+ public void setText(String string)
+ {
+ AmqpValue body = new AmqpValue(string);
+ _message.setBody(body);
+ }
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java Fri Sep 13 12:52:52 2013
@@ -40,7 +40,7 @@ public class AmqpReceiver extends AmqpLi
_protonReceiver.flow(credit);
}
- public AmqpReceivedMessage receiveNoWait()
+ public AmqpMessage receiveNoWait()
{
synchronized (getAmqpConnection())
{
@@ -74,7 +74,7 @@ public class AmqpReceiver extends AmqpLi
Message message = getAmqpConnection().getMessageFactory().createMessage();
message.decode(_buffer, 0, total);
- AmqpReceivedMessage amqpMessage = new AmqpReceivedMessage(currentDelivery, message, this);
+ AmqpMessage amqpMessage = new AmqpMessage(currentDelivery, message, this);
currentDelivery.setContext(amqpMessage);
_protonReceiver.advance();
return amqpMessage;
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java Fri Sep 13 12:52:52 2013
@@ -38,7 +38,7 @@ public class AmqpSender extends AmqpLink
_protonSender = protonSender;
}
- public AmqpSentMessage sendMessage(AmqpMessage amqpMessage)
+ public AmqpSentMessageToken sendMessage(AmqpMessage amqpMessage)
{
synchronized (getAmqpConnection())
{
@@ -66,7 +66,7 @@ public class AmqpSender extends AmqpLink
_protonSender.send(_buffer, 0, encoded);
_protonSender.advance();
- AmqpSentMessage amqpSentMessage = new AmqpSentMessage(del, this);
+ AmqpSentMessageToken amqpSentMessage = new AmqpSentMessageToken(del, this);
del.setContext(amqpSentMessage);
return amqpSentMessage;
Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java (from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessage.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessage.java&r1=1519004&r2=1522905&rev=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java Fri Sep 13 12:52:52 2013
@@ -23,13 +23,13 @@ package org.apache.qpid.jms.engine;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
-public class AmqpSentMessage
+public class AmqpSentMessageToken
{
private Delivery _delivery;
private AmqpSender _amqpSender;
private AmqpConnection _amqpConnection;
- public AmqpSentMessage(Delivery delivery, AmqpSender sender)
+ public AmqpSentMessageToken(Delivery delivery, AmqpSender sender)
{
_delivery = delivery;
_amqpSender = sender;
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java Fri Sep 13 12:52:52 2013
@@ -98,12 +98,21 @@ public class AmqpSession
return amqpSender;
}
- public AmqpReceiver createAmqpReceiver(String name, String address)
+ public AmqpReceiver createAmqpReceiver(String address)
{
+ String name = address + "->" + UUID.randomUUID().toString();
Receiver protonReceiver = _protonSession.receiver(name);
+
Source source = new Source();
source.setAddress(address);
protonReceiver.setSource(source);
+
+ Target target = new Target();
+ protonReceiver.setTarget(target);
+
+ protonReceiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ protonReceiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
AmqpReceiver amqpReceiver = new AmqpReceiver(this, protonReceiver);
protonReceiver.setContext(amqpReceiver);
protonReceiver.open();
Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionException.java (from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionException.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionException.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java&r1=1519004&r2=1522905&rev=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionException.java Fri Sep 13 12:52:52 2013
@@ -18,10 +18,9 @@
* under the License.
*
*/
-package org.apache.qpid.jms.engine;
+package org.apache.qpid.jms.impl;
-//TODO make me (or wrap me in) a JMSException
-public class ConnectionException extends Exception
+public class ConnectionException extends QpidJmsException
{
private static final long serialVersionUID = 419676688719664719L;
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java Fri Sep 13 12:52:52 2013
@@ -37,8 +37,6 @@ import javax.jms.Topic;
import org.apache.qpid.jms.engine.AmqpConnection;
import org.apache.qpid.jms.engine.AmqpConnectionDriver;
import org.apache.qpid.jms.engine.AmqpSession;
-import org.apache.qpid.jms.engine.ConnectionException;
-import org.apache.qpid.proton.TimeoutException;
/**
* A JMS connection.
@@ -48,6 +46,8 @@ import org.apache.qpid.proton.TimeoutExc
* <li>Other internal classes must use the connection's lock and state-change methods -
* see {@link #lock()}/{@link #releaseLock()} and {@link #stateChanged()} for details.</li>
* </ul>
+ *
+ * TODO wherever we throws JMSException, throw a subclass that has the cause set
*/
public class ConnectionImpl implements Connection
{
@@ -60,6 +60,8 @@ public class ConnectionImpl implements C
private ConnectionLock _connectionLock;
+ private volatile boolean _isStarted;
+
/**
* TODO: accept a client id
* TODO: defer connection to the broker if client has not been set. Defer it until any other method is called.
@@ -87,22 +89,13 @@ public class ConnectionImpl implements C
connect();
}
- catch(InterruptedException e)
+ catch (IOException e)
{
- Thread.currentThread().interrupt();
- JMSException jmse = new JMSException("Interrupted while trying to create connection");
- jmse.setLinkedException(e);
- throw jmse;
- }
- catch (TimeoutException | IOException | ConnectionException e)
- {
- JMSException jmse = new JMSException("Unable to create connection");
- jmse.setLinkedException(e);
- throw jmse;
+ throw new QpidJmsException("Unable to create connection", e);
}
}
- void waitUntil(Predicate condition, long timeoutMillis) throws TimeoutException, InterruptedException
+ void waitUntil(Predicate condition, long timeoutMillis) throws JmsTimeoutException, JmsInterruptedException
{
long deadline = timeoutMillis < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeoutMillis;
@@ -122,7 +115,19 @@ public class ConnectionImpl implements C
}
if (wait && !done && !first)
{
- _amqpConnection.wait(timeoutMillis < 0 ? 0 : deadline - System.currentTimeMillis());
+ try
+ {
+ _amqpConnection.wait(timeoutMillis < 0 ? 0 : deadline - System.currentTimeMillis());
+ }
+ catch (InterruptedException e)
+ {
+ //Note we are not setting the interrupted status, as it
+ //is likely that user code will reenter the client code to
+ //perform e.g close/rollback/etc and setting the status
+ //could erroneously make those fail.
+ throw new JmsInterruptedException("Interrupted while waiting for conditition: "
+ + condition.getCurrentState() , e);
+ }
}
wait = deadline > System.currentTimeMillis();
@@ -138,12 +143,12 @@ public class ConnectionImpl implements C
if (!done)
{
- throw new TimeoutException(timeoutMillis, condition.getCurrentState());
+ throw new JmsTimeoutException(timeoutMillis, condition.getCurrentState());
}
}
}
- private void connect() throws IOException, ConnectionException, TimeoutException, InterruptedException
+ private void connect() throws IOException, ConnectionException, JmsTimeoutException, JmsInterruptedException
{
lock();
try
@@ -157,19 +162,20 @@ public class ConnectionImpl implements C
}
}, AmqpConnection.TIMEOUT);
+ //TODO: sort out exception throwing
if(_amqpConnection.getConnectionError().getCondition() != null)
{
- throw new ConnectionException("Connection failed: 1 " + _amqpConnection.getConnectionError());
+ throw new ConnectionException("Connection failed: " + _amqpConnection.getConnectionError());
}
if(_amqpConnection.isAuthenticationError())
{
- throw new ConnectionException("Connection failed: 2");
+ throw new ConnectionException("Connection failed: authentication failure");
}
if(!_amqpConnection.isConnected())
{
- throw new ConnectionException("Connection failed: 3");
+ throw new ConnectionException("Connection failed");
}
}
finally
@@ -204,16 +210,7 @@ public class ConnectionImpl implements C
}
catch(InterruptedException e)
{
- Thread.currentThread().interrupt();
- JMSException jmse = new JMSException("Interrupted while trying to close connection");
- jmse.setLinkedException(e);
- throw jmse;
- }
- catch (TimeoutException | ConnectionException e)
- {
- JMSException jmse = new JMSException("Unable to close connection");
- jmse.setLinkedException(e);
- throw jmse;
+ throw new JmsInterruptedException("Interrupted while trying to close connection", e);
}
finally
{
@@ -221,9 +218,6 @@ public class ConnectionImpl implements C
}
}
- /**
- * TODO the params are ignored - fix this
- */
@Override
public SessionImpl createSession(boolean transacted, int acknowledgeMode) throws JMSException
{
@@ -247,19 +241,6 @@ public class ConnectionImpl implements C
return session;
}
- catch (TimeoutException e)
- {
- JMSException jmse = new JMSException("Unable to create session");
- jmse.setLinkedException(e);
- throw jmse;
- }
- catch(InterruptedException e)
- {
- Thread.currentThread().interrupt();
- JMSException jmse = new JMSException("Interrupted while trying to create session");
- jmse.setLinkedException(e);
- throw jmse;
- }
finally
{
releaseLock();
@@ -267,14 +248,6 @@ public class ConnectionImpl implements C
}
/**
- * TODO add @Override when we start implementing the JMS2 API.
- */
- public Session createSession() throws JMSException
- {
- return createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- /**
* <p>
* Acquire the connection lock.
* </p>
@@ -351,8 +324,7 @@ public class ConnectionImpl implements C
@Override
public void start() throws JMSException
{
- // PHTODO Auto-generated method stub
- throw new UnsupportedOperationException("PHTODO");
+ _isStarted = true;
}
@Override
@@ -376,4 +348,8 @@ public class ConnectionImpl implements C
throw new UnsupportedOperationException("PHTODO");
}
+ boolean isStarted()
+ {
+ return _isStarted;
+ }
}
Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsInterruptedException.java (from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsInterruptedException.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsInterruptedException.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java&r1=1519004&r2=1522905&rev=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsInterruptedException.java Fri Sep 13 12:52:52 2013
@@ -18,15 +18,20 @@
* under the License.
*
*/
-package org.apache.qpid.jms.engine;
+package org.apache.qpid.jms.impl;
-// TODO make me (or wrap me in) a JMSException
-public class LinkException extends Exception
+/**
+ * Used to signal that we have been interrupted.
+ *
+ * When we throw this, the interrupted status is not set on the thread.
+ */
+public class JmsInterruptedException extends QpidJmsException
{
- private static final long serialVersionUID = 419676688719664719L;
+ private static final long serialVersionUID = 384180653752426597L;
- public LinkException(String msg)
+ public JmsInterruptedException(String reason, InterruptedException cause)
{
- super(msg);
+ super(reason, cause);
}
+
}
Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsTimeoutException.java (from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsTimeoutException.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsTimeoutException.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java&r1=1519004&r2=1522905&rev=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/JmsTimeoutException.java Fri Sep 13 12:52:52 2013
@@ -18,15 +18,14 @@
* under the License.
*
*/
-package org.apache.qpid.jms.engine;
+package org.apache.qpid.jms.impl;
-//TODO make me (or wrap me in) a JMSException
-public class ConnectionException extends Exception
+public class JmsTimeoutException extends QpidJmsException
{
- private static final long serialVersionUID = 419676688719664719L;
+ private static final long serialVersionUID = 7486676055343430641L;
- public ConnectionException(String msg)
+ public JmsTimeoutException(long timeoutMillis, String pendingCondition)
{
- super(msg);
+ super("Timed out after " + timeoutMillis + " ms waiting for condition: " + pendingCondition);
}
}
Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkException.java (from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkException.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkException.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java&r1=1519004&r2=1522905&rev=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkException.java Fri Sep 13 12:52:52 2013
@@ -18,10 +18,9 @@
* under the License.
*
*/
-package org.apache.qpid.jms.engine;
+package org.apache.qpid.jms.impl;
-// TODO make me (or wrap me in) a JMSException
-public class LinkException extends Exception
+public class LinkException extends QpidJmsException
{
private static final long serialVersionUID = 419676688719664719L;
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java Fri Sep 13 12:52:52 2013
@@ -24,23 +24,19 @@ import javax.jms.JMSException;
import org.apache.qpid.jms.engine.AmqpConnection;
import org.apache.qpid.jms.engine.AmqpLink;
-import org.apache.qpid.jms.engine.LinkException;
-import org.apache.qpid.proton.TimeoutException;
public class LinkImpl
{
- private SessionImpl _sessionImpl;
private ConnectionImpl _connectionImpl;
private AmqpLink _amqpLink;
- public LinkImpl(SessionImpl sessionImpl, AmqpLink amqpLink)
+ public LinkImpl(ConnectionImpl connectionImpl, AmqpLink amqpLink)
{
- _sessionImpl = sessionImpl;
- _connectionImpl = _sessionImpl.getConnectionImpl();
+ _connectionImpl = connectionImpl;
_amqpLink = amqpLink;
}
- public void establish() throws TimeoutException, InterruptedException, LinkException
+ public void establish() throws LinkException, JmsTimeoutException, JmsInterruptedException
{
_connectionImpl.waitUntil(new SimplePredicate("Link is established or failed", _amqpLink)
{
@@ -52,7 +48,7 @@ public class LinkImpl
}, AmqpConnection.TIMEOUT);
if(!_amqpLink.isEstablished())
{
- throw new LinkException("Failed to establish link " + _amqpLink); // TODO make message less verbose
+ throw new LinkException("Failed to establish link " + _amqpLink);
}
}
@@ -63,40 +59,14 @@ public class LinkImpl
{
_amqpLink.close();
_connectionImpl.stateChanged();
- while(!_amqpLink.isClosed())
+ _connectionImpl.waitUntil(new SimplePredicate("Link is closed", _amqpLink)
{
- try
+ @Override
+ public boolean test()
{
- _connectionImpl.waitUntil(new SimplePredicate("Link is closed", _amqpLink)
- {
- @Override
- public boolean test()
- {
- return _amqpLink.isClosed();
- }
- }, AmqpConnection.TIMEOUT);
+ return _amqpLink.isClosed();
}
- catch (TimeoutException e)
- {
- JMSException jmse = new JMSException("Unable to close link");
- jmse.setLinkedException(e);
- throw jmse;
- }
- catch(InterruptedException e)
- {
- Thread.currentThread().interrupt();
- JMSException jmse = new JMSException("Interrupted while trying to close link");
- jmse.setLinkedException(e);
- throw jmse;
- }
-
- }
-
- //TODO: link errors? E.g:
- // if(_amqpSender.getLinkError().getCondition() != null)
- // {
- // throw new ConnectionException("Sender close failed: " + _amqpSender.getLinkError());
- // }
+ }, AmqpConnection.TIMEOUT);
}
finally
{
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Fri Sep 13 12:52:52 2013
@@ -32,10 +32,14 @@ public abstract class MessageImpl implem
protected MessageImpl()
{
- //TODO: move to AmqpSession.createMessage()?
_amqpMessage = new AmqpMessage();
}
+ AmqpMessage getAmqpMessage()
+ {
+ return _amqpMessage;
+ }
+
@Override
public String getJMSMessageID() throws JMSException
{
@@ -351,9 +355,5 @@ public abstract class MessageImpl implem
throw new UnsupportedOperationException("PHTODO");
}
- public AmqpMessage getAmqpMessage()
- {
- return _amqpMessage;
- }
}
Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsException.java (from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsException.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsException.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java&r1=1519004&r2=1522905&rev=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsException.java Fri Sep 13 12:52:52 2013
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,25 +16,28 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
+ *
*/
-package org.apache.qpid.jms.engine;
+package org.apache.qpid.jms.impl;
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.message.Message;
+import javax.jms.JMSException;
-public class AmqpMessage
+public class QpidJmsException extends JMSException
{
- private Message _message;
+ private static final long serialVersionUID = 751932967255393054L;
- public AmqpMessage()
+ public QpidJmsException(String reason)
{
- _message = Proton.message();
+ super(reason);
}
- //TODO: restrict visibility of the Proton Message.
- public Message getMessage()
+ public QpidJmsException(String reason, Exception cause)
{
- return _message;
+ super(reason);
+ if (cause != null)
+ {
+ setLinkedException(cause);
+ initCause(cause);
+ }
}
-
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceivedMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceivedMessageImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceivedMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceivedMessageImpl.java Fri Sep 13 12:52:52 2013
@@ -20,76 +20,357 @@
*/
package org.apache.qpid.jms.impl;
-import org.apache.qpid.jms.engine.AmqpConnection;
-import org.apache.qpid.jms.engine.AmqpReceivedMessage;
-import org.apache.qpid.proton.TimeoutException;
-import org.apache.qpid.proton.message.Message;
+import java.util.Enumeration;
-public class ReceivedMessageImpl
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.engine.AmqpMessage;
+
+public class ReceivedMessageImpl implements javax.jms.Message // TODO inherit from sendable base class
{
- private AmqpReceivedMessage _amqpMessage;
- private ReceiverImpl _receiverImpl;
+ private final AmqpMessage _amqpMessage;
+ private final SessionImpl _sessionImpl;
+ private final ConnectionImpl _connectionImpl;
- public ReceivedMessageImpl(AmqpReceivedMessage amqpMessage, ReceiverImpl receiverImpl)
+ public ReceivedMessageImpl(AmqpMessage amqpMessage, SessionImpl sessionImpl)
{
_amqpMessage = amqpMessage;
- _receiverImpl = receiverImpl;
+ _sessionImpl = sessionImpl;
+ _connectionImpl = _sessionImpl.getConnectionImpl();
}
public void accept(boolean settle)
{
- _receiverImpl.getConnectionImpl().lock();
+ _connectionImpl.lock();
try
{
- _amqpMessage.accept();
- if(settle)
- {
- _amqpMessage.settle();
- }
- _receiverImpl.getConnectionImpl().stateChanged();
+ _amqpMessage.accept(settle);
+ _connectionImpl.stateChanged();
}
finally
{
- _receiverImpl.getConnectionImpl().releaseLock();
+ _connectionImpl.releaseLock();
}
}
- public void settle()
+ AmqpMessage getAmqpMessage()
{
- _receiverImpl.getConnectionImpl().lock();
- try
- {
- _amqpMessage.settle();
- _receiverImpl.getConnectionImpl().stateChanged();
- }
- finally
- {
- _receiverImpl.getConnectionImpl().releaseLock();
- }
+ return _amqpMessage;
}
- public void waitUntilSettled() throws TimeoutException, InterruptedException
+ @Override
+ public String getJMSMessageID() throws JMSException
{
- _receiverImpl.getConnectionImpl().lock();
- try
- {
- _receiverImpl.getConnectionImpl().waitUntil(new SimplePredicate("Message is settled", _amqpMessage)
- {
- @Override
- public boolean test()
- {
- return _amqpMessage.isSettled();
- }
- }, AmqpConnection.TIMEOUT);
- }
- finally
- {
- _receiverImpl.getConnectionImpl().releaseLock();
- }
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setJMSMessageID(String id) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public long getJMSTimestamp() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setJMSTimestamp(long timestamp) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setJMSCorrelationID(String correlationID) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public String getJMSCorrelationID() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public Destination getJMSReplyTo() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setJMSReplyTo(Destination replyTo) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public Destination getJMSDestination() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setJMSDestination(Destination destination) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public int getJMSDeliveryMode() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setJMSDeliveryMode(int deliveryMode) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public boolean getJMSRedelivered() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setJMSRedelivered(boolean redelivered) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public String getJMSType() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setJMSType(String type) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public long getJMSExpiration() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setJMSExpiration(long expiration) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public int getJMSPriority() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setJMSPriority(int priority) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void clearProperties() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public boolean propertyExists(String name) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public boolean getBooleanProperty(String name) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public byte getByteProperty(String name) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public short getShortProperty(String name) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public int getIntProperty(String name) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public long getLongProperty(String name) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public float getFloatProperty(String name) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public double getDoubleProperty(String name) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public String getStringProperty(String name) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public Object getObjectProperty(String name) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public Enumeration getPropertyNames() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setBooleanProperty(String name, boolean value) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setByteProperty(String name, byte value) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setShortProperty(String name, short value) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setIntProperty(String name, int value) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setLongProperty(String name, long value) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setFloatProperty(String name, float value) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setDoubleProperty(String name, double value) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setStringProperty(String name, String value) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setObjectProperty(String name, Object value) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void acknowledge() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
}
- public Message getMessage()
+ @Override
+ public void clearBody() throws JMSException
{
- return _amqpMessage.getMessage();
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
}
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java Fri Sep 13 12:52:52 2013
@@ -20,30 +20,58 @@
*/
package org.apache.qpid.jms.impl;
-import org.apache.qpid.jms.engine.AmqpReceivedMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+import org.apache.qpid.jms.engine.AmqpMessage;
import org.apache.qpid.jms.engine.AmqpReceiver;
-import org.apache.qpid.proton.TimeoutException;
-public class ReceiverImpl extends LinkImpl
+public class ReceiverImpl extends LinkImpl implements MessageConsumer
{
private final AmqpReceiver _amqpReceiver;
+ private final SessionImpl _sessionImpl;
- public ReceiverImpl(SessionImpl sessionImpl, AmqpReceiver amqpReceiver)
+ public ReceiverImpl(ConnectionImpl connectionImpl, SessionImpl sessionImpl, AmqpReceiver amqpReceiver)
{
- super(sessionImpl, amqpReceiver);
+ super(connectionImpl, amqpReceiver);
+ _sessionImpl = sessionImpl;
_amqpReceiver = amqpReceiver;
}
- public ReceivedMessageImpl receive(long timeout) throws TimeoutException, InterruptedException
+ @Override
+ public Message receive() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public Message receive(long timeout) throws JMSException
{
getConnectionImpl().lock();
try
{
+ if(!getConnectionImpl().isStarted())
+ {
+ return null;
+ }
+
MessageReceivedPredicate messageReceievedCondition = new MessageReceivedPredicate();
getConnectionImpl().waitUntil(messageReceievedCondition, timeout);
- getConnectionImpl().stateChanged();
- return new ReceivedMessageImpl(messageReceievedCondition.getReceivedMessage(), this);
+ //TODO: decide what if any particular message impl class to instantiate
+ //TODO: accepting/settling will be acknowledge-mode dependent
+ ReceivedMessageImpl receivedMessageImpl = new ReceivedMessageImpl(messageReceievedCondition.getReceivedMessage(), _sessionImpl);
+ receivedMessageImpl.accept(true);
+ getConnectionImpl().stateChanged();
+ return receivedMessageImpl;
+ }
+ catch (JmsTimeoutException e)
+ {
+ //No message in allotted time, return null to signal this
+ return null;
}
finally
{
@@ -67,7 +95,7 @@ public class ReceiverImpl extends LinkIm
private final class MessageReceivedPredicate extends SimplePredicate
{
- AmqpReceivedMessage _message;
+ AmqpMessage _message;
public MessageReceivedPredicate()
{
@@ -84,10 +112,38 @@ public class ReceiverImpl extends LinkIm
return _message != null;
}
- public AmqpReceivedMessage getReceivedMessage()
+ public AmqpMessage getReceivedMessage()
{
return _message;
}
}
+ @Override
+ public String getMessageSelector() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public MessageListener getMessageListener() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public void setMessageListener(MessageListener listener) throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
+ @Override
+ public Message receiveNoWait() throws JMSException
+ {
+ // PHTODO Auto-generated method stub
+ throw new UnsupportedOperationException("PHTODO");
+ }
+
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java Fri Sep 13 12:52:52 2013
@@ -27,8 +27,7 @@ import javax.jms.MessageProducer;
import org.apache.qpid.jms.engine.AmqpMessage;
import org.apache.qpid.jms.engine.AmqpSender;
-import org.apache.qpid.jms.engine.AmqpSentMessage;
-import org.apache.qpid.proton.TimeoutException;
+import org.apache.qpid.jms.engine.AmqpSentMessageToken;
public class SenderImpl extends LinkImpl implements MessageProducer
{
@@ -36,7 +35,7 @@ public class SenderImpl extends LinkImpl
public SenderImpl(SessionImpl sessionImpl, AmqpSender amqpSender)
{
- super(sessionImpl, amqpSender);
+ super(sessionImpl.getConnectionImpl(), amqpSender);
_amqpSender = amqpSender;
}
@@ -48,28 +47,14 @@ public class SenderImpl extends LinkImpl
{
AmqpMessage amqpMessage = getAmqpMessageFromJmsMessage(message);
- AmqpSentMessage sentMessage = _amqpSender.sendMessage(amqpMessage);
+ AmqpSentMessageToken sentMessage = _amqpSender.sendMessage(amqpMessage);
getConnectionImpl().stateChanged();
- SentMessageImpl sentMessageImpl = new SentMessageImpl(sentMessage, this);
+ SentMessageTokenImpl sentMessageImpl = new SentMessageTokenImpl(sentMessage, this);
sentMessageImpl.waitUntilAccepted();
sentMessage.settle();
}
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- JMSException jmse = new JMSException("Interrupted while trying to send messages");
- jmse.setLinkedException(e);
- throw jmse;
- }
- catch (TimeoutException e)
- {
- JMSException jmse = new JMSException("Timed out during send");
- e.printStackTrace();
- jmse.setLinkedException(e);
- throw jmse;
- }
finally
{
getConnectionImpl().releaseLock();
Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageTokenImpl.java (from r1519004, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageImpl.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageTokenImpl.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageTokenImpl.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageImpl.java&r1=1519004&r2=1522905&rev=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageTokenImpl.java Fri Sep 13 12:52:52 2013
@@ -21,28 +21,31 @@
package org.apache.qpid.jms.impl;
import org.apache.qpid.jms.engine.AmqpConnection;
-import org.apache.qpid.jms.engine.AmqpSentMessage;
-import org.apache.qpid.proton.TimeoutException;
+import org.apache.qpid.jms.engine.AmqpSentMessageToken;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
-public class SentMessageImpl
+public class SentMessageTokenImpl
{
- private AmqpSentMessage _sentMessage;
+ private AmqpSentMessageToken _sentMessage;
private SenderImpl _sender;
- public SentMessageImpl(AmqpSentMessage sentMessage, SenderImpl sender)
+ public SentMessageTokenImpl(AmqpSentMessageToken sentMessage, SenderImpl sender)
{
_sentMessage = sentMessage;
_sender = sender;
}
- public void waitUntilAccepted() throws TimeoutException, InterruptedException
+ public void waitUntilAccepted() throws JmsTimeoutException, JmsInterruptedException
{
_sender.getConnectionImpl().waitUntil(new SimplePredicate("Remote delivery state exists", _sentMessage)
{
@Override
public boolean test()
{
- return _sentMessage.getRemoteDeliveryState() != null; //TODO: should we actually check it is *accepted*?
+ DeliveryState remoteDeliveryState = _sentMessage.getRemoteDeliveryState();
+ return remoteDeliveryState != null && Accepted.getInstance().equals(remoteDeliveryState);
+ //TODO: throw an exception if it isn't accepted.
}
}, AmqpConnection.TIMEOUT);
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Fri Sep 13 12:52:52 2013
@@ -21,10 +21,10 @@
package org.apache.qpid.jms.impl;
import java.io.Serializable;
-import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.Destination;
+import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
@@ -46,12 +46,10 @@ import org.apache.qpid.jms.engine.AmqpCo
import org.apache.qpid.jms.engine.AmqpReceiver;
import org.apache.qpid.jms.engine.AmqpSender;
import org.apache.qpid.jms.engine.AmqpSession;
-import org.apache.qpid.jms.engine.ConnectionException;
-import org.apache.qpid.jms.engine.LinkException;
-import org.apache.qpid.proton.TimeoutException;
public class SessionImpl implements Session
{
+ private static final int INITIAL_RECEIVER_CREDIT = 1;
private AmqpSession _amqpSession;
private ConnectionImpl _connectionImpl;
@@ -61,7 +59,7 @@ public class SessionImpl implements Sess
_connectionImpl = connectionImpl;
}
- public void establish() throws TimeoutException, InterruptedException
+ public void establish() throws JmsTimeoutException, JmsInterruptedException
{
_connectionImpl.waitUntil(new SimplePredicate("Session established", _amqpSession)
{
@@ -98,19 +96,6 @@ public class SessionImpl implements Sess
throw new ConnectionException("Session close failed: " + _amqpSession.getSessionError());
}
}
- catch(InterruptedException e)
- {
- Thread.currentThread().interrupt();
- JMSException jmse = new JMSException("Interrupted while trying to close session");
- jmse.setLinkedException(e);
- throw jmse;
- }
- catch (TimeoutException | ConnectionException e)
- {
- JMSException jmse = new JMSException("Unable to close session");
- jmse.setLinkedException(e);
- throw jmse;
- }
finally
{
_connectionImpl.releaseLock();
@@ -133,8 +118,7 @@ public class SessionImpl implements Sess
else if (destination instanceof Queue)
{
Queue queue = (Queue) destination;
- String senderName = "producer-" + queue.getQueueName() + "-" + UUID.randomUUID();
- return createSender(senderName, queue.getQueueName());
+ return createSender(queue.getQueueName());
}
else if(destination instanceof Topic)
{
@@ -147,7 +131,7 @@ public class SessionImpl implements Sess
}
- private SenderImpl createSender(String senderName, String address) throws JMSException
+ private SenderImpl createSender(String address) throws JMSException
{
_connectionImpl.lock();
try
@@ -158,34 +142,51 @@ public class SessionImpl implements Sess
sender.establish();
return sender;
}
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- JMSException jmse = new JMSException("Interrupted while trying to create sender");
- jmse.setLinkedException(e);
- throw jmse;
- }
- catch (TimeoutException | LinkException e)
- {
- JMSException jmse = new JMSException("Unable to create sender: " + e.getMessage());
- jmse.setLinkedException(e);
- throw jmse;
- }
finally
{
_connectionImpl.releaseLock();
}
}
- public ReceiverImpl createReceiver(String name, String address) throws TimeoutException, InterruptedException, LinkException
+ @Override
+ public MessageConsumer createConsumer(Destination destination) throws JMSException
+ {
+ if(destination == null)
+ {
+ throw new InvalidDestinationException("Null destination provided");
+ }
+ else if (destination instanceof Queue)
+ {
+ Queue queue = (Queue) destination;
+ return createReceiver(queue.getQueueName());
+ }
+ else if(destination instanceof Topic)
+ {
+ throw new UnsupportedOperationException("Topics are not yet supported");
+ }
+ else
+ {
+ throw new IllegalArgumentException("Destination expected to be a Queue or a Topic but was: " + destination.getClass());
+ }
+ }
+
+ private ReceiverImpl createReceiver(String address) throws JMSException
{
_connectionImpl.lock();
try
{
- AmqpReceiver amqpReceiver = _amqpSession.createAmqpReceiver(name, address);
- ReceiverImpl receiver = new ReceiverImpl(this, amqpReceiver);
+ AmqpReceiver amqpReceiver = _amqpSession.createAmqpReceiver(address);
+ ReceiverImpl receiver = new ReceiverImpl(_connectionImpl, this, amqpReceiver);
_connectionImpl.stateChanged();
receiver.establish();
+
+ if(_connectionImpl.isStarted())
+ {
+ //issue initial flow for the consumer
+ amqpReceiver.credit(INITIAL_RECEIVER_CREDIT);
+ _connectionImpl.stateChanged();
+ }
+
return receiver;
}
finally
@@ -305,13 +306,6 @@ public class SessionImpl implements Sess
}
@Override
- public MessageConsumer createConsumer(Destination destination) throws JMSException
- {
- // PHTODO Auto-generated method stub
- throw new UnsupportedOperationException("PHTODO");
- }
-
- @Override
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
// PHTODO Auto-generated method stub
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java Fri Sep 13 12:52:52 2013
@@ -21,8 +21,6 @@ package org.apache.qpid.jms.impl;
import javax.jms.JMSException;
import javax.jms.TextMessage;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-
public class TextMessageImpl extends MessageImpl implements TextMessage
{
public TextMessageImpl() throws JMSException
@@ -45,10 +43,7 @@ public class TextMessageImpl extends Mes
@Override
public void setText(String string) throws JMSException
{
- AmqpValue body = new AmqpValue(string);
-
- //TODO: stop accessing the Proton Message directly
- getAmqpMessage().getMessage().setBody(body);
+ getAmqpMessage().setText(string);
}
}
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ConnectionIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ConnectionIntegrationTest.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ConnectionIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ConnectionIntegrationTest.java Fri Sep 13 12:52:52 2013
@@ -29,8 +29,7 @@ import org.apache.qpid.jms.test.testpeer
import org.junit.Test;
// TODO find a way to make the test abort immediately if the TestAmqpPeer throws an exception
-// TODO add tests such as testBrokerDoesNotRespond and testBrokerSendsWrongFrame
-public class ConnectionIntegrationTest
+public class ConnectionIntegrationTest extends QpidJmsTestCase
{
private final IntegrationTestFixture _testFixture = new IntegrationTestFixture();
Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/QpidJmsTestCase.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/QpidJmsTestCase.java?rev=1522905&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/QpidJmsTestCase.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/QpidJmsTestCase.java Fri Sep 13 12:52:52 2013
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+public class QpidJmsTestCase
+{
+ private final Logger _logger = Logger.getLogger(getClass().getName());
+
+ private final Map<String, String> _propertiesSetForTest = new HashMap<String, String>();
+
+ @Rule public TestName _testName = new TestName();
+
+ /**
+ * Set a System property for duration of this test only. The tearDown will
+ * guarantee to reset the property to its previous value after the test
+ * completes.
+ *
+ * @param property The property to set
+ * @param value the value to set it to, if null, the property will be cleared
+ */
+ protected void setTestSystemProperty(final String property, final String value)
+ {
+ if (!_propertiesSetForTest.containsKey(property))
+ {
+ // Record the current value so we can revert it later.
+ _propertiesSetForTest.put(property, System.getProperty(property));
+ }
+
+ if (value == null)
+ {
+ System.clearProperty(property);
+ _logger.info("Set system property '" + property + "' to be cleared");
+ }
+ else
+ {
+ System.setProperty(property, value);
+ _logger.info("Set system property '" + property + "' to: '" + value + "'");
+ }
+
+ }
+
+ /**
+ * Restore the System property values that were set by this test run.
+ */
+ protected void revertTestSystemProperties()
+ {
+ if(!_propertiesSetForTest.isEmpty())
+ {
+ for (String key : _propertiesSetForTest.keySet())
+ {
+ String value = _propertiesSetForTest.get(key);
+ if (value != null)
+ {
+ System.setProperty(key, value);
+ _logger.info("Reverted system property '" + key + "' to: '" + value + "'");
+ }
+ else
+ {
+ System.clearProperty(key);
+ _logger.info("Reverted system property '" + key + "' to be cleared");
+ }
+ }
+
+ _propertiesSetForTest.clear();
+ }
+ }
+
+ @After
+ public void tearDown() throws java.lang.Exception
+ {
+ _logger.info("========== tearDown " + getTestName() + " ==========");
+ revertTestSystemProperties();
+ }
+
+ @Before
+ public void setUp() throws Exception
+ {
+ _logger.info("========== start " + getTestName() + " ==========");
+ }
+
+ protected String getTestName()
+ {
+ return getClass().getSimpleName() + "." +_testName.getMethodName();
+ }
+}
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java Fri Sep 13 12:52:52 2013
@@ -18,16 +18,20 @@
*/
package org.apache.qpid.jms;
+import static org.junit.Assert.assertNotNull;
+
import javax.jms.Connection;
import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import org.apache.qpid.jms.impl.ReceivedMessageImpl;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.junit.Test;
-public class SessionIntegrationTest
+public class SessionIntegrationTest extends QpidJmsTestCase
{
private final IntegrationTestFixture _testFixture = new IntegrationTestFixture();
@@ -67,4 +71,31 @@ public class SessionIntegrationTest
producer.send(message);
}
}
+
+ @Test
+ public void testSendReceiveTextMessageWithContent() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer();
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+
+ // TODO check that it's a TextMessage with expected content: String expectedText = "myMessage";
+ ReceivedMessageImpl receivedMessage = (ReceivedMessageImpl) messageConsumer.receive(1000);
+ assertNotNull(receivedMessage);
+
+ testPeer.waitForAllHandlersToComplete();
+ }
+ }
}
Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ImmediateWaitUntil.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ImmediateWaitUntil.java?rev=1522905&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ImmediateWaitUntil.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ImmediateWaitUntil.java Fri Sep 13 12:52:52 2013
@@ -0,0 +1,41 @@
+package org.apache.qpid.jms.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Assert;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class ImmediateWaitUntil implements Answer<Void>
+{
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable
+ {
+ //verify the arg types / expected values
+ Object[] args = invocation.getArguments();
+ assertEquals(2, args.length);
+ assertTrue(args[0] instanceof Predicate);
+ assertTrue(args[1] instanceof Long);
+
+ Predicate predicate = (Predicate)args[0];
+
+ if(predicate.test())
+ {
+ return null;
+ }
+ else
+ {
+ throw new JmsTimeoutException(0, "ImmediateWaitUntil predicate test returned false: " + predicate);
+ }
+ }
+
+ public static void mockWaitUntil(ConnectionImpl connectionImpl) throws JmsTimeoutException, JmsInterruptedException
+ {
+ Assert.assertFalse("This method cant mock the method on a real object", connectionImpl.getClass() == ConnectionImpl.class);
+
+ Mockito.doAnswer(new ImmediateWaitUntil()).when(connectionImpl).waitUntil(Matchers.isA(Predicate.class), Matchers.anyLong());
+ }
+}
\ No newline at end of file
Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java?rev=1522905&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java Fri Sep 13 12:52:52 2013
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.jms.engine.AmqpMessage;
+import org.apache.qpid.jms.engine.AmqpReceiver;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ReceiverImplTest extends QpidJmsTestCase
+{
+ private final ConnectionImpl _mockConnection = Mockito.mock(ConnectionImpl.class);
+ private final AmqpReceiver _mockAmqpReceiver = Mockito.mock(AmqpReceiver.class);
+ private final SessionImpl _mockSession = Mockito.mock(SessionImpl.class);
+ private final AmqpMessage _mockAmqpMessage = Mockito.mock(AmqpMessage.class);
+
+ @Test
+ public void testNoMessageReceivedWhenConnectionNotStarted() throws Exception
+ {
+ Mockito.when(_mockConnection.isStarted()).thenReturn(false);
+ Mockito.when(_mockAmqpReceiver.receiveNoWait()).thenReturn(_mockAmqpMessage);
+
+ ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+ ReceiverImpl receiver = new ReceiverImpl(_mockConnection, _mockSession, _mockAmqpReceiver);
+
+ assertNull("Should not receive a message when connection is not started", receiver.receive(1));
+ }
+
+ @Test
+ public void testMessageReceivedWhenConnectionIsStarted() throws Exception
+ {
+ Mockito.when(_mockConnection.isStarted()).thenReturn(true);
+ Mockito.when(_mockAmqpReceiver.receiveNoWait()).thenReturn(_mockAmqpMessage);
+ Mockito.when(_mockSession.getConnectionImpl()).thenReturn(_mockConnection);
+
+ ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+ ReceiverImpl receiver = new ReceiverImpl(_mockConnection, _mockSession, _mockAmqpReceiver);
+
+ ReceivedMessageImpl messageImpl = (ReceivedMessageImpl) receiver.receive(1);
+ assertNotNull("Should not receive a message when connection is not started", messageImpl);
+ assertEquals("Underlying AmqpMessage should be the one provided", _mockAmqpMessage, messageImpl.getAmqpMessage());
+ }
+}
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java Fri Sep 13 12:52:52 2013
@@ -23,6 +23,7 @@ import static org.hamcrest.MatcherAssert
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.logging.Logger;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
@@ -31,60 +32,80 @@ import org.hamcrest.Matcher;
public abstract class AbstractFrameFieldAndPayloadMatchingHandler extends FrameMatchingHandler
{
- private final Map<Integer, Matcher<?>> _matchers;
- private Map<Integer, Object> _receivedFields;
+ private final Logger _logger = Logger.getLogger(getClass().getName());
+ private final Map<Enum<?>, Matcher<?>> _fieldMatchers;
+ private Map<Enum<?>, Object> _receivedFields;
+
+ /**
+ * @param fieldMatchers a map of field matchers, keyed by enums representing the fields
+ * (the enums just need to have an ordinal number matching the AMQP spec field order,
+ * and preferably a sensible name)
+ */
protected AbstractFrameFieldAndPayloadMatchingHandler(FrameType frameType,
int channel,
UnsignedLong numericDescriptor,
Symbol symbolicDescriptor,
- Map<Integer, Matcher<?>> matchers,
+ Map<Enum<?>, Matcher<?>> fieldMatchers,
Runnable onSuccess)
{
super(frameType, channel, numericDescriptor, symbolicDescriptor, onSuccess);
- _matchers = matchers;
+ _fieldMatchers = fieldMatchers;
}
- protected Map<Integer, Matcher<?>> getMatchers()
+ protected Map<Enum<?>, Matcher<?>> getMatchers()
{
- return _matchers;
+ return _fieldMatchers;
}
+ /**
+ * Returns the received values, keyed by enums representing the fields
+ * (the enums have an ordinal number matching the AMQP spec field order,
+ * and a sensible name)
+ */
@Override
- protected Map<Integer, Object> getReceivedFields()
+ protected Map<Enum<?>, Object> getReceivedFields()
{
return _receivedFields;
}
@Override
- protected void frame(List<Object> described, Binary payload)
+ protected void verifyFrame(List<Object> described, Binary payload)
{
verifyPayload(payload);
verifyFields(described);
-
- succeeded();
}
protected void verifyFields(List<Object> described)
{
- int i = 0;
- HashMap<Integer, Object> valueMap = new HashMap<>();
+ int fieldNumber = 0;
+ HashMap<Enum<?>, Object> valueMap = new HashMap<>();
for(Object value : described)
{
- valueMap.put(i++, value);
+ valueMap.put(getField(fieldNumber++), value);
}
_receivedFields = valueMap;
- for(Map.Entry<Integer, Matcher<?>> entry : _matchers.entrySet())
+ _logger.fine("About to check the fields of the described type."
+ + "\n Received:" + valueMap
+ + "\n Expectations: " + _fieldMatchers);
+ for(Map.Entry<Enum<?>, Matcher<?>> entry : _fieldMatchers.entrySet())
{
@SuppressWarnings("unchecked")
Matcher<Object> matcher = (Matcher<Object>) entry.getValue();
- Integer field = entry.getKey();
-
- assertThat("Field value should match", valueMap.get(field), matcher);
+ Enum<?> field = entry.getKey();
+ assertThat("Field " + field + " value should match", valueMap.get(field), matcher);
}
}
+ /**
+ * Intended to be overridden in most cases (but not necessarily all - hence not marked as abstract)
+ */
+ protected Enum<?> getField(int fieldIndex)
+ {
+ throw new UnsupportedOperationException("getFieldName is expected to be overridden by subclass if it is required");
+ }
+
protected abstract void verifyPayload(Binary payload);
}
\ No newline at end of file
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java?rev=1522905&r1=1522904&r2=1522905&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java Fri Sep 13 12:52:52 2013
@@ -31,7 +31,7 @@ import org.apache.qpid.proton.codec.Data
*/
public class AmqpDataFramer
{
- private static final int CAPACITY = 1024;
+ private static final int CAPACITY = 2024;
private static final byte FRAME_PREAMBLE_SIZE_IN_FOUR_BYTE_WORDS = 2;
public static byte[] encodeFrame(FrameType type, int channel, DescribedType describedType, Binary payload)
@@ -43,7 +43,10 @@ public class AmqpDataFramer
Data frameBody = Proton.data(CAPACITY);
frameBody.putDescribedType(describedType);
frameBody.encode(buffer);
- //TODO: cope with payload
+ if(payload != null)
+ {
+ buffer.put(payload.asByteBuffer());
+ }
int frameSize = buffer.position();
buffer.rewind();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org