You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/12/07 13:17:59 UTC

[activemq-artemis] branch main updated: ARTEMIS-3594 - add support for a local target key transformer and an instance of CONSISTENT_HASH_MODULO that can be used to partition in a static cluster

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new e0b1621  ARTEMIS-3594 - add support for a local target key transformer and an instance of CONSISTENT_HASH_MODULO that can be used to partition in a static cluster
e0b1621 is described below

commit e0b16217a1d198a19bbc8ed2c6cfe9f09bb8b993
Author: gtully <ga...@gmail.com>
AuthorDate: Wed Dec 1 15:27:54 2021 +0000

    ARTEMIS-3594 - add support for a local target key transformer and an instance of CONSISTENT_HASH_MODULO that can be used to partition in a static cluster
---
 .../balancing/BrokerBalancerConfiguration.java     |  15 +-
 ...ration.java => NamedPropertyConfiguration.java} |   6 +-
 .../deployers/impl/FileConfigurationParser.java    |  25 +-
 .../core/server/balancing/BrokerBalancer.java      |  29 +-
 .../server/balancing/BrokerBalancerManager.java    |  29 +-
 .../balancing/policies/ConsistentHashPolicy.java   |   6 +-
 .../balancing/policies/DefaultPolicyFactory.java   |  49 ----
 .../server/balancing/policies/PolicyFactory.java   |   6 +-
 .../balancing/policies/PolicyFactoryResolver.java  |  19 +-
 .../transformer/ConsistentHashModulo.java          |  47 +++
 .../KeyTransformer.java}                           |  14 +-
 .../TransformerFactory.java}                       |   8 +-
 .../transformer/TransformerFactoryResolver.java    |  62 ++++
 .../resources/schema/artemis-configuration.xsd     |  27 ++
 .../core/config/impl/FileConfigurationTest.java    |  11 +-
 .../balancing/BrokerBalancerManagerTest.java       |  27 +-
 .../core/server/balancing/BrokerBalancerTest.java  |  43 ++-
 .../policies/PolicyFactoryResolverTest.java}       |  29 +-
 .../transformer/ConsistentHashModuloTest.java      |  55 ++++
 .../TransformerFactoryResolverTest.java            |  43 +++
 .../artemis/tests/util/ActiveMQTestBase.java       |   2 +-
 .../resources/ConfigurationTest-full-config.xml    |   8 +
 .../ConfigurationTest-xinclude-config.xml          |   8 +
 docs/user-manual/en/broker-balancers.md            |   7 +
 .../balancing/AutoClientIDShardClusterTest.java    | 325 +++++++++++++++++++++
 .../integration/balancing/BalancingTestBase.java   |  10 +-
 .../tests/integration/balancing/TargetKeyTest.java |   9 +-
 27 files changed, 778 insertions(+), 141 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
index 1da1c04..e20d033 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
@@ -28,7 +28,8 @@ public class BrokerBalancerConfiguration implements Serializable {
    private String localTargetFilter = null;
    private int cacheTimeout = -1;
    private PoolConfiguration poolConfiguration = null;
-   private PolicyConfiguration policyConfiguration = null;
+   private NamedPropertyConfiguration policyConfiguration = null;
+   private NamedPropertyConfiguration transformerConfiguration = null;
 
    public String getName() {
       return name;
@@ -75,11 +76,11 @@ public class BrokerBalancerConfiguration implements Serializable {
       return this;
    }
 
-   public PolicyConfiguration getPolicyConfiguration() {
+   public NamedPropertyConfiguration getPolicyConfiguration() {
       return policyConfiguration;
    }
 
-   public BrokerBalancerConfiguration setPolicyConfiguration(PolicyConfiguration policyConfiguration) {
+   public BrokerBalancerConfiguration setPolicyConfiguration(NamedPropertyConfiguration policyConfiguration) {
       this.policyConfiguration = policyConfiguration;
       return this;
    }
@@ -92,4 +93,12 @@ public class BrokerBalancerConfiguration implements Serializable {
       this.poolConfiguration = poolConfiguration;
       return this;
    }
+
+   public void setTransformerConfiguration(NamedPropertyConfiguration configuration) {
+      this.transformerConfiguration = configuration;
+   }
+
+   public NamedPropertyConfiguration getTransformerConfiguration() {
+      return transformerConfiguration;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/PolicyConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/NamedPropertyConfiguration.java
similarity index 85%
rename from artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/PolicyConfiguration.java
rename to artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/NamedPropertyConfiguration.java
index f1f8630..eefa61d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/PolicyConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/NamedPropertyConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.activemq.artemis.core.config.balancing;
 import java.io.Serializable;
 import java.util.Map;
 
-public class PolicyConfiguration implements Serializable {
+public class NamedPropertyConfiguration implements Serializable {
    private String name;
 
    private Map<String, String> properties;
@@ -29,7 +29,7 @@ public class PolicyConfiguration implements Serializable {
       return name;
    }
 
-   public PolicyConfiguration setName(String name) {
+   public NamedPropertyConfiguration setName(String name) {
       this.name = name;
       return this;
    }
@@ -38,7 +38,7 @@ public class PolicyConfiguration implements Serializable {
       return properties;
    }
 
-   public PolicyConfiguration setProperties(Map<String, String> properties) {
+   public NamedPropertyConfiguration setProperties(Map<String, String> properties) {
       this.properties = properties;
       return this;
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 1e92a83..bd27103 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -47,7 +47,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
-import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
 import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
 import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
@@ -93,6 +93,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
 import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactoryResolver;
 import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactoryResolver;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
 import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
@@ -2653,7 +2654,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
       brokerBalancerConfiguration.setCacheTimeout(getInteger(e, "cache-timeout",
          brokerBalancerConfiguration.getCacheTimeout(), Validators.MINUS_ONE_OR_GE_ZERO));
 
-      PolicyConfiguration policyConfiguration = null;
+      NamedPropertyConfiguration policyConfiguration = null;
       PoolConfiguration poolConfiguration = null;
       NodeList children = e.getChildNodes();
 
@@ -2661,20 +2662,34 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
          Node child = children.item(j);
 
          if (child.getNodeName().equals("policy")) {
-            policyConfiguration = new PolicyConfiguration();
-            parsePolicyConfiguration((Element)child, policyConfiguration);
+            policyConfiguration = new NamedPropertyConfiguration();
+            parsePolicyConfiguration((Element) child, policyConfiguration);
             brokerBalancerConfiguration.setPolicyConfiguration(policyConfiguration);
          } else if (child.getNodeName().equals("pool")) {
             poolConfiguration = new PoolConfiguration();
             parsePoolConfiguration((Element) child, config, poolConfiguration);
             brokerBalancerConfiguration.setPoolConfiguration(poolConfiguration);
+         } else if (child.getNodeName().equals("local-target-key-transformer")) {
+            policyConfiguration = new NamedPropertyConfiguration();
+            parseTransformerConfiguration((Element) child, policyConfiguration);
+            brokerBalancerConfiguration.setTransformerConfiguration(policyConfiguration);
          }
       }
 
       config.getBalancerConfigurations().add(brokerBalancerConfiguration);
    }
 
-   private void parsePolicyConfiguration(final Element e, final PolicyConfiguration policyConfiguration) throws ClassNotFoundException {
+   private void parseTransformerConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException {
+      String name = e.getAttribute("name");
+
+      TransformerFactoryResolver.getInstance().resolve(name);
+
+      policyConfiguration.setName(name);
+
+      policyConfiguration.setProperties(getMapOfChildPropertyElements(e));
+   }
+
+   private void parsePolicyConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException {
       String name = e.getAttribute("name");
 
       PolicyFactoryResolver.getInstance().resolve(name);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
index c256209..a39c738 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.server.balancing.targets.Target;
 import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
 import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
 import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
+import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.jboss.logging.Logger;
 
@@ -40,7 +41,6 @@ public class BrokerBalancer implements ActiveMQComponent {
 
    public static final String CLIENT_ID_PREFIX = ActiveMQDefaultConfiguration.DEFAULT_INTERNAL_NAMING_PREFIX + "balancer.client.";
 
-
    private final String name;
 
    private final TargetKey targetKey;
@@ -55,6 +55,8 @@ public class BrokerBalancer implements ActiveMQComponent {
 
    private final Policy policy;
 
+   private final KeyTransformer transformer;
+
    private final Cache<String, TargetResult> cache;
 
    private volatile boolean started = false;
@@ -93,11 +95,21 @@ public class BrokerBalancer implements ActiveMQComponent {
    }
 
 
-   public BrokerBalancer(final String name, final TargetKey targetKey, final String targetKeyFilter, final Target localTarget, final String localTargetFilter, final Pool pool, final Policy policy, final int cacheTimeout) {
+   public BrokerBalancer(final String name,
+                         final TargetKey targetKey,
+                         final String targetKeyFilter,
+                         final Target localTarget,
+                         final String localTargetFilter,
+                         final Pool pool,
+                         final Policy policy,
+                         KeyTransformer transformer,
+                         final int cacheTimeout) {
       this.name = name;
 
       this.targetKey = targetKey;
 
+      this.transformer = transformer;
+
       this.targetKeyResolver = new TargetKeyResolver(targetKey, targetKeyFilter);
 
       this.localTarget = new TargetResult(localTarget);
@@ -149,7 +161,7 @@ public class BrokerBalancer implements ActiveMQComponent {
 
    public TargetResult getTarget(String key) {
 
-      if (this.localTargetFilter != null && this.localTargetFilter.matcher(key).matches()) {
+      if (this.localTargetFilter != null && this.localTargetFilter.matcher(transform(key)).matches()) {
          if (logger.isDebugEnabled()) {
             logger.debug("The " + targetKey + "[" + key + "] matches the localTargetFilter " + localTargetFilter.pattern());
          }
@@ -201,4 +213,15 @@ public class BrokerBalancer implements ActiveMQComponent {
 
       return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT;
    }
+
+   private String transform(String key) {
+      String result = key;
+      if (transformer != null) {
+         result = transformer.transform(key);
+         if (logger.isDebugEnabled()) {
+            logger.debug("Key: " + key + ", transformed to " + result);
+         }
+      }
+      return result;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
index 10afe15..f9b8002 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
@@ -21,7 +21,7 @@ import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
 import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
-import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
 import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -39,6 +39,9 @@ import org.apache.activemq.artemis.core.server.balancing.targets.ActiveMQTargetF
 import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget;
 import org.apache.activemq.artemis.core.server.balancing.targets.Target;
 import org.apache.activemq.artemis.core.server.balancing.targets.TargetFactory;
+import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer;
+import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactory;
+import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactoryResolver;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.jboss.logging.Logger;
 
@@ -95,13 +98,19 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
       }
 
       Policy policy = null;
-      PolicyConfiguration policyConfiguration = config.getPolicyConfiguration();
+      NamedPropertyConfiguration policyConfiguration = config.getPolicyConfiguration();
       if (policyConfiguration != null) {
          policy = deployPolicy(policyConfiguration, pool);
       }
 
+      KeyTransformer transformer = null;
+      NamedPropertyConfiguration transformerConfiguration = config.getTransformerConfiguration();
+      if (transformerConfiguration != null) {
+         transformer = deployTransformer(transformerConfiguration);
+      }
+
       BrokerBalancer balancer = new BrokerBalancer(config.getName(), config.getTargetKey(), config.getTargetKeyFilter(),
-         localTarget, config.getLocalTargetFilter(), pool, policy, config.getCacheTimeout());
+                                                   localTarget, config.getLocalTargetFilter(), pool, policy, transformer, config.getCacheTimeout());
 
       balancerControllers.put(balancer.getName(), balancer);
 
@@ -160,10 +169,10 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
       return pool;
    }
 
-   private Policy deployPolicy(PolicyConfiguration policyConfig, Pool pool) throws ClassNotFoundException {
+   private Policy deployPolicy(NamedPropertyConfiguration policyConfig, Pool pool) throws ClassNotFoundException {
       PolicyFactory policyFactory = PolicyFactoryResolver.getInstance().resolve(policyConfig.getName());
 
-      Policy policy = policyFactory.createPolicy(policyConfig.getName());
+      Policy policy = policyFactory.create();
 
       policy.init(policyConfig.getProperties());
 
@@ -174,6 +183,16 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
       return policy;
    }
 
+   private KeyTransformer deployTransformer(NamedPropertyConfiguration configuration) throws Exception {
+      TransformerFactory factory = TransformerFactoryResolver.getInstance().resolve(configuration.getName());
+
+      KeyTransformer transformer = factory.create();
+
+      transformer.init(configuration.getProperties());
+
+      return transformer;
+   }
+
    public BrokerBalancer getBalancer(String name) {
       return balancerControllers.get(name);
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/ConsistentHashPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/ConsistentHashPolicy.java
index 77d4076..4c0dd46 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/ConsistentHashPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/ConsistentHashPolicy.java
@@ -31,10 +31,6 @@ public class ConsistentHashPolicy extends AbstractPolicy {
       super(NAME);
    }
 
-   protected ConsistentHashPolicy(String name) {
-      super(name);
-   }
-
    @Override
    public Target selectTarget(List<Target> targets, String key) {
       if (targets.size() > 1) {
@@ -60,7 +56,7 @@ public class ConsistentHashPolicy extends AbstractPolicy {
       return null;
    }
 
-   private int getHash(String str) {
+   public static int getHash(String str) {
       final int FNV_INIT = 0x811c9dc5;
       final int FNV_PRIME = 0x01000193;
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/DefaultPolicyFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/DefaultPolicyFactory.java
deleted file mode 100644
index aa39787..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/DefaultPolicyFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.server.balancing.policies;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.Supplier;
-
-public class DefaultPolicyFactory extends PolicyFactory {
-   private static final Map<String, Supplier<AbstractPolicy>> supportedPolicies = new HashMap<>();
-
-   static {
-      supportedPolicies.put(ConsistentHashPolicy.NAME, () -> new ConsistentHashPolicy());
-      supportedPolicies.put(FirstElementPolicy.NAME, () -> new FirstElementPolicy());
-      supportedPolicies.put(LeastConnectionsPolicy.NAME, () -> new LeastConnectionsPolicy());
-      supportedPolicies.put(RoundRobinPolicy.NAME, () -> new RoundRobinPolicy());
-   }
-
-   @Override
-   public String[] getSupportedPolicies() {
-      return supportedPolicies.keySet().toArray(new String[supportedPolicies.size()]);
-   }
-
-   @Override
-   public AbstractPolicy createPolicy(String policyName) {
-      Supplier<AbstractPolicy> policySupplier = supportedPolicies.get(policyName);
-
-      if (policySupplier == null) {
-         throw new IllegalArgumentException("Policy not supported: " + policyName);
-      }
-
-      return policySupplier.get();
-   }
-}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
index 4c745ee..227fda9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
@@ -17,8 +17,6 @@
 
 package org.apache.activemq.artemis.core.server.balancing.policies;
 
-public abstract class PolicyFactory {
-   public abstract String[] getSupportedPolicies();
-
-   public abstract Policy createPolicy(String policyName);
+public interface PolicyFactory {
+   Policy create();
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolver.java
index dab4f93..45a61db 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolver.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolver.java
@@ -36,7 +36,10 @@ public class PolicyFactoryResolver {
    private final Map<String, PolicyFactory> policyFactories = new HashMap<>();
 
    private PolicyFactoryResolver() {
-      registerPolicyFactory(new DefaultPolicyFactory());
+      policyFactories.put(ConsistentHashPolicy.NAME, () -> new ConsistentHashPolicy());
+      policyFactories.put(FirstElementPolicy.NAME, () -> new FirstElementPolicy());
+      policyFactories.put(LeastConnectionsPolicy.NAME, () -> new LeastConnectionsPolicy());
+      policyFactories.put(RoundRobinPolicy.NAME, () -> new RoundRobinPolicy());
 
       loadPolicyFactories();
    }
@@ -56,19 +59,15 @@ public class PolicyFactoryResolver {
          PolicyFactory.class, BrokerBalancer.class.getClassLoader());
 
       for (PolicyFactory policyFactory : serviceLoader) {
-         registerPolicyFactory(policyFactory);
+         policyFactories.put(keyFromClassName(policyFactory.getClass().getName()), policyFactory);
       }
    }
 
-   public void registerPolicyFactory(PolicyFactory policyFactory) {
-      for (String policyName : policyFactory.getSupportedPolicies()) {
-         policyFactories.put(policyName, policyFactory);
-      }
+   public void registerPolicyFactory(String name, PolicyFactory policyFactory) {
+      policyFactories.put(name, policyFactory);
    }
 
-   public void unregisterPolicyFactory(PolicyFactory policyFactory) {
-      for (String policyName : policyFactory.getSupportedPolicies()) {
-         policyFactories.remove(policyName, policyFactory);
-      }
+   String keyFromClassName(String name) {
+      return name.substring(0, name.indexOf("PolicyFactory"));
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModulo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModulo.java
new file mode 100644
index 0000000..a092e86
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModulo.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.transformer;
+
+import java.util.Map;
+
+import org.apache.activemq.artemis.core.server.balancing.policies.ConsistentHashPolicy;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
+
+public class ConsistentHashModulo implements KeyTransformer {
+   public static final String NAME = "CONSISTENT_HASH_MODULO";
+   public static final String MODULO = "modulo";
+   int modulo = 0;
+
+   @Override
+   public String transform(String str) {
+      if (TargetKeyResolver.DEFAULT_KEY_VALUE.equals(str)) {
+         // we only want to transform resolved keys
+         return str;
+      }
+      if (modulo == 0) {
+         return str;
+      }
+      int hash = ConsistentHashPolicy.getHash(str);
+      return String.valueOf( hash % modulo );
+   }
+
+   @Override
+   public void init(Map<String, String> properties) {
+      modulo = Integer.parseInt(properties.get(MODULO));
+   }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/KeyTransformer.java
similarity index 76%
copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
copy to artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/KeyTransformer.java
index 4c745ee..dc0224e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/KeyTransformer.java
@@ -15,10 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.artemis.core.server.balancing.policies;
+package org.apache.activemq.artemis.core.server.balancing.transformer;
 
-public abstract class PolicyFactory {
-   public abstract String[] getSupportedPolicies();
+import java.util.Map;
 
-   public abstract Policy createPolicy(String policyName);
+public interface KeyTransformer {
+
+   default void init(Map<String, String> properties) {
+   }
+
+   default String transform(String key) {
+      return key;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactory.java
similarity index 78%
copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
copy to artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactory.java
index 4c745ee..af60fda 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactory.java
@@ -15,10 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.artemis.core.server.balancing.policies;
+package org.apache.activemq.artemis.core.server.balancing.transformer;
 
-public abstract class PolicyFactory {
-   public abstract String[] getSupportedPolicies();
-
-   public abstract Policy createPolicy(String policyName);
+public interface TransformerFactory {
+   KeyTransformer create();
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolver.java
new file mode 100644
index 0000000..46dc635
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolver.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.transformer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+import org.apache.activemq.artemis.core.server.balancing.BrokerBalancer;
+
+public class TransformerFactoryResolver {
+   private static TransformerFactoryResolver instance;
+
+   public static TransformerFactoryResolver getInstance() {
+      if (instance == null) {
+         instance = new TransformerFactoryResolver();
+      }
+      return instance;
+   }
+
+   private final Map<String, TransformerFactory> factories = new HashMap<>();
+
+   private TransformerFactoryResolver() {
+      factories.put(ConsistentHashModulo.NAME, () -> new ConsistentHashModulo());
+      loadFactories(); // let service loader override
+   }
+
+   public TransformerFactory resolve(String policyName) throws ClassNotFoundException {
+      TransformerFactory factory = factories.get(policyName);
+      if (factory == null) {
+         throw new ClassNotFoundException("No TransformerFactory found for " + policyName);
+      }
+      return factory;
+   }
+
+   private void loadFactories() {
+      ServiceLoader<TransformerFactory> serviceLoader = ServiceLoader.load(
+         TransformerFactory.class, BrokerBalancer.class.getClassLoader());
+      for (TransformerFactory factory : serviceLoader) {
+         factories.put(keyFromClassName(factory.getClass().getName()), factory);
+      }
+   }
+
+   String keyFromClassName(String name) {
+      return name.substring(0, name.indexOf("TransformerFactory"));
+   }
+}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 27d3134..6d29d83 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2136,6 +2136,13 @@
                </xsd:documentation>
             </xsd:annotation>
          </xsd:element>
+         <xsd:element name="local-target-key-transformer" type="brokerBalancerKeyTransformerType" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  the local target key transformer configuration
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
       </xsd:sequence>
       <xsd:attribute name="name" type="xsd:string" use="required">
          <xsd:annotation>
@@ -2176,6 +2183,26 @@
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
 
+   <xsd:complexType name="brokerBalancerKeyTransformerType">
+      <xsd:sequence>
+         <xsd:element ref="property" maxOccurs="unbounded" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  properties to configure a key transformer
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+      </xsd:sequence>
+      <xsd:attribute name="name" type="xsd:ID" use="required">
+         <xsd:annotation>
+            <xsd:documentation>
+               the name of the policy
+            </xsd:documentation>
+         </xsd:annotation>
+      </xsd:attribute>
+      <xsd:attributeGroup ref="xml:specialAttrs"/>
+   </xsd:complexType>
+
    <xsd:complexType name="brokerBalancerPoolType">
       <xsd:sequence maxOccurs="unbounded">
          <xsd:element name="username" type="xsd:string" maxOccurs="1" minOccurs="0">
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index d33b3ee..344aae0 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -76,6 +76,8 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo.MODULO;
+
 public class FileConfigurationTest extends ConfigurationImplTest {
 
    @BeforeClass
@@ -265,13 +267,20 @@ public class FileConfigurationTest extends ConfigurationImplTest {
          }
       }
 
-      Assert.assertEquals(4, conf.getBalancerConfigurations().size());
+      Assert.assertEquals(5, conf.getBalancerConfigurations().size());
       for (BrokerBalancerConfiguration bc : conf.getBalancerConfigurations()) {
          if (bc.getName().equals("simple-local")) {
             Assert.assertEquals(bc.getTargetKey(), TargetKey.CLIENT_ID);
             Assert.assertNotNull(bc.getLocalTargetFilter());
             Assert.assertNotNull(bc.getTargetKeyFilter());
             Assert.assertNull(bc.getPolicyConfiguration());
+         } else if (bc.getName().equals("simple-local-with-transformer")) {
+            Assert.assertEquals(bc.getTargetKey(), TargetKey.CLIENT_ID);
+            Assert.assertNotNull(bc.getLocalTargetFilter());
+            Assert.assertNotNull(bc.getTargetKeyFilter());
+            Assert.assertNull(bc.getPolicyConfiguration());
+            Assert.assertNotNull(bc.getTransformerConfiguration());
+            Assert.assertNotNull(bc.getTransformerConfiguration().getProperties().get(MODULO));
          } else if (bc.getName().equals("simple-balancer")) {
             Assert.assertEquals(bc.getTargetKey(), TargetKey.USER_NAME);
             Assert.assertNull(bc.getLocalTargetFilter());
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java
index b98c7ec..a5bb93d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java
@@ -17,12 +17,16 @@
 
 package org.apache.activemq.artemis.core.server.balancing;
 
+import java.util.HashMap;
+
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
-import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
 import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.balancing.policies.ConsistentHashPolicy;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.junit.After;
 import org.junit.Before;
@@ -62,7 +66,7 @@ public class BrokerBalancerManagerTest {
 
       BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration();
       brokerBalancerConfiguration.setName("partition-local-pool");
-      PolicyConfiguration policyConfig = new PolicyConfiguration();
+      NamedPropertyConfiguration policyConfig = new NamedPropertyConfiguration();
       policyConfig.setName(ConsistentHashPolicy.NAME);
       brokerBalancerConfiguration.setPolicyConfiguration(policyConfig);
 
@@ -84,4 +88,23 @@ public class BrokerBalancerManagerTest {
 
       underTest.deployBrokerBalancer(brokerBalancerConfiguration);
    }
+
+   @Test()
+   public void deployLocalOnlyWithPolicy() throws Exception {
+
+      ManagementService mockManagementService = Mockito.mock(ManagementService.class);
+      Mockito.when(mockServer.getManagementService()).thenReturn(mockManagementService);
+
+      BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration();
+      brokerBalancerConfiguration.setName("partition-local-consistent-hash").setTargetKey(TargetKey.CLIENT_ID).setLocalTargetFilter(String.valueOf(2));
+      NamedPropertyConfiguration policyConfig = new NamedPropertyConfiguration();
+      policyConfig.setName(ConsistentHashModulo.NAME);
+      HashMap<String, String> properties = new HashMap<>();
+      properties.put(ConsistentHashModulo.MODULO, String.valueOf(2));
+      policyConfig.setProperties(properties);
+      brokerBalancerConfiguration.setTransformerConfiguration(policyConfig);
+
+
+      underTest.deployBrokerBalancer(brokerBalancerConfiguration);
+   }
 }
\ No newline at end of file
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
index 7f8ca0e..732ea53 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
@@ -18,7 +18,6 @@
 package org.apache.activemq.artemis.core.server.balancing;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -29,7 +28,7 @@ import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget;
 import org.apache.activemq.artemis.core.server.balancing.targets.Target;
 import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
 import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
-import org.junit.After;
+import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -44,38 +43,34 @@ public class BrokerBalancerTest {
 
    @Before
    public void setUp() {
-
       ActiveMQServer mockServer = mock(ActiveMQServer.class);
       Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID"));
-
       localTarget = new LocalTarget(null, mockServer);
+   }
 
+   @Test
+   public void getTarget() {
       Pool pool = null;
       Policy policy = null;
       underTest  = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
-                                                   localTarget, "^FOO.*", pool, policy, 0);
-      try {
-         underTest.start();
-      } catch (Exception e) {
-         fail(e.getMessage());
-      }
-   }
-
-   @After
-   public void after() {
-      if (underTest != null) {
-         try {
-            underTest.stop();
-         } catch (Exception e) {
-            fail(e.getMessage());
-         }
-      }
+                                      localTarget, "^FOO.*", pool, policy, null, 0);
+      assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget());
+      assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
    }
 
    @Test
-   public void getTarget() {
-      assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget());
-      assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
+   public void getLocalTargetWithTransformer() throws Exception {
+      Pool pool = null;
+      Policy policy = null;
+      KeyTransformer keyTransformer = new KeyTransformer() {
+         @Override
+         public String transform(String key) {
+            return key.substring("TRANSFORM_TO".length() + 1);
+         }
+      };
+      underTest  = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
+                                      localTarget, "^FOO.*", pool, policy, keyTransformer, 0);
+      assertEquals( localTarget, underTest.getTarget("TRANSFORM_TO_FOO_EE").getTarget());
    }
 
 }
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolverTest.java
similarity index 52%
copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
copy to artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolverTest.java
index 4c745ee..0804ba4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolverTest.java
@@ -14,11 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.activemq.artemis.core.server.balancing.policies;
 
-public abstract class PolicyFactory {
-   public abstract String[] getSupportedPolicies();
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class PolicyFactoryResolverTest {
+
+   @Test
+   public void resolveOk() throws Exception {
+      PolicyFactoryResolver instance = PolicyFactoryResolver.getInstance();
+      assertNotNull(instance.resolve(ConsistentHashPolicy.NAME));
+   }
+
+   @Test(expected = ClassNotFoundException.class)
+   public void resolveError() throws Exception {
+      PolicyFactoryResolver instance = PolicyFactoryResolver.getInstance();
+      assertNotNull(instance.resolve("NOT PRESENT"));
+   }
 
-   public abstract Policy createPolicy(String policyName);
-}
+   @Test
+   public void keyFromName() throws Exception {
+      PolicyFactoryResolver instance = PolicyFactoryResolver.getInstance();
+      assertEquals("New", instance.keyFromClassName("NewPolicyFactory"));
+   }
+}
\ No newline at end of file
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModuloTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModuloTest.java
new file mode 100644
index 0000000..137181b
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModuloTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.transformer;
+
+import java.util.HashMap;
+
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ConsistentHashModuloTest {
+
+   @Test
+   public void transform() {
+      ConsistentHashModulo underTest = new ConsistentHashModulo();
+
+      assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, underTest.transform(TargetKeyResolver.DEFAULT_KEY_VALUE));
+
+      assertEquals("AA", underTest.transform("AA")); // default modulo 0 does nothing
+
+      HashMap<String, String> properties = new HashMap<>();
+
+      final int modulo = 2;
+      properties.put(ConsistentHashModulo.MODULO, String.valueOf(modulo));
+      underTest.init(properties);
+
+      String hash1 = underTest.transform("AAA");
+      int v1 = Integer.parseInt(hash1);
+
+      String hash2 = underTest.transform("BBB");
+      int v2 = Integer.parseInt(hash2);
+
+      assertNotEquals(hash1, hash2);
+      assertNotEquals(v1, v2);
+      assertTrue(v1 < modulo && v2 < modulo);
+   }
+}
\ No newline at end of file
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolverTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolverTest.java
new file mode 100644
index 0000000..d102da1
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolverTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.balancing.transformer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TransformerFactoryResolverTest {
+
+   @Test
+   public void resolveOk() throws Exception {
+      TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance();
+      assertNotNull(instance.resolve(ConsistentHashModulo.NAME));
+   }
+
+   @Test(expected = ClassNotFoundException.class)
+   public void resolveError() throws Exception {
+      TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance();
+      assertNotNull(instance.resolve("NOT PRESENT"));
+   }
+
+   @Test
+   public void keyFromName() throws Exception {
+      TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance();
+      assertEquals("New", instance.keyFromClassName("NewTransformerFactory"));
+   }
+}
\ No newline at end of file
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 7887458..7d6b04e 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -1964,7 +1964,7 @@ public abstract class ActiveMQTestBase extends Assert {
             }
          }
 
-         if (bindingCount == expectedBindingCount && totConsumers == expectedConsumerCount) {
+         if (bindingCount == expectedBindingCount && (expectedConsumerCount == -1 || totConsumers == expectedConsumerCount)) {
             return true;
          }
 
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 0a772b1..5c58d82 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -158,6 +158,14 @@
             <target-key-filter>^[^.]+</target-key-filter>
             <local-target-filter>DEFAULT</local-target-filter>
          </broker-balancer>
+         <broker-balancer name="simple-local-with-transformer">
+            <target-key>CLIENT_ID</target-key>
+            <target-key-filter>^[^.]+</target-key-filter>
+            <local-target-filter>DEFAULT</local-target-filter>
+            <local-target-key-transformer name="CONSISTENT_HASH_MODULO">
+               <property key="modulo" value="2"></property>
+            </local-target-key-transformer>
+         </broker-balancer>
          <broker-balancer name="simple-balancer">
             <target-key>USER_NAME</target-key>
             <policy name="FIRST_ELEMENT"/>
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index faaef30..e5edf10 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -149,6 +149,14 @@
             <target-key-filter>^[^.]+</target-key-filter>
             <local-target-filter>DEFAULT</local-target-filter>
          </broker-balancer>
+         <broker-balancer name="simple-local-with-transformer">
+            <target-key>CLIENT_ID</target-key>
+            <target-key-filter>^[^.]+</target-key-filter>
+            <local-target-filter>DEFAULT</local-target-filter>
+            <local-target-key-transformer name="CONSISTENT_HASH_MODULO">
+               <property key="modulo" value="2"></property>
+            </local-target-key-transformer>
+         </broker-balancer>
          <broker-balancer name="simple-balancer">
             <target-key>USER_NAME</target-key>
             <policy name="FIRST_ELEMENT"/>
diff --git a/docs/user-manual/en/broker-balancers.md b/docs/user-manual/en/broker-balancers.md
index be42656..c821877 100644
--- a/docs/user-manual/en/broker-balancers.md
+++ b/docs/user-manual/en/broker-balancers.md
@@ -106,12 +106,19 @@ So a broker balancer with the cache enabled doesn't strictly follow the configur
 By default, the cache is enabled and will never timeout. See below
 for more details about setting the `cache-timeout` parameter.
 
+## Key transformers
+A `local-target-key-transformer` allows target key transformation before matching against any local-target-filter. One use case is
+CLIENT_ID sharding across a cluster of N brokers. With a consistent hash % N transformation, each client id
+can map exclusively to just one of the brokers. The included transformers are:
+* `CONSISTENT_HASH_MODULO` that takes a single `modulo` property to configure the bound.
+
 ## Defining broker balancers
 A broker balancer is defined by the `broker-balancer` element, it includes the following items:
 * the `name` attribute defines the name of the broker balancer and is used to reference the balancer from an acceptor;
 * the `target-key` element defines what key to select a target broker, the supported values are: `CLIENT_ID`, `SNI_HOST`, `SOURCE_IP`, `USER_NAME`, `ROLE_NAME`, default is `SOURCE_IP`, see [target key](#target-key) for further details;
 * the `target-key-filter` element defines a regular expression to filter the resolved keys;
 * the `local-target-filter` element defines a regular expression to match the keys that have to return a local target;
+* the `local-target-key-transformer` element defines a key transformer, see [key transformers](#key-transformers);
 * the `cache-timeout` element is the time period for a target broker to remain in the cache, measured in milliseconds, setting `0` will disable the cache, default is `-1`, meaning no expiration;
 * the `pool` element defines the pool to group the target brokers, see [pools](#pools).
 * the `policy` element defines the policy used to select the target brokers from the pool, see [policies](#policies);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AutoClientIDShardClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AutoClientIDShardClusterTest.java
new file mode 100644
index 0000000..18b605f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AutoClientIDShardClusterTest.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.balancing;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
+import org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class AutoClientIDShardClusterTest extends BalancingTestBase {
+
+   @Parameterized.Parameters(name = "protocol: {0}")
+   public static Collection<Object[]> data() {
+      final String[] protocols = new String[] {AMQP_PROTOCOL, CORE_PROTOCOL, OPENWIRE_PROTOCOL};
+      Collection<Object[]> data = new ArrayList<>();
+      for (String protocol : protocols) {
+         data.add(new Object[] {protocol});
+      }
+      return data;
+   }
+
+   private final String protocol;
+   final int numMessages = 50;
+   AtomicInteger toSend = new AtomicInteger(numMessages);
+
+   public AutoClientIDShardClusterTest(String protocol) {
+      this.protocol = protocol;
+   }
+
+   protected void setupServers() throws Exception {
+      for (int i = 0; i < 2; i++) {
+         setupLiveServer(i, true, HAType.SharedNothingReplication, true, false);
+         servers[i].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+         servers[i].addProtocolManagerFactory(new OpenWireProtocolManagerFactory());
+      }
+      setupClusterConnection("cluster0", name.getMethodName(), MessageLoadBalancingType.ON_DEMAND, 1, true, 0, 1);
+      setupClusterConnection("cluster1", name.getMethodName(), MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 0);
+      toSend.set(numMessages);
+   }
+
+   Runnable producer = new Runnable() {
+      final AtomicInteger producerSeq = new AtomicInteger();
+
+      @Override
+      public void run() {
+         while (toSend.get() > 0) {
+            try {
+               ConnectionFactory connectionFactory = createFactory(protocol, "producer", "admin", "admin");
+               try (Connection connection = connectionFactory.createConnection()) {
+                  connection.start();
+                  try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+                     javax.jms.Topic topic = session.createTopic(name.getMethodName());
+                     try (MessageProducer producer = session.createProducer(topic)) {
+                        for (int i = 0; i < 10 && toSend.get() > 0; i++) {
+                           Message message = session.createTextMessage();
+                           message.setIntProperty("SEQ", producerSeq.get() + 1);
+                           producer.send(message);
+                           producerSeq.incrementAndGet();
+                           toSend.decrementAndGet();
+                        }
+                        TimeUnit.MILLISECONDS.sleep(100);
+                     }
+                  }
+               }
+            } catch (Exception ok) {
+            }
+         }
+      }
+   };
+
+   class DurableSub implements Runnable {
+
+      final String id;
+      int receivedInOrder = -1;
+      int lastReceived;
+      int maxReceived;
+      AtomicBoolean consumerDone = new AtomicBoolean();
+      AtomicBoolean orderShot = new AtomicBoolean();
+      CountDownLatch registered = new CountDownLatch(1);
+
+      DurableSub(String id) {
+         this.id = id;
+      }
+
+      @Override
+      public void run() {
+         while (!consumerDone.get()) {
+            try {
+               ConnectionFactory connectionFactory = createFactory(protocol, "ClientId-" + id, "admin", "admin");
+               Connection connection = null;
+               try {
+                  connection = connectionFactory.createConnection();
+                  connection.start();
+                  try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+                     javax.jms.Topic topic = session.createTopic(name.getMethodName());
+                     try (TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, "Sub-" + id)) {
+                        registered.countDown();
+                        for (int i = 0; i < 5; i++) {
+                           Message message = durableSubscriber.receive(500);
+                           if (message != null) {
+                              lastReceived = message.getIntProperty("SEQ");
+                              if (lastReceived > maxReceived) {
+                                 maxReceived = lastReceived;
+                              }
+                              if (receivedInOrder < 0) {
+                                 receivedInOrder = lastReceived;
+                              } else if (receivedInOrder == lastReceived - 1) {
+                                 receivedInOrder++;
+                              } else {
+                                 if (!orderShot.get()) {
+                                    System.err.println("Sub: " + id + ", received: out of order " + lastReceived + ", last in order: " + receivedInOrder);
+                                 }
+                                 orderShot.set(true);
+                              }
+                           } else {
+                              // no point trying again if there is nothing for us now.
+                              break;
+                           }
+                        }
+                        TimeUnit.MILLISECONDS.sleep(500);
+                     }
+                  }
+               } finally {
+                  if (connection != null) {
+                     connection.close(); // seems openwire not jms2.0 auto closable always
+                  }
+               }
+            } catch (Exception ok) {
+            }
+         }
+      }
+   }
+
+   @Ignore("not totally reliable, but does show the root cause of the problem being solved")
+   public void testWithoutOutSharding() throws Exception {
+      setupServers();
+      startServers(0, 1);
+
+      // two bouncy durable consumers
+      DurableSub sub0 = new DurableSub("0");
+      DurableSub sub1 = new DurableSub("1");
+
+      ExecutorService executorService = Executors.newFixedThreadPool(3);
+      try {
+         executorService.submit(sub0);
+         executorService.submit(sub1);
+
+         // waiting for registration before production to give bridges a chance
+         assertTrue(sub0.registered.await(20, TimeUnit.SECONDS));
+         assertTrue(sub1.registered.await(20, TimeUnit.SECONDS));
+
+         assertTrue(waitForBindings(servers[0], name.getMethodName(), true, 2, -1, 10000));
+         assertTrue(waitForBindings(servers[1], name.getMethodName(), true, 2, -1, 10000));
+
+         // wait for remote bindings!
+         assertTrue(waitForBindings(servers[0], name.getMethodName(), false, 2, -1, 10000));
+         assertTrue(waitForBindings(servers[1], name.getMethodName(), false, 2, -1, 10000));
+
+         // produce a few every second with failover randomize=true so we produce on all nodes
+         executorService.submit(producer);
+
+         assertTrue("All sent", Wait.waitFor(() -> toSend.get() == 0));
+
+         assertTrue("All received sub0", Wait.waitFor(() -> sub0.maxReceived == numMessages));
+
+         assertTrue("All received sub1", Wait.waitFor(() -> sub1.maxReceived == numMessages));
+
+         // with bouncing, one 'may' be out of order, hence ignored
+         assertTrue(sub0.orderShot.get() || sub1.orderShot.get());
+
+      } finally {
+         sub0.consumerDone.set(true);
+         sub1.consumerDone.set(true);
+         executorService.shutdown();
+         stopServers(0, 1);
+      }
+   }
+
+   @Test
+   public void testWithConsistentHashClientIDModTwo() throws Exception {
+      setupServers();
+
+      addBalancerWithClientIdConsistentHashMod();
+
+      startServers(0, 1);
+
+      // two bouncy durable consumers
+      DurableSub sub0 = new DurableSub("0");
+      DurableSub sub1 = new DurableSub("1");
+
+      ExecutorService executorService = Executors.newFixedThreadPool(3);
+      try {
+         executorService.submit(sub0);
+         executorService.submit(sub1);
+
+         // waiting for registration before production to give bridges a chance
+         assertTrue(sub0.registered.await(5, TimeUnit.SECONDS));
+         assertTrue(sub1.registered.await(5, TimeUnit.SECONDS));
+
+         assertTrue(waitForBindings(servers[0], name.getMethodName(), true, 1, 1, 2000));
+         assertTrue(waitForBindings(servers[1], name.getMethodName(), true, 1, 1, 2000));
+
+         // wait for remote bindings!
+         assertTrue(waitForBindings(servers[0], name.getMethodName(), false, 1, 1, 10000));
+         assertTrue(waitForBindings(servers[1], name.getMethodName(), false, 1, 1, 10000));
+
+         // produce a few every second with failover randomize=true so we produce on all nodes
+         executorService.submit(producer);
+
+         assertTrue("All sent", Wait.waitFor(() -> toSend.get() == 0));
+
+         assertTrue("All received sub0", Wait.waitFor(() -> sub0.maxReceived == numMessages));
+
+         assertTrue("All received sub1", Wait.waitFor(() -> sub1.maxReceived == numMessages));
+
+         // with partition, none will be out of order
+         assertFalse(sub0.orderShot.get() && sub1.orderShot.get());
+
+      } finally {
+         sub0.consumerDone.set(true);
+         sub1.consumerDone.set(true);
+         executorService.shutdown();
+         stopServers(0, 1);
+      }
+   }
+
+   private void addBalancerWithClientIdConsistentHashMod() {
+      final int numberOfNodes = 2;
+      for (int node = 0; node < numberOfNodes; node++) {
+         Configuration configuration = servers[node].getConfiguration();
+         BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration().setName(BROKER_BALANCER_NAME);
+         brokerBalancerConfiguration.setTargetKey(TargetKey.CLIENT_ID).setLocalTargetFilter(TargetKeyResolver.DEFAULT_KEY_VALUE + "|" + node);
+         NamedPropertyConfiguration transformerConfig = new NamedPropertyConfiguration();
+         transformerConfig.setName(ConsistentHashModulo.NAME);
+         HashMap<String, String> properties = new HashMap<>();
+         properties.put(ConsistentHashModulo.MODULO, String.valueOf(numberOfNodes));
+         transformerConfig.setProperties(properties);
+         brokerBalancerConfiguration.setTransformerConfiguration(transformerConfig);
+
+         configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration));
+
+         TransportConfiguration acceptor = getDefaultServerAcceptor(node);
+         acceptor.getParams().put("redirect-to", BROKER_BALANCER_NAME);
+      }
+   }
+
+   protected ConnectionFactory createFactory(String protocol, String clientID, String user, String password) throws Exception {
+      StringBuilder urlBuilder = new StringBuilder();
+
+      switch (protocol) {
+
+         case CORE_PROTOCOL: {
+            urlBuilder.append("(tcp://localhost:61616,tcp://localhost:61617)?connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy");
+            urlBuilder.append("&clientID=");
+            urlBuilder.append(clientID);
+
+            return new ActiveMQConnectionFactory(urlBuilder.toString(), user, password);
+         }
+         case AMQP_PROTOCOL: {
+
+            urlBuilder.append("failover:(amqp://localhost:61616,amqp://localhost:61617)?failover.randomize=true");
+            urlBuilder.append("&jms.clientID=");
+            urlBuilder.append(clientID);
+
+            return new JmsConnectionFactory(user, password, urlBuilder.toString());
+         }
+         case OPENWIRE_PROTOCOL: {
+
+            urlBuilder.append("failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=true&maxReconnectAttempts=0&startupMaxReconnectAttempts=0");
+            urlBuilder.append("&jms.clientID=");
+            urlBuilder.append(clientID);
+
+            return new org.apache.activemq.ActiveMQConnectionFactory(user, password, urlBuilder.toString());
+         }
+         default:
+            throw new IllegalStateException("Unexpected value: " + protocol);
+      }
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
index 890a813..c626325 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
@@ -27,7 +27,7 @@ import java.util.Map;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
-import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
 import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
@@ -88,7 +88,7 @@ public class BalancingTestBase extends ClusterTestBase {
       brokerBalancerConfiguration.setTargetKey(targetKey).setLocalTargetFilter(localTargetFilter)
          .setPoolConfiguration(new PoolConfiguration().setCheckPeriod(1000).setQuorumSize(quorumSize)
             .setLocalTargetEnabled(localTargetEnabled).setClusterConnection(clusterConnection))
-         .setPolicyConfiguration(new PolicyConfiguration().setName(policyName).setProperties(properties));
+         .setPolicyConfiguration(new NamedPropertyConfiguration().setName(policyName).setProperties(properties));
 
       configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration));
 
@@ -105,7 +105,7 @@ public class BalancingTestBase extends ClusterTestBase {
       brokerBalancerConfiguration.setTargetKey(targetKey).setLocalTargetFilter(localTargetFilter)
          .setPoolConfiguration(new PoolConfiguration().setCheckPeriod(1000).setQuorumSize(quorumSize)
             .setLocalTargetEnabled(localTargetEnabled).setDiscoveryGroupName("dg1"))
-         .setPolicyConfiguration(new PolicyConfiguration().setName(policyName).setProperties(properties));
+         .setPolicyConfiguration(new NamedPropertyConfiguration().setName(policyName).setProperties(properties));
 
       configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration));
 
@@ -129,7 +129,7 @@ public class BalancingTestBase extends ClusterTestBase {
       brokerBalancerConfiguration.setTargetKey(targetKey).setLocalTargetFilter(localTargetFilter)
          .setPoolConfiguration(new PoolConfiguration().setCheckPeriod(1000).setQuorumSize(quorumSize)
             .setLocalTargetEnabled(localTargetEnabled).setStaticConnectors(staticConnectors))
-         .setPolicyConfiguration(new PolicyConfiguration().setName(policyName).setProperties(properties));
+         .setPolicyConfiguration(new NamedPropertyConfiguration().setName(policyName).setProperties(properties));
 
       configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration));
 
@@ -215,7 +215,7 @@ public class BalancingTestBase extends ClusterTestBase {
                urlBuilder.append(")");
             }
 
-            urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries);
+            urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries + "&failover.randomize=true");
 
             if (clientID != null) {
                urlBuilder.append("&jms.clientID=");
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java
index 5ccd8c9..446156d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java
@@ -88,15 +88,10 @@ public class TargetKeyTest extends BalancingTestBase {
 
    @Before
    public void setup() throws Exception {
-      PolicyFactoryResolver.getInstance().registerPolicyFactory(
+      PolicyFactoryResolver.getInstance().registerPolicyFactory(MOCK_POLICY_NAME,
          new PolicyFactory() {
             @Override
-            public String[] getSupportedPolicies() {
-               return new String[] {MOCK_POLICY_NAME};
-            }
-
-            @Override
-            public Policy createPolicy(String policyName) {
+            public Policy create() {
                return new FirstElementPolicy(MOCK_POLICY_NAME) {
                   @Override
                   public Target selectTarget(List<Target> targets, String key) {