You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/12/07 13:17:59 UTC
[activemq-artemis] branch main updated: ARTEMIS-3594 - add support for a local target key transformer and an instance of CONSISTENT_HASH_MODULO that can be used to partition in a static cluster
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new e0b1621 ARTEMIS-3594 - add support for a local target key transformer and an instance of CONSISTENT_HASH_MODULO that can be used to partition in a static cluster
e0b1621 is described below
commit e0b16217a1d198a19bbc8ed2c6cfe9f09bb8b993
Author: gtully <ga...@gmail.com>
AuthorDate: Wed Dec 1 15:27:54 2021 +0000
ARTEMIS-3594 - add support for a local target key transformer and an instance of CONSISTENT_HASH_MODULO that can be used to partition in a static cluster
---
.../balancing/BrokerBalancerConfiguration.java | 15 +-
...ration.java => NamedPropertyConfiguration.java} | 6 +-
.../deployers/impl/FileConfigurationParser.java | 25 +-
.../core/server/balancing/BrokerBalancer.java | 29 +-
.../server/balancing/BrokerBalancerManager.java | 29 +-
.../balancing/policies/ConsistentHashPolicy.java | 6 +-
.../balancing/policies/DefaultPolicyFactory.java | 49 ----
.../server/balancing/policies/PolicyFactory.java | 6 +-
.../balancing/policies/PolicyFactoryResolver.java | 19 +-
.../transformer/ConsistentHashModulo.java | 47 +++
.../KeyTransformer.java} | 14 +-
.../TransformerFactory.java} | 8 +-
.../transformer/TransformerFactoryResolver.java | 62 ++++
.../resources/schema/artemis-configuration.xsd | 27 ++
.../core/config/impl/FileConfigurationTest.java | 11 +-
.../balancing/BrokerBalancerManagerTest.java | 27 +-
.../core/server/balancing/BrokerBalancerTest.java | 43 ++-
.../policies/PolicyFactoryResolverTest.java} | 29 +-
.../transformer/ConsistentHashModuloTest.java | 55 ++++
.../TransformerFactoryResolverTest.java | 43 +++
.../artemis/tests/util/ActiveMQTestBase.java | 2 +-
.../resources/ConfigurationTest-full-config.xml | 8 +
.../ConfigurationTest-xinclude-config.xml | 8 +
docs/user-manual/en/broker-balancers.md | 7 +
.../balancing/AutoClientIDShardClusterTest.java | 325 +++++++++++++++++++++
.../integration/balancing/BalancingTestBase.java | 10 +-
.../tests/integration/balancing/TargetKeyTest.java | 9 +-
27 files changed, 778 insertions(+), 141 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
index 1da1c04..e20d033 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
@@ -28,7 +28,8 @@ public class BrokerBalancerConfiguration implements Serializable {
private String localTargetFilter = null;
private int cacheTimeout = -1;
private PoolConfiguration poolConfiguration = null;
- private PolicyConfiguration policyConfiguration = null;
+ private NamedPropertyConfiguration policyConfiguration = null;
+ private NamedPropertyConfiguration transformerConfiguration = null;
public String getName() {
return name;
@@ -75,11 +76,11 @@ public class BrokerBalancerConfiguration implements Serializable {
return this;
}
- public PolicyConfiguration getPolicyConfiguration() {
+ public NamedPropertyConfiguration getPolicyConfiguration() {
return policyConfiguration;
}
- public BrokerBalancerConfiguration setPolicyConfiguration(PolicyConfiguration policyConfiguration) {
+ public BrokerBalancerConfiguration setPolicyConfiguration(NamedPropertyConfiguration policyConfiguration) {
this.policyConfiguration = policyConfiguration;
return this;
}
@@ -92,4 +93,12 @@ public class BrokerBalancerConfiguration implements Serializable {
this.poolConfiguration = poolConfiguration;
return this;
}
+   // Returns this configuration so calls can be chained, matching setPolicyConfiguration and the other setters.
+   public BrokerBalancerConfiguration setTransformerConfiguration(NamedPropertyConfiguration configuration) {
+      this.transformerConfiguration = configuration;
+      return this;
+   }
+ public NamedPropertyConfiguration getTransformerConfiguration() {
+ return transformerConfiguration;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/PolicyConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/NamedPropertyConfiguration.java
similarity index 85%
rename from artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/PolicyConfiguration.java
rename to artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/NamedPropertyConfiguration.java
index f1f8630..eefa61d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/PolicyConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/NamedPropertyConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.activemq.artemis.core.config.balancing;
import java.io.Serializable;
import java.util.Map;
-public class PolicyConfiguration implements Serializable {
+public class NamedPropertyConfiguration implements Serializable {
private String name;
private Map<String, String> properties;
@@ -29,7 +29,7 @@ public class PolicyConfiguration implements Serializable {
return name;
}
- public PolicyConfiguration setName(String name) {
+ public NamedPropertyConfiguration setName(String name) {
this.name = name;
return this;
}
@@ -38,7 +38,7 @@ public class PolicyConfiguration implements Serializable {
return properties;
}
- public PolicyConfiguration setProperties(Map<String, String> properties) {
+ public NamedPropertyConfiguration setProperties(Map<String, String> properties) {
this.properties = properties;
return this;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 1e92a83..bd27103 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -47,7 +47,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
-import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
@@ -93,6 +93,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactoryResolver;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactoryResolver;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
@@ -2653,7 +2654,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
brokerBalancerConfiguration.setCacheTimeout(getInteger(e, "cache-timeout",
brokerBalancerConfiguration.getCacheTimeout(), Validators.MINUS_ONE_OR_GE_ZERO));
- PolicyConfiguration policyConfiguration = null;
+ NamedPropertyConfiguration policyConfiguration = null;
PoolConfiguration poolConfiguration = null;
NodeList children = e.getChildNodes();
@@ -2661,20 +2662,34 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
Node child = children.item(j);
if (child.getNodeName().equals("policy")) {
- policyConfiguration = new PolicyConfiguration();
- parsePolicyConfiguration((Element)child, policyConfiguration);
+ policyConfiguration = new NamedPropertyConfiguration();
+ parsePolicyConfiguration((Element) child, policyConfiguration);
brokerBalancerConfiguration.setPolicyConfiguration(policyConfiguration);
} else if (child.getNodeName().equals("pool")) {
poolConfiguration = new PoolConfiguration();
parsePoolConfiguration((Element) child, config, poolConfiguration);
brokerBalancerConfiguration.setPoolConfiguration(poolConfiguration);
+ } else if (child.getNodeName().equals("local-target-key-transformer")) {
+ policyConfiguration = new NamedPropertyConfiguration();
+ parseTransformerConfiguration((Element) child, policyConfiguration);
+ brokerBalancerConfiguration.setTransformerConfiguration(policyConfiguration);
}
}
config.getBalancerConfigurations().add(brokerBalancerConfiguration);
}
- private void parsePolicyConfiguration(final Element e, final PolicyConfiguration policyConfiguration) throws ClassNotFoundException {
+   private void parseTransformerConfiguration(final Element e, final NamedPropertyConfiguration transformerConfiguration) throws ClassNotFoundException {
+      // resolve before storing anything: an unknown transformer name fails here with ClassNotFoundException
+      final String transformerName = e.getAttribute("name");
+      TransformerFactoryResolver.getInstance().resolve(transformerName);
+
+      // keep the name and the properties collected by getMapOfChildPropertyElements for later deployment
+      transformerConfiguration.setName(transformerName);
+      transformerConfiguration.setProperties(getMapOfChildPropertyElements(e));
+   }
+
+ private void parsePolicyConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException {
String name = e.getAttribute("name");
PolicyFactoryResolver.getInstance().resolve(name);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
index c256209..a39c738 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
+import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.jboss.logging.Logger;
@@ -40,7 +41,6 @@ public class BrokerBalancer implements ActiveMQComponent {
public static final String CLIENT_ID_PREFIX = ActiveMQDefaultConfiguration.DEFAULT_INTERNAL_NAMING_PREFIX + "balancer.client.";
-
private final String name;
private final TargetKey targetKey;
@@ -55,6 +55,8 @@ public class BrokerBalancer implements ActiveMQComponent {
private final Policy policy;
+ private final KeyTransformer transformer;
+
private final Cache<String, TargetResult> cache;
private volatile boolean started = false;
@@ -93,11 +95,21 @@ public class BrokerBalancer implements ActiveMQComponent {
}
- public BrokerBalancer(final String name, final TargetKey targetKey, final String targetKeyFilter, final Target localTarget, final String localTargetFilter, final Pool pool, final Policy policy, final int cacheTimeout) {
+ public BrokerBalancer(final String name,
+ final TargetKey targetKey,
+ final String targetKeyFilter,
+ final Target localTarget,
+ final String localTargetFilter,
+ final Pool pool,
+ final Policy policy,
+ KeyTransformer transformer,
+ final int cacheTimeout) {
this.name = name;
this.targetKey = targetKey;
+ this.transformer = transformer;
+
this.targetKeyResolver = new TargetKeyResolver(targetKey, targetKeyFilter);
this.localTarget = new TargetResult(localTarget);
@@ -149,7 +161,7 @@ public class BrokerBalancer implements ActiveMQComponent {
public TargetResult getTarget(String key) {
- if (this.localTargetFilter != null && this.localTargetFilter.matcher(key).matches()) {
+ if (this.localTargetFilter != null && this.localTargetFilter.matcher(transform(key)).matches()) {
if (logger.isDebugEnabled()) {
logger.debug("The " + targetKey + "[" + key + "] matches the localTargetFilter " + localTargetFilter.pattern());
}
@@ -201,4 +213,15 @@ public class BrokerBalancer implements ActiveMQComponent {
return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT;
}
+
+   private String transform(String key) {
+      if (transformer == null) { // no transformer configured: the key is returned as is
+         return key;
+      }
+      final String transformed = transformer.transform(key);
+      if (logger.isDebugEnabled()) {
+         logger.debug("Key: " + key + ", transformed to " + transformed);
+      }
+      return transformed;
+   }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
index 10afe15..f9b8002 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
@@ -21,7 +21,7 @@ import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
-import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -39,6 +39,9 @@ import org.apache.activemq.artemis.core.server.balancing.targets.ActiveMQTargetF
import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetFactory;
+import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer;
+import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactory;
+import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactoryResolver;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.jboss.logging.Logger;
@@ -95,13 +98,19 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
}
Policy policy = null;
- PolicyConfiguration policyConfiguration = config.getPolicyConfiguration();
+ NamedPropertyConfiguration policyConfiguration = config.getPolicyConfiguration();
if (policyConfiguration != null) {
policy = deployPolicy(policyConfiguration, pool);
}
+ KeyTransformer transformer = null;
+ NamedPropertyConfiguration transformerConfiguration = config.getTransformerConfiguration();
+ if (transformerConfiguration != null) {
+ transformer = deployTransformer(transformerConfiguration);
+ }
+
BrokerBalancer balancer = new BrokerBalancer(config.getName(), config.getTargetKey(), config.getTargetKeyFilter(),
- localTarget, config.getLocalTargetFilter(), pool, policy, config.getCacheTimeout());
+ localTarget, config.getLocalTargetFilter(), pool, policy, transformer, config.getCacheTimeout());
balancerControllers.put(balancer.getName(), balancer);
@@ -160,10 +169,10 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
return pool;
}
- private Policy deployPolicy(PolicyConfiguration policyConfig, Pool pool) throws ClassNotFoundException {
+ private Policy deployPolicy(NamedPropertyConfiguration policyConfig, Pool pool) throws ClassNotFoundException {
PolicyFactory policyFactory = PolicyFactoryResolver.getInstance().resolve(policyConfig.getName());
- Policy policy = policyFactory.createPolicy(policyConfig.getName());
+ Policy policy = policyFactory.create();
policy.init(policyConfig.getProperties());
@@ -174,6 +183,16 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
return policy;
}
+   private KeyTransformer deployTransformer(NamedPropertyConfiguration configuration) throws Exception {
+      // look up the factory registered under the configured name; resolve throws when there is none
+      final String transformerName = configuration.getName();
+      final KeyTransformer keyTransformer = TransformerFactoryResolver.getInstance().resolve(transformerName).create();
+
+      // pass the configured properties to the new instance before handing it out
+      keyTransformer.init(configuration.getProperties());
+      return keyTransformer;
+   }
+
public BrokerBalancer getBalancer(String name) {
return balancerControllers.get(name);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/ConsistentHashPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/ConsistentHashPolicy.java
index 77d4076..4c0dd46 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/ConsistentHashPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/ConsistentHashPolicy.java
@@ -31,10 +31,6 @@ public class ConsistentHashPolicy extends AbstractPolicy {
super(NAME);
}
- protected ConsistentHashPolicy(String name) {
- super(name);
- }
-
@Override
public Target selectTarget(List<Target> targets, String key) {
if (targets.size() > 1) {
@@ -60,7 +56,7 @@ public class ConsistentHashPolicy extends AbstractPolicy {
return null;
}
- private int getHash(String str) {
+ public static int getHash(String str) {
final int FNV_INIT = 0x811c9dc5;
final int FNV_PRIME = 0x01000193;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/DefaultPolicyFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/DefaultPolicyFactory.java
deleted file mode 100644
index aa39787..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/DefaultPolicyFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.server.balancing.policies;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.Supplier;
-
-public class DefaultPolicyFactory extends PolicyFactory {
- private static final Map<String, Supplier<AbstractPolicy>> supportedPolicies = new HashMap<>();
-
- static {
- supportedPolicies.put(ConsistentHashPolicy.NAME, () -> new ConsistentHashPolicy());
- supportedPolicies.put(FirstElementPolicy.NAME, () -> new FirstElementPolicy());
- supportedPolicies.put(LeastConnectionsPolicy.NAME, () -> new LeastConnectionsPolicy());
- supportedPolicies.put(RoundRobinPolicy.NAME, () -> new RoundRobinPolicy());
- }
-
- @Override
- public String[] getSupportedPolicies() {
- return supportedPolicies.keySet().toArray(new String[supportedPolicies.size()]);
- }
-
- @Override
- public AbstractPolicy createPolicy(String policyName) {
- Supplier<AbstractPolicy> policySupplier = supportedPolicies.get(policyName);
-
- if (policySupplier == null) {
- throw new IllegalArgumentException("Policy not supported: " + policyName);
- }
-
- return policySupplier.get();
- }
-}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
index 4c745ee..227fda9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
@@ -17,8 +17,6 @@
package org.apache.activemq.artemis.core.server.balancing.policies;
-public abstract class PolicyFactory {
- public abstract String[] getSupportedPolicies();
-
- public abstract Policy createPolicy(String policyName);
+public interface PolicyFactory {
+ Policy create();
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolver.java
index dab4f93..45a61db 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolver.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolver.java
@@ -36,7 +36,10 @@ public class PolicyFactoryResolver {
private final Map<String, PolicyFactory> policyFactories = new HashMap<>();
private PolicyFactoryResolver() {
- registerPolicyFactory(new DefaultPolicyFactory());
+ policyFactories.put(ConsistentHashPolicy.NAME, () -> new ConsistentHashPolicy());
+ policyFactories.put(FirstElementPolicy.NAME, () -> new FirstElementPolicy());
+ policyFactories.put(LeastConnectionsPolicy.NAME, () -> new LeastConnectionsPolicy());
+ policyFactories.put(RoundRobinPolicy.NAME, () -> new RoundRobinPolicy());
loadPolicyFactories();
}
@@ -56,19 +59,15 @@ public class PolicyFactoryResolver {
PolicyFactory.class, BrokerBalancer.class.getClassLoader());
for (PolicyFactory policyFactory : serviceLoader) {
- registerPolicyFactory(policyFactory);
+ policyFactories.put(keyFromClassName(policyFactory.getClass().getName()), policyFactory);
}
}
- public void registerPolicyFactory(PolicyFactory policyFactory) {
- for (String policyName : policyFactory.getSupportedPolicies()) {
- policyFactories.put(policyName, policyFactory);
- }
+ public void registerPolicyFactory(String name, PolicyFactory policyFactory) {
+ policyFactories.put(name, policyFactory);
}
- public void unregisterPolicyFactory(PolicyFactory policyFactory) {
- for (String policyName : policyFactory.getSupportedPolicies()) {
- policyFactories.remove(policyName, policyFactory);
- }
+   String keyFromClassName(String name) { // key is the text before "PolicyFactory"; a name without it is used whole instead of throwing
+      return name.contains("PolicyFactory") ? name.substring(0, name.indexOf("PolicyFactory")) : name;
+   }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModulo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModulo.java
new file mode 100644
index 0000000..a092e86
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModulo.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.transformer;
+
+import java.util.Map;
+
+import org.apache.activemq.artemis.core.server.balancing.policies.ConsistentHashPolicy;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
+
+/**
+ * Key transformer that replaces a resolved key with the remainder of its hash divided by the
+ * configured modulo, so that keys fall into a bounded set of values. A modulo of 0, the default,
+ * leaves keys unchanged.
+ */
+public class ConsistentHashModulo implements KeyTransformer {
+   public static final String NAME = "CONSISTENT_HASH_MODULO";
+   public static final String MODULO = "modulo";
+   int modulo = 0;
+
+   /**
+    * @param str the key to transform
+    * @return {@code str} itself when it equals the default key value or when the modulo is 0,
+    *         otherwise the hash of {@code str} reduced to the range [0, modulo) as a decimal string
+    */
+   @Override
+   public String transform(String str) {
+      if (TargetKeyResolver.DEFAULT_KEY_VALUE.equals(str)) {
+         // we only want to transform resolved keys
+         return str;
+      }
+      if (modulo == 0) {
+         return str;
+      }
+      int hash = ConsistentHashPolicy.getHash(str);
+      // the hash can be negative, in which case % would return a negative remainder;
+      // floorMod keeps the result within [0, modulo)
+      return String.valueOf(Math.floorMod(hash, modulo));
+   }
+
+   /**
+    * Reads the {@link #MODULO} property.
+    *
+    * @param properties the configured properties, expected to contain {@link #MODULO}
+    * @throws IllegalArgumentException if the property is missing, is not an integer or is negative
+    */
+   @Override
+   public void init(Map<String, String> properties) {
+      final String value = properties == null ? null : properties.get(MODULO);
+      if (value == null) {
+         throw new IllegalArgumentException(NAME + " requires the '" + MODULO + "' property");
+      }
+      final int parsed;
+      try {
+         parsed = Integer.parseInt(value.trim());
+      } catch (NumberFormatException e) {
+         throw new IllegalArgumentException("Invalid '" + MODULO + "' value for " + NAME + ": " + value, e);
+      }
+      if (parsed < 0) {
+         throw new IllegalArgumentException("'" + MODULO + "' for " + NAME + " must not be negative: " + parsed);
+      }
+      modulo = parsed;
+   }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/KeyTransformer.java
similarity index 76%
copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
copy to artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/KeyTransformer.java
index 4c745ee..dc0224e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/KeyTransformer.java
@@ -15,10 +15,16 @@
* limitations under the License.
*/
-package org.apache.activemq.artemis.core.server.balancing.policies;
+package org.apache.activemq.artemis.core.server.balancing.transformer;
-public abstract class PolicyFactory {
- public abstract String[] getSupportedPolicies();
+import java.util.Map;
- public abstract Policy createPolicy(String policyName);
+public interface KeyTransformer {
+ // Receives the transformer's properties; the default implementation does nothing.
+ default void init(Map<String, String> properties) {
+ }
+ // Maps a key to its transformed value; the default implementation returns the key unchanged.
+ default String transform(String key) {
+ return key;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactory.java
similarity index 78%
copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
copy to artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactory.java
index 4c745ee..af60fda 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactory.java
@@ -15,10 +15,8 @@
* limitations under the License.
*/
-package org.apache.activemq.artemis.core.server.balancing.policies;
+package org.apache.activemq.artemis.core.server.balancing.transformer;
-public abstract class PolicyFactory {
- public abstract String[] getSupportedPolicies();
-
- public abstract Policy createPolicy(String policyName);
+public interface TransformerFactory {
+ KeyTransformer create();
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolver.java
new file mode 100644
index 0000000..46dc635
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolver.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.transformer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+import org.apache.activemq.artemis.core.server.balancing.BrokerBalancer;
+
+/**
+ * Registry of {@link TransformerFactory} instances keyed by name. The built-in
+ * {@link ConsistentHashModulo} factory is registered first; factories found through
+ * {@link ServiceLoader} are added afterwards and replace any entry stored under the same key.
+ */
+public class TransformerFactoryResolver {
+   private static final String FACTORY_SUFFIX = "TransformerFactory";
+
+   private static TransformerFactoryResolver instance;
+
+   /**
+    * Returns the shared resolver, creating it on first use.
+    * Synchronized so that concurrent first callers share a single instance.
+    */
+   public static synchronized TransformerFactoryResolver getInstance() {
+      if (instance == null) {
+         instance = new TransformerFactoryResolver();
+      }
+      return instance;
+   }
+
+   private final Map<String, TransformerFactory> factories = new HashMap<>();
+
+   private TransformerFactoryResolver() {
+      factories.put(ConsistentHashModulo.NAME, () -> new ConsistentHashModulo());
+      loadFactories(); // let service loader override
+   }
+
+   /**
+    * @param policyName the name the transformer factory is registered under
+    * @return the factory registered under that name
+    * @throws ClassNotFoundException if no factory is registered under that name
+    */
+   public TransformerFactory resolve(String policyName) throws ClassNotFoundException {
+      TransformerFactory factory = factories.get(policyName);
+      if (factory == null) {
+         throw new ClassNotFoundException("No TransformerFactory found for " + policyName);
+      }
+      return factory;
+   }
+
+   private void loadFactories() {
+      ServiceLoader<TransformerFactory> serviceLoader = ServiceLoader.load(
+         TransformerFactory.class, BrokerBalancer.class.getClassLoader());
+      for (TransformerFactory factory : serviceLoader) {
+         factories.put(keyFromClassName(factory.getClass().getName()), factory);
+      }
+   }
+
+   /**
+    * Derives the registration key from a class name: the text before the first occurrence of
+    * "TransformerFactory". A name that does not contain that text is returned unchanged, so a
+    * factory that ignores the naming convention is still registered instead of failing the lookup.
+    */
+   String keyFromClassName(String name) {
+      final int suffixStart = name.indexOf(FACTORY_SUFFIX);
+      return suffixStart < 0 ? name : name.substring(0, suffixStart);
+   }
+}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 27d3134..6d29d83 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2136,6 +2136,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
+ <xsd:element name="local-target-key-transformer" type="brokerBalancerKeyTransformerType" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ the local target key transformer configuration
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
</xsd:sequence>
<xsd:attribute name="name" type="xsd:string" use="required">
<xsd:annotation>
@@ -2176,6 +2183,26 @@
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
+ <!-- configuration of a key transformer: a required name plus optional key/value properties -->
+ <xsd:complexType name="brokerBalancerKeyTransformerType">
+ <xsd:sequence>
+ <xsd:element ref="property" maxOccurs="unbounded" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ properties to configure a key transformer
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+ </xsd:sequence>
+ <!-- xsd:string rather than xsd:ID: ID values must be unique per document, which would stop two balancers from using the same transformer -->
+ <xsd:attribute name="name" type="xsd:string" use="required">
+ <xsd:annotation>
+ <xsd:documentation>
+ the name of the key transformer
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attributeGroup ref="xml:specialAttrs"/>
+ </xsd:complexType>
+
<xsd:complexType name="brokerBalancerPoolType">
<xsd:sequence maxOccurs="unbounded">
<xsd:element name="username" type="xsd:string" maxOccurs="1" minOccurs="0">
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index d33b3ee..344aae0 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -76,6 +76,8 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo.MODULO;
+
public class FileConfigurationTest extends ConfigurationImplTest {
@BeforeClass
@@ -265,13 +267,20 @@ public class FileConfigurationTest extends ConfigurationImplTest {
}
}
- Assert.assertEquals(4, conf.getBalancerConfigurations().size());
+ Assert.assertEquals(5, conf.getBalancerConfigurations().size());
for (BrokerBalancerConfiguration bc : conf.getBalancerConfigurations()) {
if (bc.getName().equals("simple-local")) {
Assert.assertEquals(bc.getTargetKey(), TargetKey.CLIENT_ID);
Assert.assertNotNull(bc.getLocalTargetFilter());
Assert.assertNotNull(bc.getTargetKeyFilter());
Assert.assertNull(bc.getPolicyConfiguration());
+ } else if (bc.getName().equals("simple-local-with-transformer")) {
+ Assert.assertEquals(bc.getTargetKey(), TargetKey.CLIENT_ID);
+ Assert.assertNotNull(bc.getLocalTargetFilter());
+ Assert.assertNotNull(bc.getTargetKeyFilter());
+ Assert.assertNull(bc.getPolicyConfiguration());
+ Assert.assertNotNull(bc.getTransformerConfiguration());
+ Assert.assertNotNull(bc.getTransformerConfiguration().getProperties().get(MODULO));
} else if (bc.getName().equals("simple-balancer")) {
Assert.assertEquals(bc.getTargetKey(), TargetKey.USER_NAME);
Assert.assertNull(bc.getLocalTargetFilter());
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java
index b98c7ec..a5bb93d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java
@@ -17,12 +17,16 @@
package org.apache.activemq.artemis.core.server.balancing;
+import java.util.HashMap;
+
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
-import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.balancing.policies.ConsistentHashPolicy;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.junit.After;
import org.junit.Before;
@@ -62,7 +66,7 @@ public class BrokerBalancerManagerTest {
BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration();
brokerBalancerConfiguration.setName("partition-local-pool");
- PolicyConfiguration policyConfig = new PolicyConfiguration();
+ NamedPropertyConfiguration policyConfig = new NamedPropertyConfiguration();
policyConfig.setName(ConsistentHashPolicy.NAME);
brokerBalancerConfiguration.setPolicyConfiguration(policyConfig);
@@ -84,4 +88,23 @@ public class BrokerBalancerManagerTest {
underTest.deployBrokerBalancer(brokerBalancerConfiguration);
}
+
+   /**
+    * Deploys a balancer that has only a local target filter and a
+    * {@link ConsistentHashModulo} key transformer configured.
+    */
+   @Test()
+   public void deployLocalOnlyWithPolicy() throws Exception {
+      // the manager asks the server for its management service during deployment
+      ManagementService managementService = Mockito.mock(ManagementService.class);
+      Mockito.when(mockServer.getManagementService()).thenReturn(managementService);
+
+      HashMap<String, String> transformerProperties = new HashMap<>();
+      transformerProperties.put(ConsistentHashModulo.MODULO, String.valueOf(2));
+
+      NamedPropertyConfiguration transformerConfig = new NamedPropertyConfiguration();
+      transformerConfig.setName(ConsistentHashModulo.NAME);
+      transformerConfig.setProperties(transformerProperties);
+
+      BrokerBalancerConfiguration balancerConfig = new BrokerBalancerConfiguration();
+      balancerConfig.setName("partition-local-consistent-hash")
+         .setTargetKey(TargetKey.CLIENT_ID)
+         .setLocalTargetFilter(String.valueOf(2));
+      balancerConfig.setTransformerConfiguration(transformerConfig);
+
+      underTest.deployBrokerBalancer(balancerConfig);
+   }
}
\ No newline at end of file
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
index 7f8ca0e..732ea53 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
@@ -18,7 +18,6 @@
package org.apache.activemq.artemis.core.server.balancing;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -29,7 +28,7 @@ import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
-import org.junit.After;
+import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -44,38 +43,34 @@ public class BrokerBalancerTest {
@Before
public void setUp() {
-
ActiveMQServer mockServer = mock(ActiveMQServer.class);
Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID"));
-
localTarget = new LocalTarget(null, mockServer);
+ }
+ @Test
+ public void getTarget() {
Pool pool = null;
Policy policy = null;
underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
- localTarget, "^FOO.*", pool, policy, 0);
- try {
- underTest.start();
- } catch (Exception e) {
- fail(e.getMessage());
- }
- }
-
- @After
- public void after() {
- if (underTest != null) {
- try {
- underTest.stop();
- } catch (Exception e) {
- fail(e.getMessage());
- }
- }
+ localTarget, "^FOO.*", pool, policy, null, 0);
+ assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget());
+ assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
}
@Test
- public void getTarget() {
- assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget());
- assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
+ public void getLocalTargetWithTransformer() throws Exception {
+ Pool pool = null;
+ Policy policy = null;
+ KeyTransformer keyTransformer = new KeyTransformer() {
+ @Override
+ public String transform(String key) {
+ return key.substring("TRANSFORM_TO".length() + 1);
+ }
+ };
+ underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
+ localTarget, "^FOO.*", pool, policy, keyTransformer, 0);
+ assertEquals( localTarget, underTest.getTarget("TRANSFORM_TO_FOO_EE").getTarget());
}
}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolverTest.java
similarity index 52%
copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
copy to artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolverTest.java
index 4c745ee..0804ba4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolverTest.java
@@ -14,11 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.activemq.artemis.core.server.balancing.policies;
-public abstract class PolicyFactory {
- public abstract String[] getSupportedPolicies();
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class PolicyFactoryResolverTest {
+
+ @Test
+ public void resolveOk() throws Exception {
+ PolicyFactoryResolver instance = PolicyFactoryResolver.getInstance();
+ assertNotNull(instance.resolve(ConsistentHashPolicy.NAME));
+ }
+
+ @Test(expected = ClassNotFoundException.class)
+ public void resolveError() throws Exception {
+ PolicyFactoryResolver instance = PolicyFactoryResolver.getInstance();
+ assertNotNull(instance.resolve("NOT PRESENT"));
+ }
- public abstract Policy createPolicy(String policyName);
-}
+ @Test
+ public void keyFromName() throws Exception {
+ PolicyFactoryResolver instance = PolicyFactoryResolver.getInstance();
+ assertEquals("New", instance.keyFromClassName("NewPolicyFactory"));
+ }
+}
\ No newline at end of file
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModuloTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModuloTest.java
new file mode 100644
index 0000000..137181b
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModuloTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.transformer;
+
+import java.util.HashMap;
+
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for {@link ConsistentHashModulo}.
+ */
+public class ConsistentHashModuloTest {
+
+   /**
+    * Checks that an unconfigured transformer returns keys unchanged and that, once a modulo is
+    * set, keys are replaced by distinct integer strings below that modulo.
+    */
+   @Test
+   public void transform() {
+      final ConsistentHashModulo transformer = new ConsistentHashModulo();
+
+      assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, transformer.transform(TargetKeyResolver.DEFAULT_KEY_VALUE));
+
+      assertEquals("AA", transformer.transform("AA")); // default modulo 0 does nothing
+
+      final int modulo = 2;
+      HashMap<String, String> config = new HashMap<>();
+      config.put(ConsistentHashModulo.MODULO, String.valueOf(modulo));
+      transformer.init(config);
+
+      final String hashOfA = transformer.transform("AAA");
+      final int bucketOfA = Integer.parseInt(hashOfA);
+
+      final String hashOfB = transformer.transform("BBB");
+      final int bucketOfB = Integer.parseInt(hashOfB);
+
+      // the two keys must land in different buckets, both within the configured bound
+      assertNotEquals(hashOfA, hashOfB);
+      assertNotEquals(bucketOfA, bucketOfB);
+      assertTrue(bucketOfA < modulo);
+      assertTrue(bucketOfB < modulo);
+   }
+}
\ No newline at end of file
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolverTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolverTest.java
new file mode 100644
index 0000000..d102da1
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolverTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.balancing.transformer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Unit test for {@link TransformerFactoryResolver}.
+ */
+public class TransformerFactoryResolverTest {
+
+   /** The built-in transformer name resolves to a factory. */
+   @Test
+   public void resolveOk() throws Exception {
+      assertNotNull(TransformerFactoryResolver.getInstance().resolve(ConsistentHashModulo.NAME));
+   }
+
+   /** An unknown name is reported with a {@link ClassNotFoundException}. */
+   @Test(expected = ClassNotFoundException.class)
+   public void resolveError() throws Exception {
+      assertNotNull(TransformerFactoryResolver.getInstance().resolve("NOT PRESENT"));
+   }
+
+   /** The lookup key is the class name with the factory suffix cut off. */
+   @Test
+   public void keyFromName() throws Exception {
+      final TransformerFactoryResolver resolver = TransformerFactoryResolver.getInstance();
+      final String key = resolver.keyFromClassName("NewTransformerFactory");
+      assertEquals("New", key);
+   }
+}
\ No newline at end of file
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 7887458..7d6b04e 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -1964,7 +1964,7 @@ public abstract class ActiveMQTestBase extends Assert {
}
}
- if (bindingCount == expectedBindingCount && totConsumers == expectedConsumerCount) {
+ if (bindingCount == expectedBindingCount && (expectedConsumerCount == -1 || totConsumers == expectedConsumerCount)) {
return true;
}
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 0a772b1..5c58d82 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -158,6 +158,14 @@
<target-key-filter>^[^.]+</target-key-filter>
<local-target-filter>DEFAULT</local-target-filter>
</broker-balancer>
+ <broker-balancer name="simple-local-with-transformer">
+ <target-key>CLIENT_ID</target-key>
+ <target-key-filter>^[^.]+</target-key-filter>
+ <local-target-filter>DEFAULT</local-target-filter>
+ <local-target-key-transformer name="CONSISTENT_HASH_MODULO">
+ <property key="modulo" value="2"></property>
+ </local-target-key-transformer>
+ </broker-balancer>
<broker-balancer name="simple-balancer">
<target-key>USER_NAME</target-key>
<policy name="FIRST_ELEMENT"/>
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index faaef30..e5edf10 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -149,6 +149,14 @@
<target-key-filter>^[^.]+</target-key-filter>
<local-target-filter>DEFAULT</local-target-filter>
</broker-balancer>
+ <broker-balancer name="simple-local-with-transformer">
+ <target-key>CLIENT_ID</target-key>
+ <target-key-filter>^[^.]+</target-key-filter>
+ <local-target-filter>DEFAULT</local-target-filter>
+ <local-target-key-transformer name="CONSISTENT_HASH_MODULO">
+ <property key="modulo" value="2"></property>
+ </local-target-key-transformer>
+ </broker-balancer>
<broker-balancer name="simple-balancer">
<target-key>USER_NAME</target-key>
<policy name="FIRST_ELEMENT"/>
diff --git a/docs/user-manual/en/broker-balancers.md b/docs/user-manual/en/broker-balancers.md
index be42656..c821877 100644
--- a/docs/user-manual/en/broker-balancers.md
+++ b/docs/user-manual/en/broker-balancers.md
@@ -106,12 +106,19 @@ So a broker balancer with the cache enabled doesn't strictly follow the configur
By default, the cache is enabled and will never timeout. See below
for more details about setting the `cache-timeout` parameter.
+## Key transformers
+A `local-target-key-transformer` allows the target key to be transformed before it is matched against any `local-target-filter`. One use case is
+`CLIENT_ID` sharding across a cluster of N brokers. With a consistent hash % N transformation, each client id
+maps exclusively to just one of the brokers. The included transformers are:
+* `CONSISTENT_HASH_MODULO`, which takes a single `modulo` property to configure the bound.
+
## Defining broker balancers
A broker balancer is defined by the `broker-balancer` element, it includes the following items:
* the `name` attribute defines the name of the broker balancer and is used to reference the balancer from an acceptor;
* the `target-key` element defines what key to select a target broker, the supported values are: `CLIENT_ID`, `SNI_HOST`, `SOURCE_IP`, `USER_NAME`, `ROLE_NAME`, default is `SOURCE_IP`, see [target key](#target-key) for further details;
* the `target-key-filter` element defines a regular expression to filter the resolved keys;
* the `local-target-filter` element defines a regular expression to match the keys that have to return a local target;
+* the `local-target-key-transformer` element defines a key transformer, see [key transformers](#key-transformers);
* the `cache-timeout` element is the time period for a target broker to remain in the cache, measured in milliseconds, setting `0` will disable the cache, default is `-1`, meaning no expiration;
* the `pool` element defines the pool to group the target brokers, see [pools](#pools).
* the `policy` element defines the policy used to select the target brokers from the pool, see [policies](#policies);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AutoClientIDShardClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AutoClientIDShardClusterTest.java
new file mode 100644
index 0000000..18b605f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AutoClientIDShardClusterTest.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.balancing;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
+import org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Two-node cluster test with two durable subscribers that repeatedly connect, consume a few
+ * messages and disconnect, while a producer does the same.
+ * <p>
+ * {@link #testWithConsistentHashClientIDModTwo()} configures a broker balancer on each node that
+ * keys on {@code CLIENT_ID} and applies a {@link ConsistentHashModulo} transformer, then checks
+ * that neither subscriber sees messages out of order.
+ */
+@RunWith(Parameterized.class)
+public class AutoClientIDShardClusterTest extends BalancingTestBase {
+
+   /**
+    * @return one parameter set per protocol under test
+    */
+   @Parameterized.Parameters(name = "protocol: {0}")
+   public static Collection<Object[]> data() {
+      final String[] protocols = new String[] {AMQP_PROTOCOL, CORE_PROTOCOL, OPENWIRE_PROTOCOL};
+      Collection<Object[]> data = new ArrayList<>();
+      for (String protocol : protocols) {
+         data.add(new Object[] {protocol});
+      }
+      return data;
+   }
+
+   private final String protocol;
+   final int numMessages = 50;
+   // messages still to be produced; the producer task stops once this reaches zero
+   AtomicInteger toSend = new AtomicInteger(numMessages);
+
+   public AutoClientIDShardClusterTest(String protocol) {
+      this.protocol = protocol;
+   }
+
+   /**
+    * Configures two live servers with the AMQP and OpenWire protocol managers added, connects
+    * them to each other and resets the send counter.
+    */
+   protected void setupServers() throws Exception {
+      for (int i = 0; i < 2; i++) {
+         setupLiveServer(i, true, HAType.SharedNothingReplication, true, false);
+         servers[i].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+         servers[i].addProtocolManagerFactory(new OpenWireProtocolManagerFactory());
+      }
+      setupClusterConnection("cluster0", name.getMethodName(), MessageLoadBalancingType.ON_DEMAND, 1, true, 0, 1);
+      setupClusterConnection("cluster1", name.getMethodName(), MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 0);
+      toSend.set(numMessages);
+   }
+
+   /**
+    * Producer task: reconnects in a loop and sends up to ten messages per connection, each
+    * stamped with an increasing {@code SEQ} property, until {@link #toSend} reaches zero.
+    */
+   Runnable producer = new Runnable() {
+      final AtomicInteger producerSeq = new AtomicInteger();
+
+      @Override
+      public void run() {
+         while (toSend.get() > 0) {
+            try {
+               ConnectionFactory connectionFactory = createFactory(protocol, "producer", "admin", "admin");
+               try (Connection connection = connectionFactory.createConnection()) {
+                  connection.start();
+                  try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+                     javax.jms.Topic topic = session.createTopic(name.getMethodName());
+                     try (MessageProducer producer = session.createProducer(topic)) {
+                        for (int i = 0; i < 10 && toSend.get() > 0; i++) {
+                           Message message = session.createTextMessage();
+                           message.setIntProperty("SEQ", producerSeq.get() + 1);
+                           producer.send(message);
+                           // only count the message once the send has succeeded
+                           producerSeq.incrementAndGet();
+                           toSend.decrementAndGet();
+                        }
+                        TimeUnit.MILLISECONDS.sleep(100);
+                     }
+                  }
+               }
+            } catch (Exception ignored) {
+               // best effort: a failed attempt is simply retried on the next loop iteration
+            }
+         }
+      }
+   };
+
+   /**
+    * Durable subscriber task that reconnects in a loop, receives up to five messages per
+    * connection and records whether the {@code SEQ} property ever arrived out of order.
+    */
+   class DurableSub implements Runnable {
+
+      final String id;
+      int receivedInOrder = -1;
+      int lastReceived;
+      // polled from the test thread while this task writes it, hence volatile
+      volatile int maxReceived;
+      AtomicBoolean consumerDone = new AtomicBoolean();
+      AtomicBoolean orderShot = new AtomicBoolean();
+      CountDownLatch registered = new CountDownLatch(1);
+
+      DurableSub(String id) {
+         this.id = id;
+      }
+
+      @Override
+      public void run() {
+         while (!consumerDone.get()) {
+            try {
+               ConnectionFactory connectionFactory = createFactory(protocol, "ClientId-" + id, "admin", "admin");
+               Connection connection = null;
+               try {
+                  connection = connectionFactory.createConnection();
+                  connection.start();
+                  try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+                     javax.jms.Topic topic = session.createTopic(name.getMethodName());
+                     try (TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, "Sub-" + id)) {
+                        registered.countDown();
+                        for (int i = 0; i < 5; i++) {
+                           Message message = durableSubscriber.receive(500);
+                           if (message != null) {
+                              lastReceived = message.getIntProperty("SEQ");
+                              if (lastReceived > maxReceived) {
+                                 maxReceived = lastReceived;
+                              }
+                              if (receivedInOrder < 0) {
+                                 // first message seen: start tracking from here
+                                 receivedInOrder = lastReceived;
+                              } else if (receivedInOrder == lastReceived - 1) {
+                                 receivedInOrder++;
+                              } else {
+                                 // report only the first ordering violation
+                                 if (!orderShot.get()) {
+                                    System.err.println("Sub: " + id + ", received: out of order " + lastReceived + ", last in order: " + receivedInOrder);
+                                 }
+                                 orderShot.set(true);
+                              }
+                           } else {
+                              // no point trying again if there is nothing for us now.
+                              break;
+                           }
+                        }
+                        TimeUnit.MILLISECONDS.sleep(500);
+                     }
+                  }
+               } finally {
+                  if (connection != null) {
+                     connection.close(); // seems openwire not jms2.0 auto closable always
+                  }
+               }
+            } catch (Exception ignored) {
+               // best effort: a failed attempt is simply retried on the next loop iteration
+            }
+         }
+      }
+   }
+
+   /**
+    * Baseline without a balancer: with both subscribers bouncing between nodes, at least one
+    * is expected to observe out-of-order delivery.
+    */
+   @Test
+   @Ignore("not totally reliable, but does show the root cause of the problem being solved")
+   public void testWithoutOutSharding() throws Exception {
+      setupServers();
+      startServers(0, 1);
+
+      // two bouncy durable consumers
+      DurableSub sub0 = new DurableSub("0");
+      DurableSub sub1 = new DurableSub("1");
+
+      ExecutorService executorService = Executors.newFixedThreadPool(3);
+      try {
+         executorService.submit(sub0);
+         executorService.submit(sub1);
+
+         // waiting for registration before production to give bridges a chance
+         assertTrue(sub0.registered.await(20, TimeUnit.SECONDS));
+         assertTrue(sub1.registered.await(20, TimeUnit.SECONDS));
+
+         assertTrue(waitForBindings(servers[0], name.getMethodName(), true, 2, -1, 10000));
+         assertTrue(waitForBindings(servers[1], name.getMethodName(), true, 2, -1, 10000));
+
+         // wait for remote bindings!
+         assertTrue(waitForBindings(servers[0], name.getMethodName(), false, 2, -1, 10000));
+         assertTrue(waitForBindings(servers[1], name.getMethodName(), false, 2, -1, 10000));
+
+         // produce a few every second with failover randomize=true so we produce on all nodes
+         executorService.submit(producer);
+
+         assertTrue("All sent", Wait.waitFor(() -> toSend.get() == 0));
+
+         assertTrue("All received sub0", Wait.waitFor(() -> sub0.maxReceived == numMessages));
+
+         assertTrue("All received sub1", Wait.waitFor(() -> sub1.maxReceived == numMessages));
+
+         // with bouncing, one 'may' be out of order, hence ignored
+         assertTrue(sub0.orderShot.get() || sub1.orderShot.get());
+
+      } finally {
+         sub0.consumerDone.set(true);
+         sub1.consumerDone.set(true);
+         toSend.set(0); // stop the producer loop even when an assertion failed early
+         executorService.shutdown();
+         stopServers(0, 1);
+      }
+   }
+
+   /**
+    * With a client id consistent-hash-modulo-two balancer on both nodes, neither subscriber may
+    * observe out-of-order delivery.
+    */
+   @Test
+   public void testWithConsistentHashClientIDModTwo() throws Exception {
+      setupServers();
+
+      addBalancerWithClientIdConsistentHashMod();
+
+      startServers(0, 1);
+
+      // two bouncy durable consumers
+      DurableSub sub0 = new DurableSub("0");
+      DurableSub sub1 = new DurableSub("1");
+
+      ExecutorService executorService = Executors.newFixedThreadPool(3);
+      try {
+         executorService.submit(sub0);
+         executorService.submit(sub1);
+
+         // waiting for registration before production to give bridges a chance
+         assertTrue(sub0.registered.await(5, TimeUnit.SECONDS));
+         assertTrue(sub1.registered.await(5, TimeUnit.SECONDS));
+
+         assertTrue(waitForBindings(servers[0], name.getMethodName(), true, 1, 1, 2000));
+         assertTrue(waitForBindings(servers[1], name.getMethodName(), true, 1, 1, 2000));
+
+         // wait for remote bindings!
+         assertTrue(waitForBindings(servers[0], name.getMethodName(), false, 1, 1, 10000));
+         assertTrue(waitForBindings(servers[1], name.getMethodName(), false, 1, 1, 10000));
+
+         // produce a few every second with failover randomize=true so we produce on all nodes
+         executorService.submit(producer);
+
+         assertTrue("All sent", Wait.waitFor(() -> toSend.get() == 0));
+
+         assertTrue("All received sub0", Wait.waitFor(() -> sub0.maxReceived == numMessages));
+
+         assertTrue("All received sub1", Wait.waitFor(() -> sub1.maxReceived == numMessages));
+
+         // with partition, none will be out of order: fail if either subscriber saw a gap
+         assertFalse(sub0.orderShot.get() || sub1.orderShot.get());
+
+      } finally {
+         sub0.consumerDone.set(true);
+         sub1.consumerDone.set(true);
+         toSend.set(0); // stop the producer loop even when an assertion failed early
+         executorService.shutdown();
+         stopServers(0, 1);
+      }
+   }
+
+   /**
+    * Adds, on each of the two nodes, a balancer keyed on {@code CLIENT_ID} whose local target
+    * filter accepts the default key value or the node's own index, with a
+    * {@link ConsistentHashModulo} transformer bounded by the number of nodes, and points the
+    * node's default acceptor at that balancer.
+    */
+   private void addBalancerWithClientIdConsistentHashMod() {
+      final int numberOfNodes = 2;
+      for (int node = 0; node < numberOfNodes; node++) {
+         Configuration configuration = servers[node].getConfiguration();
+         BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration().setName(BROKER_BALANCER_NAME);
+         brokerBalancerConfiguration.setTargetKey(TargetKey.CLIENT_ID).setLocalTargetFilter(TargetKeyResolver.DEFAULT_KEY_VALUE + "|" + node);
+         NamedPropertyConfiguration transformerConfig = new NamedPropertyConfiguration();
+         transformerConfig.setName(ConsistentHashModulo.NAME);
+         HashMap<String, String> properties = new HashMap<>();
+         properties.put(ConsistentHashModulo.MODULO, String.valueOf(numberOfNodes));
+         transformerConfig.setProperties(properties);
+         brokerBalancerConfiguration.setTransformerConfiguration(transformerConfig);
+
+         configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration));
+
+         TransportConfiguration acceptor = getDefaultServerAcceptor(node);
+         acceptor.getParams().put("redirect-to", BROKER_BALANCER_NAME);
+      }
+   }
+
+   /**
+    * Builds a connection factory for the given protocol that lists both nodes and carries the
+    * given client id in its URL.
+    *
+    * @param protocol one of the protocol constants used by {@link #data()}
+    * @param clientID the client id appended to the connection URL
+    * @param user the user name
+    * @param password the password
+    * @return a protocol specific connection factory
+    * @throws IllegalStateException if the protocol is not one of the supported values
+    */
+   protected ConnectionFactory createFactory(String protocol, String clientID, String user, String password) throws Exception {
+      StringBuilder urlBuilder = new StringBuilder();
+
+      switch (protocol) {
+
+         case CORE_PROTOCOL: {
+            urlBuilder.append("(tcp://localhost:61616,tcp://localhost:61617)?connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy");
+            urlBuilder.append("&clientID=");
+            urlBuilder.append(clientID);
+
+            return new ActiveMQConnectionFactory(urlBuilder.toString(), user, password);
+         }
+         case AMQP_PROTOCOL: {
+
+            urlBuilder.append("failover:(amqp://localhost:61616,amqp://localhost:61617)?failover.randomize=true");
+            urlBuilder.append("&jms.clientID=");
+            urlBuilder.append(clientID);
+
+            return new JmsConnectionFactory(user, password, urlBuilder.toString());
+         }
+         case OPENWIRE_PROTOCOL: {
+
+            urlBuilder.append("failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=true&maxReconnectAttempts=0&startupMaxReconnectAttempts=0");
+            urlBuilder.append("&jms.clientID=");
+            urlBuilder.append(clientID);
+
+            return new org.apache.activemq.ActiveMQConnectionFactory(user, password, urlBuilder.toString());
+         }
+         default:
+            throw new IllegalStateException("Unexpected value: " + protocol);
+      }
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
index 890a813..c626325 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
@@ -27,7 +27,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
-import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
@@ -88,7 +88,7 @@ public class BalancingTestBase extends ClusterTestBase {
brokerBalancerConfiguration.setTargetKey(targetKey).setLocalTargetFilter(localTargetFilter)
.setPoolConfiguration(new PoolConfiguration().setCheckPeriod(1000).setQuorumSize(quorumSize)
.setLocalTargetEnabled(localTargetEnabled).setClusterConnection(clusterConnection))
- .setPolicyConfiguration(new PolicyConfiguration().setName(policyName).setProperties(properties));
+ .setPolicyConfiguration(new NamedPropertyConfiguration().setName(policyName).setProperties(properties));
configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration));
@@ -105,7 +105,7 @@ public class BalancingTestBase extends ClusterTestBase {
brokerBalancerConfiguration.setTargetKey(targetKey).setLocalTargetFilter(localTargetFilter)
.setPoolConfiguration(new PoolConfiguration().setCheckPeriod(1000).setQuorumSize(quorumSize)
.setLocalTargetEnabled(localTargetEnabled).setDiscoveryGroupName("dg1"))
- .setPolicyConfiguration(new PolicyConfiguration().setName(policyName).setProperties(properties));
+ .setPolicyConfiguration(new NamedPropertyConfiguration().setName(policyName).setProperties(properties));
configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration));
@@ -129,7 +129,7 @@ public class BalancingTestBase extends ClusterTestBase {
brokerBalancerConfiguration.setTargetKey(targetKey).setLocalTargetFilter(localTargetFilter)
.setPoolConfiguration(new PoolConfiguration().setCheckPeriod(1000).setQuorumSize(quorumSize)
.setLocalTargetEnabled(localTargetEnabled).setStaticConnectors(staticConnectors))
- .setPolicyConfiguration(new PolicyConfiguration().setName(policyName).setProperties(properties));
+ .setPolicyConfiguration(new NamedPropertyConfiguration().setName(policyName).setProperties(properties));
configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration));
@@ -215,7 +215,7 @@ public class BalancingTestBase extends ClusterTestBase {
urlBuilder.append(")");
}
- urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries);
+ urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries + "&failover.randomize=true");
if (clientID != null) {
urlBuilder.append("&jms.clientID=");
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java
index 5ccd8c9..446156d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java
@@ -88,15 +88,10 @@ public class TargetKeyTest extends BalancingTestBase {
@Before
public void setup() throws Exception {
- PolicyFactoryResolver.getInstance().registerPolicyFactory(
+ PolicyFactoryResolver.getInstance().registerPolicyFactory(MOCK_POLICY_NAME,
new PolicyFactory() {
@Override
- public String[] getSupportedPolicies() {
- return new String[] {MOCK_POLICY_NAME};
- }
-
- @Override
- public Policy createPolicy(String policyName) {
+ public Policy create() {
return new FirstElementPolicy(MOCK_POLICY_NAME) {
@Override
public Target selectTarget(List<Target> targets, String key) {