You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2015/03/02 14:40:15 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5594 -
mqtt and virtual topic subs;
more refined removing of destinations, as we don't want to remove all
descendant destination in a wildcard case
Repository: activemq
Updated Branches:
refs/heads/master adef03e5a -> 4f5774493
https://issues.apache.org/jira/browse/AMQ-5594 - mqtt and virtual topic subs; more refined removing of destinations, as we don't want to remove all descendant destination in a wildcard case
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4f577449
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4f577449
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4f577449
Branch: refs/heads/master
Commit: 4f577449344334ed95403317f06332d33429ca4b
Parents: adef03e
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Mon Mar 2 14:40:07 2015 +0100
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon Mar 2 14:40:07 2015 +0100
----------------------------------------------------------------------
.../activemq/broker/region/AbstractRegion.java | 2 +-
.../activemq/transport/mqtt/PahoMQTTTest.java | 63 +++++++++++++-------
.../broker/jmx/HealthViewMBeanTest.java | 1 +
.../activemq/filter/DestinationMapTest.java | 7 +++
.../activemq/security/SecurityJMXTest.java | 2 +-
.../activemq/security/simple-auth-broker.xml | 2 +-
6 files changed, 53 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/4f577449/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index 53e8cdd..4f487bf 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -218,7 +218,7 @@ public abstract class AbstractRegion implements Region {
dest.removeSubscription(context, sub, 0l);
}
}
- destinationMap.removeAll(destination);
+ destinationMap.remove(destination, dest);
dispose(context, dest);
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
if (destinationInterceptor != null) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/4f577449/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
index 2031c5d..fac843b 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
@@ -16,34 +16,26 @@
*/
package org.apache.activemq.transport.mqtt;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.Wait;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
public class PahoMQTTTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);
@@ -162,10 +154,39 @@ public class PahoMQTTTest extends MQTTTestSupport {
public boolean isSatisified() throws Exception {
return listener.result != null;
}
- });
+ }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200));
+
assertTrue(client.getPendingDeliveryTokens().length == 0);
assertEquals(expectedResult, listener.result);
+
+ expectedResult = "should get everything";
+ listener.result = null;
+ client.publish(ACCOUNT_PREFIX + "a/1/2", expectedResult.getBytes(), 0, false);
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return listener.result != null;
+ }
+ }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200));
+ assertEquals(expectedResult, listener.result);
+ assertTrue(client.getPendingDeliveryTokens().length == 0);
+
+ client.unsubscribe(ACCOUNT_PREFIX + "a/+/#");
+ client.unsubscribe(ACCOUNT_PREFIX + "#");
+ assertTrue(client.getPendingDeliveryTokens().length == 0);
+
+ expectedResult = "should still get 1/2/3";
+ listener.result = null;
+ client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(), 0, false);
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return listener.result != null;
+ }
+ }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200));
+ assertEquals(expectedResult, listener.result);
+ assertTrue(client.getPendingDeliveryTokens().length == 0);
}
@Test(timeout = 300000)
http://git-wip-us.apache.org/repos/asf/activemq/blob/4f577449/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java
index 9998be9..18eb0c7 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java
@@ -66,6 +66,7 @@ public class HealthViewMBeanTest extends EmbeddedBrokerTestSupport {
answer.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 64);
answer.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 64);
answer.getSystemUsage().getStoreUsage().setLimit(1024 * 1024 * 64);
+ answer.getSystemUsage().getJobSchedulerUsage().setLimit(1024 * 1024 * 64);
answer.setUseJmx(true);
answer.setSchedulerSupport(true);
http://git-wip-us.apache.org/repos/asf/activemq/blob/4f577449/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java
index 3cfbc64..755c3c3 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java
@@ -337,6 +337,13 @@ public class DestinationMapTest extends TestCase {
map.removeAll(createDestination("FOO.>"));
assertMapValue("FOO.A", null);
+
+ put("FOO.A", v1);
+ put("FOO.>", v2);
+
+ map.remove(createDestination("FOO.>"), v2);
+
+ assertMapValue("FOO.A", v1);
}
protected void loadSample2() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/4f577449/activemq-unit-tests/src/test/java/org/apache/activemq/security/SecurityJMXTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/SecurityJMXTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/SecurityJMXTest.java
index b62fccb..1222b3b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/SecurityJMXTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/SecurityJMXTest.java
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
public class SecurityJMXTest extends TestCase {
- private static final Logger LOG = LoggerFactory.getLogger(SimpleAuthenticationPluginTest.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SecurityJMXTest.class);
private BrokerService broker;
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/4f577449/activemq-unit-tests/src/test/resources/org/apache/activemq/security/simple-auth-broker.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/simple-auth-broker.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/simple-auth-broker.xml
index b682728..cb42730 100644
--- a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/simple-auth-broker.xml
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/simple-auth-broker.xml
@@ -43,7 +43,7 @@
<!-- Use a non-default port in case the default port is in use -->
<managementContext>
- <managementContext connectorPort="1199"/>
+ <managementContext connectorPort="1199" createConnector="true"/>
</managementContext>
<destinationPolicy>