You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/09 20:42:58 UTC

[35/58] [abbrv] activemq-artemis git commit: Added openwire parameters as bean properties so that they can be passed via the new protocol manager bean util.

Added openwire parameters as bean properties so that they
can be passed via the new protocol manager bean util.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d427a78b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d427a78b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d427a78b

Branch: refs/heads/refactor-openwire
Commit: d427a78b6bb0389910178d5cc531ab67d5cd8c19
Parents: df3e515
Author: Howard Gao <ho...@gmail.com>
Authored: Wed Feb 17 20:50:33 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 9 14:41:41 2016 -0500

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 25 ++-----------
 .../openwire/OpenWireProtocolManager.java       | 38 +++++++++++++++++---
 .../artemiswrapper/OpenwireArtemisBaseTest.java | 20 ++++++++---
 .../transport/failover/FailoverClusterTest.java | 11 ++++--
 .../failover/FailoverComplexClusterTest.java    | 16 ++++++---
 .../failover/FailoverPriorityTest.java          | 22 ++++++++----
 .../failover/FailoverUpdateURIsTest.java        |  6 +++-
 7 files changed, 94 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d427a78b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 61e93cb..a6f0f34 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -55,7 +55,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
-import org.apache.activemq.artemis.utils.ConfigurationHelper;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
@@ -153,10 +152,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    private String defaultSocketURIString;
 
-   private boolean rebalance;
-   private boolean updateClusterClients;
-   private boolean updateClusterClientsOnRemove;
-
    public OpenWireConnection(Acceptor acceptorUsed,
                              Connection connection,
                              OpenWireProtocolManager openWireProtocolManager,
@@ -167,12 +162,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       this.wireFormat = wf;
       this.creationTime = System.currentTimeMillis();
       this.defaultSocketURIString = connection.getLocalAddress();
-
-      //Clebert: These are parameters specific to openwire cluster with defaults as specified at
-      //http://activemq.apache.org/failover-transport-reference.html
-      rebalance = ConfigurationHelper.getBooleanProperty("rebalance-cluster-clients", true, acceptorUsed.getConfiguration());
-      updateClusterClients = ConfigurationHelper.getBooleanProperty("update-cluster-clients", true, acceptorUsed.getConfiguration());
-      updateClusterClientsOnRemove = ConfigurationHelper.getBooleanProperty("update-cluster-clients-on-remove", true, acceptorUsed.getConfiguration());
    }
 
    @Override
@@ -200,10 +189,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       return info.getPassword();
    }
 
-   public boolean isRebalance() {
-      return rebalance;
-   }
-
    private ConnectionInfo getConnectionInfo() {
       if (state == null) {
          return null;
@@ -539,9 +524,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
          Response resp = new ExceptionResponse(e);
          return resp;
       }
-      if (info.isManageable() && this.isUpdateClusterClients()) {
+      if (info.isManageable() && protocolManager.isUpdateClusterClients()) {
          // send ConnectionCommand
-         ConnectionControl command = protocolManager.newConnectionControl(rebalance);
+         ConnectionControl command = protocolManager.newConnectionControl();
          command.setFaultTolerant(protocolManager.isFaultTolerantConfiguration());
          if (info.isFailoverReconnect()) {
             command.setRebalanceConnection(false);
@@ -1274,16 +1259,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    public void updateClient(ConnectionControl control) {
       //      if (!destroyed && context.isFaultTolerant()) {
-      if (updateClusterClients) {
+      if (protocolManager.isUpdateClusterClients()) {
          dispatchAsync(control);
       }
       //      }
    }
 
-   public boolean isUpdateClusterClients() {
-      return updateClusterClients;
-   }
-
    public AMQConnectionContext initContext(ConnectionInfo info) {
       WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo();
       // Older clients should have been defaulting this field to true.. but

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d427a78b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 525844c..bd26b07 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -148,6 +148,12 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    private final ScheduledExecutorService scheduledPool;
 
+   //bean properties
+   //http://activemq.apache.org/failover-transport-reference.html
+   private boolean rebalanceClusterClients = false;
+   private boolean updateClusterClients = false;
+   private boolean updateClusterClientsOnRemove = false;
+
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
       this.factory = factory;
       this.server = server;
@@ -189,7 +195,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       }
 
       for (OpenWireConnection c : this.connections) {
-         ConnectionControl control = newConnectionControl(c.isRebalance());
+         ConnectionControl control = newConnectionControl();
          c.updateClient(control);
       }
    }
@@ -422,13 +428,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       return brokerName;
    }
 
-   protected ConnectionControl newConnectionControl(boolean rebalance) {
+   protected ConnectionControl newConnectionControl() {
       ConnectionControl control = new ConnectionControl();
 
-      String uri = generateMembersURI(rebalance);
+      String uri = generateMembersURI(rebalanceClusterClients);
       control.setConnectedBrokers(uri);
 
-      control.setRebalanceConnection(rebalance);
+      control.setRebalanceConnection(rebalanceClusterClients);
       return control;
    }
 
@@ -814,4 +820,28 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       brokerInfo.setPeerBrokerInfos(null);
       connection.dispatchAsync(brokerInfo);
    }
+
+   public void setRebalanceClusterClients(boolean rebalance) {
+      this.rebalanceClusterClients = rebalance;
+   }
+
+   public boolean isRebalanceClusterClients() {
+      return this.rebalanceClusterClients;
+   }
+
+   public void setUpdateClusterClients(boolean updateClusterClients) {
+      this.updateClusterClients = updateClusterClients;
+   }
+
+   public boolean isUpdateClusterClients() {
+      return this.updateClusterClients;
+   }
+
+   public  void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
+      this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
+   }
+
+   public boolean isUpdateClusterClientsOnRemove() {
+      return this.updateClusterClientsOnRemove;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d427a78b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
index be9cf06..5c8d3b6 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
@@ -19,7 +19,9 @@ package org.apache.activemq.broker.artemiswrapper;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
@@ -31,6 +33,8 @@ import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
 import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.artemis.utils.uri.URISchema;
+import org.apache.activemq.artemis.utils.uri.URISupport;
 import org.apache.activemq.broker.BrokerService;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -88,7 +92,7 @@ public class OpenwireArtemisBaseTest {
    public String CLUSTER_PASSWORD = "OPENWIRECLUSTER";
 
    protected Configuration createConfig(final int serverID) throws Exception {
-      return createConfig("localhost", serverID);
+      return createConfig("localhost", serverID, Collections.EMPTY_MAP);
    }
 
    protected Configuration createConfig(final String hostAddress, final int serverID, final int port) throws Exception {
@@ -111,6 +115,10 @@ public class OpenwireArtemisBaseTest {
    }
 
    protected Configuration createConfig(final String hostAddress, final int serverID) throws Exception {
+      return createConfig(hostAddress, serverID, Collections.EMPTY_MAP);
+   }
+
+   protected Configuration createConfig(final String hostAddress, final int serverID, Map<String, String> params) throws Exception {
       ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false).
               setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO).
               setJournalDirectory(getJournalDir(serverID, false)).
@@ -123,7 +131,7 @@ public class OpenwireArtemisBaseTest {
 
       configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true));
 
-      configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID));
+      configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID) + "?" + URISupport.createQueryString(params));
       configuration.addConnectorConfiguration("netty-connector", newURI(hostAddress, serverID));
 
       return configuration;
@@ -171,8 +179,12 @@ public class OpenwireArtemisBaseTest {
       return "tcp://" + localhostAddress + ":" + (61616 + serverID);
    }
 
-   protected static String newURIwithPort(String localhostAddress, int port) {
-      return "tcp://" + localhostAddress + ":" + port;
+   protected static String newURIwithPort(String localhostAddress, int port) throws Exception {
+      return newURIwithPort(localhostAddress, port, Collections.EMPTY_MAP);
+   }
+
+   protected static String newURIwithPort(String localhostAddress, int port, Map<String, String> params) throws Exception {
+      return "tcp://" + localhostAddress + ":" + port + "?" + URISupport.createQueryString(params);
    }
 
    public static JMSServerControl createJMSServerControl(final MBeanServer mbeanServer) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d427a78b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
index bf43caa..74fa6aa 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
@@ -22,8 +22,10 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -50,8 +52,13 @@ public class FailoverClusterTest extends OpenwireArtemisBaseTest {
 
    @Before
    public void setUp() throws Exception {
-      Configuration config1 = createConfig(1);
-      Configuration config2 = createConfig(2);
+      Map<String, String> params = new HashMap<String, String>();
+
+      params.put("rebalanceClusterClients", "true");
+      params.put("updateClusterClients", "true");
+
+      Configuration config1 = createConfig("localhost", 1, params);
+      Configuration config2 = createConfig("localhost", 2, params);
 
       deployClusterConfiguration(config1, 2);
       deployClusterConfiguration(config2, 1);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d427a78b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
index 1d902e3..fd9ce1f 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
@@ -65,9 +65,15 @@ public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest {
 
    //default setup for most tests
    private void commonSetup() throws Exception {
-      Configuration config0 = createConfig(0);
-      Configuration config1 = createConfig(1);
-      Configuration config2 = createConfig(2);
+      Map<String, String> params = new HashMap<String, String>();
+
+      params.put("rebalanceClusterClients", "true");
+      params.put("updateClusterClients", "true");
+      params.put("updateClusterClientsOnRemove", "true");
+
+      Configuration config0 = createConfig("localhost", 0, params);
+      Configuration config1 = createConfig("localhost", 1, params);
+      Configuration config2 = createConfig("localhost", 2, params);
 
       deployClusterConfiguration(config0, 1, 2);
       deployClusterConfiguration(config1, 0, 2);
@@ -248,9 +254,9 @@ public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest {
    @Test
    public void testFailOverWithUpdateClientsOnRemove() throws Exception {
       // Broker A
-      Configuration config0 = createConfig(0, "?rebalance-cluster-client=true&update-cluster-clients=true&update-cluster-clients-on-remove=true");
+      Configuration config0 = createConfig(0, "?rebalanceClusterClients=true&updateClusterClients=true&updateClusterClientsOnRemove=true");
       // Broker B
-      Configuration config1 = createConfig(1, "?rebalance-cluster-client=true&update-cluster-clients=true&update-cluster-clients-on-remove=true");
+      Configuration config1 = createConfig(1, "?rebalanceClusterClients=true&updateClusterClients=true&updateClusterClientsOnRemove=true");
 
       deployClusterConfiguration(config0, 1);
       deployClusterConfiguration(config1, 0);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d427a78b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
index 6e559e7..6f4b27e 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.failover;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.ActiveMQConnection;
@@ -49,11 +50,16 @@ public class FailoverPriorityTest extends OpenwireArtemisBaseTest {
    private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
    private EmbeddedJMS[] servers = new EmbeddedJMS[3];
    private String clientUrl;
+   private Map<String, String> params = new HashMap<String, String>();
 
    @Before
    public void setUp() throws Exception {
       urls.put(0, BROKER_A_CLIENT_TC_ADDRESS);
       urls.put(1, BROKER_B_CLIENT_TC_ADDRESS);
+      params.clear();
+      params.put("rebalanceClusterClients", "true");
+      params.put("updateClusterClients", "true");
+      params.put("updateClusterClientsOnRemove", "true");
    }
 
    @After
@@ -136,7 +142,7 @@ public class FailoverPriorityTest extends OpenwireArtemisBaseTest {
 
    @Test
    public void testThreeBrokers() throws Exception {
-      commonSetup();
+      setupThreeBrokers();
       Thread.sleep(1000);
 
       setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + "," + BROKER_C_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
@@ -262,11 +268,15 @@ public class FailoverPriorityTest extends OpenwireArtemisBaseTest {
       }
    }
 
-   //default setup for most tests
-   private void commonSetup() throws Exception {
-      Configuration config0 = createConfig("127.0.0.1", 0);
-      Configuration config1 = createConfig("127.0.0.1", 1);
-      Configuration config2 = createConfig("127.0.0.1", 2);
+   private void setupThreeBrokers() throws Exception {
+
+      params.put("rebalanceClusterClients", "false");
+      params.put("updateClusterClients", "false");
+      params.put("updateClusterClientsOnRemove", "false");
+
+      Configuration config0 = createConfig("127.0.0.1", 0, params);
+      Configuration config1 = createConfig("127.0.0.1", 1, params);
+      Configuration config2 = createConfig("127.0.0.1", 2, params);
 
       deployClusterConfiguration(config0, 1, 2);
       deployClusterConfiguration(config1, 0, 2);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d427a78b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
index 002a788..0a127dd 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
@@ -18,6 +18,8 @@ package org.apache.activemq.transport.failover;
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
@@ -117,7 +119,9 @@ public class FailoverUpdateURIsTest extends OpenwireArtemisBaseTest {
 
    @Test
    public void testAutoUpdateURIs() throws Exception {
-      Configuration config0 = createConfig(0);
+      Map<String, String> params = new HashMap<String, String>();
+      params.put("updateClusterClients", "true");
+      Configuration config0 = createConfig("localhost", 0, params);
       deployClusterConfiguration(config0, 10);
       server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
       server0.start();