You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/12/02 04:44:16 UTC
[3/4] activemq-artemis git commit: Add 'routing-type' to divert
Add 'routing-type' to divert
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/77684850
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/77684850
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/77684850
Branch: refs/heads/ARTEMIS-780
Commit: 77684850b19cf0e5c167381d58bea7a9d04dfbff
Parents: f575900
Author: jbertram <jb...@apache.com>
Authored: Thu Dec 1 22:30:37 2016 -0600
Committer: jbertram <jb...@apache.com>
Committed: Thu Dec 1 22:43:55 2016 -0600
----------------------------------------------------------------------
.../config/ActiveMQDefaultConfiguration.java | 10 ++
.../core/management/ActiveMQServerControl.java | 10 ++
.../api/core/management/DivertControl.java | 6 +
.../artemis/core/server/RoutingType.java | 10 +-
.../core/config/DivertConfiguration.java | 21 +++
.../artemis/core/config/impl/Validators.java | 14 ++
.../deployers/impl/FileConfigurationParser.java | 4 +-
.../impl/ActiveMQServerControlImpl.java | 14 +-
.../core/management/impl/DivertControlImpl.java | 10 ++
.../core/server/ActiveMQMessageBundle.java | 3 +
.../core/server/impl/ActiveMQServerImpl.java | 2 +-
.../artemis/core/server/impl/DivertImpl.java | 23 +++-
.../resources/schema/artemis-configuration.xsd | 16 +++
.../tests/integration/divert/DivertTest.java | 136 +++++++++++++++++++
.../ActiveMQServerControlUsingCoreTest.java | 12 ++
.../management/DivertControlUsingCoreTest.java | 5 +
16 files changed, 291 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index e75c663..b4518fe 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -360,6 +360,9 @@ public final class ActiveMQDefaultConfiguration {
// whether this is an exclusive divert
private static boolean DEFAULT_DIVERT_EXCLUSIVE = false;
+ // how the divert should handle the message's routing type
+ private static String DEFAULT_DIVERT_ROUTING_TYPE = RoutingType.STRIP.toString();
+
// If true then the server will request a backup on another node
private static boolean DEFAULT_HAPOLICY_REQUEST_BACKUP = false;
@@ -1007,6 +1010,13 @@ public final class ActiveMQDefaultConfiguration {
}
/**
+ * how the divert should handle the message's routing type
+ */
+ public static String getDefaultDivertRoutingType() {
+ return DEFAULT_DIVERT_ROUTING_TYPE;
+ }
+
+ /**
* If true then the server will request a backup on another node
*/
public static boolean isDefaultHapolicyRequestBackup() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index cd257c6..1797c9a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -899,6 +899,16 @@ public interface ActiveMQServerControl {
@Parameter(name = "filterString", desc = "Filter of the divert") String filterString,
@Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName) throws Exception;
+ @Operation(desc = "Create a Divert", impact = MBeanOperationInfo.ACTION)
+ void createDivert(@Parameter(name = "name", desc = "Name of the divert") String name,
+ @Parameter(name = "routingName", desc = "Routing name of the divert") String routingName,
+ @Parameter(name = "address", desc = "Address to divert from") String address,
+ @Parameter(name = "forwardingAddress", desc = "Address to divert to") String forwardingAddress,
+ @Parameter(name = "exclusive", desc = "Is the divert exclusive?") boolean exclusive,
+ @Parameter(name = "filterString", desc = "Filter of the divert") String filterString,
+ @Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName,
+ @Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception;
+
@Operation(desc = "Destroy a Divert", impact = MBeanOperationInfo.ACTION)
void destroyDivert(@Parameter(name = "name", desc = "Name of the divert") String name) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java
index c99646b..7c103ca 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java
@@ -65,4 +65,10 @@ public interface DivertControl {
*/
@Attribute(desc = "name of the org.apache.activemq.artemis.core.server.cluster.Transformer implementation associated with this divert")
String getTransformerClassName();
+
+ /**
+ * Returns the routing type used by this divert.
+ */
+ @Attribute(desc = "routing type used by this divert")
+ String getRoutingType();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java
index 2f17335..c9b1d09 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.server;
public enum RoutingType {
- MULTICAST, ANYCAST;
+ MULTICAST, ANYCAST, STRIP, PASS;
public byte getType() {
switch (this) {
@@ -26,6 +26,10 @@ public enum RoutingType {
return 0;
case ANYCAST:
return 1;
+ case STRIP:
+ return 2;
+ case PASS:
+ return 3;
default:
return -1;
}
@@ -37,6 +41,10 @@ public enum RoutingType {
return MULTICAST;
case 1:
return ANYCAST;
+ case 2:
+ return STRIP;
+ case 3:
+ return PASS;
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java
index a769f17..5326c72 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.config;
import java.io.Serializable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.utils.UUIDGenerator;
public class DivertConfiguration implements Serializable {
@@ -39,6 +40,8 @@ public class DivertConfiguration implements Serializable {
private String transformerClassName = null;
+ private RoutingType routingType = RoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
+
public DivertConfiguration() {
}
@@ -70,6 +73,10 @@ public class DivertConfiguration implements Serializable {
return transformerClassName;
}
+ public RoutingType getRoutingType() {
+ return routingType;
+ }
+
/**
* @param name the name to set
*/
@@ -130,6 +137,14 @@ public class DivertConfiguration implements Serializable {
return this;
}
+ /**
+ * @param routingType the routingType to set
+ */
+ public DivertConfiguration setRoutingType(final RoutingType routingType) {
+ this.routingType = routingType;
+ return this;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
@@ -141,6 +156,7 @@ public class DivertConfiguration implements Serializable {
result = prime * result + ((name == null) ? 0 : name.hashCode());
result = prime * result + ((routingName == null) ? 0 : routingName.hashCode());
result = prime * result + ((transformerClassName == null) ? 0 : transformerClassName.hashCode());
+ result = prime * result + ((routingType == null) ? 0 : routingType.hashCode());
return result;
}
@@ -185,6 +201,11 @@ public class DivertConfiguration implements Serializable {
return false;
} else if (!transformerClassName.equals(other.transformerClassName))
return false;
+ if (routingType == null) {
+ if (other.routingType != null)
+ return false;
+ } else if (!routingType.equals(other.routingType))
+ return false;
return true;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
index bc57978..3e9bb4c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.config.impl;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
@@ -164,6 +165,19 @@ public final class Validators {
}
};
+ public static final Validator ROUTING_TYPE = new Validator() {
+ @Override
+ public void validate(final String name, final Object value) {
+ String val = (String) value;
+ if (val == null || !val.equals(RoutingType.ANYCAST.toString()) &&
+ !val.equals(RoutingType.MULTICAST.toString()) &&
+ !val.equals(RoutingType.PASS.toString()) &&
+ !val.equals(RoutingType.STRIP.toString())) {
+ throw ActiveMQMessageBundle.BUNDLE.invalidRoutingType(val);
+ }
+ }
+ };
+
public static final Validator MAX_QUEUE_CONSUMERS = new Validator() {
@Override
public void validate(String name, Object value) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 44d1a07..7b98602 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1554,6 +1554,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
String transformerClassName = getString(e, "transformer-class-name", null, Validators.NO_CHECK);
+ RoutingType routingType = RoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.ROUTING_TYPE));
+
String filterString = null;
NodeList children = e.getChildNodes();
@@ -1566,7 +1568,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
}
}
- DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName);
+ DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName).setRoutingType(routingType);
mainConfig.getDivertConfigurations().add(config);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 82f3943..4464062 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -1895,11 +1895,23 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
final boolean exclusive,
final String filterString,
final String transformerClassName) throws Exception {
+ createDivert(name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
+ }
+
+ @Override
+ public void createDivert(final String name,
+ final String routingName,
+ final String address,
+ final String forwardingAddress,
+ final boolean exclusive,
+ final String filterString,
+ final String transformerClassName,
+ final String routingType) throws Exception {
checkStarted();
clearIO();
try {
- DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName);
+ DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName).setRoutingType(RoutingType.valueOf(routingType));
server.deployDivert(config);
} finally {
blockOnIO();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java
index 6c47778..e87e333 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java
@@ -99,6 +99,16 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
}
@Override
+ public String getRoutingType() {
+ clearIO();
+ try {
+ return configuration.getRoutingType().toString();
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ @Override
public String getUniqueName() {
clearIO();
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 1c20ba5..ee8f0ef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -410,4 +410,7 @@ public interface ActiveMQMessageBundle {
IllegalArgumentException invalidRoutingTypeForAddress(RoutingType routingType,
String address,
Set<RoutingType> supportedRoutingTypes);
+
+ @Message(id = 119208, value = "Invalid routing type {0}", format = Message.Format.MESSAGE_FORMAT)
+ IllegalArgumentException invalidRoutingType(String val);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index aadcba9..aebcb9a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1802,7 +1802,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
Filter filter = FilterImpl.createFilter(config.getFilterString());
- Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()), sName, new SimpleString(config.getRoutingName()), config.isExclusive(), filter, transformer, postOffice, storageManager);
+ Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()), sName, new SimpleString(config.getRoutingName()), config.isExclusive(), filter, transformer, postOffice, storageManager, config.getRoutingType());
Binding binding = new DivertBinding(storageManager.generateID(), sAddress, divert);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 5782379..fd55521 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -16,12 +16,14 @@
*/
package org.apache.activemq.artemis.core.server.impl;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.jboss.logging.Logger;
@@ -49,6 +51,8 @@ public class DivertImpl implements Divert {
private final StorageManager storageManager;
+ private final RoutingType routingType;
+
public DivertImpl(final SimpleString forwardAddress,
final SimpleString uniqueName,
final SimpleString routingName,
@@ -56,7 +60,8 @@ public class DivertImpl implements Divert {
final Filter filter,
final Transformer transformer,
final PostOffice postOffice,
- final StorageManager storageManager) {
+ final StorageManager storageManager,
+ final RoutingType routingType) {
this.forwardAddress = forwardAddress;
this.uniqueName = uniqueName;
@@ -72,6 +77,8 @@ public class DivertImpl implements Divert {
this.postOffice = postOffice;
this.storageManager = storageManager;
+
+ this.routingType = routingType;
}
@Override
@@ -97,6 +104,20 @@ public class DivertImpl implements Divert {
copy.setExpiration(message.getExpiration());
+ switch (routingType) {
+ case ANYCAST:
+ copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
+ break;
+ case MULTICAST:
+ copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
+ break;
+ case STRIP:
+ copy.removeProperty(Message.HDR_ROUTING_TYPE);
+ break;
+ case PASS:
+ break;
+ }
+
if (transformer != null) {
copy = transformer.transform(copy);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 7069c09..c9d1f5b 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1568,6 +1568,22 @@
</xsd:element>
<xsd:element ref="filter" maxOccurs="1" minOccurs="0"/>
+
+ <xsd:element name="routing-type" default="STRIP" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ how should the routing-type on the diverted messages be set?
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:simpleType>
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="ANYCAST"/>
+ <xsd:enumeration value="MULTICAST"/>
+ <xsd:enumeration value="STRIP"/>
+ <xsd:enumeration value="PASS"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+ </xsd:element>
</xsd:all>
<xsd:attribute name="name" type="xsd:ID" use="required">
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
index a9501d8..8774088 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
@@ -122,6 +122,142 @@ public class DivertTest extends ActiveMQTestBase {
}
@Test
+ public void testSingleNonExclusiveDivertWithRoutingType() throws Exception {
+ final String testAddress = "testAddress";
+
+ final String forwardAddress = "forwardAddress";
+
+ DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress);
+
+ Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf);
+
+ ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
+
+ server.start();
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ final SimpleString queueName1 = new SimpleString("queue1");
+
+ final SimpleString queueName2 = new SimpleString("queue2");
+
+ session.createQueue(new SimpleString(forwardAddress), RoutingType.ANYCAST, queueName1, null, false);
+
+ session.createQueue(new SimpleString(testAddress), RoutingType.MULTICAST, queueName2, null, false);
+
+ session.start();
+
+ ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session.createConsumer(queueName1);
+
+ ClientConsumer consumer2 = session.createConsumer(queueName2);
+
+ final int numMessages = 1;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = session.createMessage(false);
+
+ message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
+
+ message.putIntProperty(propKey, i);
+
+ producer.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = consumer1.receive(DivertTest.TIMEOUT);
+
+ Assert.assertNotNull(message);
+
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = consumer2.receive(DivertTest.TIMEOUT);
+
+ Assert.assertNotNull(message);
+
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ Assert.assertNull(consumer2.receiveImmediate());
+ }
+
+ @Test
+ public void testSingleExclusiveDivertWithRoutingType() throws Exception {
+ final String testAddress = "testAddress";
+
+ final String forwardAddress = "forwardAddress";
+
+ DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).setExclusive(true);
+
+ Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf);
+
+ ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
+
+ server.start();
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ final SimpleString queueName1 = new SimpleString("queue1");
+
+ final SimpleString queueName2 = new SimpleString("queue2");
+
+ session.createQueue(new SimpleString(forwardAddress), RoutingType.ANYCAST, queueName1, null, false);
+
+ session.createQueue(new SimpleString(testAddress), RoutingType.MULTICAST, queueName2, null, false);
+
+ session.start();
+
+ ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session.createConsumer(queueName1);
+
+ final int numMessages = 1;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = session.createMessage(false);
+
+ message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
+
+ message.putIntProperty(propKey, i);
+
+ producer.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = consumer1.receive(DivertTest.TIMEOUT);
+
+ Assert.assertNotNull(message);
+
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ Assert.assertNull(consumer1.receiveImmediate());
+ }
+
+ @Test
public void testSingleDivertWithExpiry() throws Exception {
final String testAddress = "testAddress";
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 985b495..280fdc4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -713,6 +713,18 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
+ public void createDivert(String name,
+ String routingName,
+ String address,
+ String forwardingAddress,
+ boolean exclusive,
+ String filterString,
+ String transformerClassName,
+ String routingType) throws Exception {
+ proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, routingType);
+ }
+
+ @Override
public void destroyDivert(String name) throws Exception {
proxy.invokeOperation("destroyDivert", name);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
index 61ecda2..48528ce 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
@@ -62,6 +62,11 @@ public class DivertControlUsingCoreTest extends DivertControlTest {
}
@Override
+ public String getRoutingType() {
+ return (String) proxy.retrieveAttributeValue("routingType");
+ }
+
+ @Override
public String getUniqueName() {
return (String) proxy.retrieveAttributeValue("uniqueName");
}