You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/05/02 21:43:43 UTC
[2/2] qpid-jms-amqp-0-x git commit: QPID-8141: [JMS AMQP 0-x] Avoid
repeated exchange.declare when publishing while a BURL address is in use.
QPID-8141: [JMS AMQP 0-x] Avoid repeated exchange.declare when publishing while a BURL address is in use.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/6a5ffcf4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/6a5ffcf4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/6a5ffcf4
Branch: refs/heads/master
Commit: 6a5ffcf484c5bfab4b0e8ca3453baf9a7ba0c1c0
Parents: a762b9d
Author: Keith Wall <kw...@apache.org>
Authored: Wed May 2 22:32:30 2018 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Wed May 2 22:42:43 2018 +0100
----------------------------------------------------------------------
.../java/org/apache/qpid/client/AMQSession.java | 40 ++++++++------------
.../AddressBasedDestinationTest.java | 23 +++++++----
2 files changed, 32 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6a5ffcf4/client/src/main/java/org/apache/qpid/client/AMQSession.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession.java b/client/src/main/java/org/apache/qpid/client/AMQSession.java
index cba764e..b5d4870 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -30,7 +30,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -150,8 +150,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
protected final boolean DAEMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
- private final Set<AMQDestination>
- _resolvedDestinations = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<AMQDestination, Boolean>()));
+ private final Map<AMQDestination, AMQDestination>
+ _resolvedDestinations = Collections.synchronizedMap(new WeakHashMap<AMQDestination, AMQDestination>());
private final long _dispatcherShutdownTimeoutMs;
@@ -661,7 +661,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void setResolved(final AMQDestination dest)
{
- _resolvedDestinations.add(dest);
+ _resolvedDestinations.put(dest, dest);
}
void setUnresolved(final AMQDestination dest)
@@ -676,30 +676,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
boolean isResolved(final AMQDestination dest)
{
- if (!_resolvedDestinations.contains(dest))
+ AMQDestination resolvedDest = _resolvedDestinations.get(dest);
+ if (resolvedDest == dest)
{
- return false;
- }
-
- if (dest.getAddressType() == AMQDestination.QUEUE_TYPE)
- {
- // verify legacy fields are set
- return dest.getQueueName() != null
- && dest.getQueueName().equals(dest.getAddressName())
- && dest.getExchangeName() != null
- && dest.getExchangeClass() != null
- && dest.getRoutingKey() != null;
+ return true;
}
- else if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
+ else if (resolvedDest == null)
{
- // verify legacy fields are set
- return dest.getExchangeName() != null
- && dest.getExchangeName().equals(dest.getAddressName())
- && dest.getExchangeClass() != null
- && (dest.getSubject() == null
- || (dest.getSubject() != null && dest.getSubject().equals(dest.getRoutingKey())));
+ return false;
}
- return false;
+
+ // verify legacy fields are equal
+ return Objects.equals(dest.getQueueName(), resolvedDest.getQueueName()) &&
+ Objects.equals(dest.getExchangeName(), resolvedDest.getExchangeName()) &&
+ Objects.equals(dest.getExchangeClass(), resolvedDest.getExchangeClass()) &&
+ Objects.equals(dest.getRoutingKey(), resolvedDest.getRoutingKey()) &&
+ Objects.equals(dest.getSubject(), resolvedDest.getSubject());
}
public abstract int resolveAddressType(AMQDestination dest) throws QpidException;
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6a5ffcf4/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java b/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java
index 395b410..1665283 100644
--- a/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java
@@ -26,6 +26,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -373,16 +374,21 @@ public class AddressBasedDestinationTest extends JmsTestBase
}
}
+ /** QPID-8141 - Publishing to a duplicately declared queue silently dropped messages. */
@Test
- public void ensureQueueDestinationAlwaysResolved() throws Exception
+ public void publishToDuplicatelyDeclaredQueue() throws Exception
{
String address = String.format("ADDR:%s; {create: always, node: {type: queue}}", getTestName());
Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(null);
- producer.send(session.createQueue(address), session.createTextMessage("A"));
- producer.send(session.createQueue(address), session.createTextMessage("B"));
+ Queue queue = session.createQueue(address);
+ Queue dupQueue = session.createQueue(address);
+ assertNotSame(queue, dupQueue);
+
+ producer.send(queue, session.createTextMessage("A"));
+ producer.send(dupQueue, session.createTextMessage("B"));
session.commit();
MessageConsumer consumer = session.createConsumer(session.createQueue(address));
@@ -397,17 +403,20 @@ public class AddressBasedDestinationTest extends JmsTestBase
assertEquals("Unexpected content of message B", "B", ((TextMessage) messageB).getText());
}
-
@Test
- public void ensureTopicDestinationAlwaysResolved() throws Exception
+ public void publishToDuplicatelyDeclaredTopic() throws Exception
{
String address = String.format("ADDR:amq.topic/%s; {node: {type: topic}}", getTestName());
Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(session.createTopic(address));
MessageProducer producer = session.createProducer(null);
- producer.send(session.createTopic(address), session.createTextMessage("A"));
- producer.send(session.createTopic(address), session.createTextMessage("B"));
+ Topic topic = session.createTopic(address);
+ Topic dupTopic = session.createTopic(address);
+ assertNotSame(topic, dupTopic);
+
+ producer.send(topic, session.createTextMessage("A"));
+ producer.send(dupTopic, session.createTextMessage("B"));
session.commit();
Message messageA = consumer.receive(getReceiveTimeout());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org