You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/04/11 18:37:51 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1790 Improve Topology Member Finding

ARTEMIS-1790 Improve Topology Member Finding

When finding out if a connector belong to a target node it compares
the whole parameter map which is not necessary. Also in understanding
the connector the best place is to delegate it to the corresponding
remoting connection who understands it. (e.g. INVMConnection knows
whether the connector belongs to a target node by checking it's
serverID only. The netty ones only need to match host and port, and
understanding that localhost and 127.0.0.1 are same thing).


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6818762d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6818762d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6818762d

Branch: refs/heads/master
Commit: 6818762da8eff86447c4edfe4896661b790d506c
Parents: 3384d67
Author: Howard Gao <ho...@gmail.com>
Authored: Wed Apr 11 10:52:52 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Apr 11 14:37:44 2018 -0400

----------------------------------------------------------------------
 .../artemis/core/client/impl/Topology.java      |  5 +-
 .../core/client/impl/TopologyMemberImpl.java    | 12 ++--
 .../remoting/impl/netty/NettyConnection.java    | 39 +++++++++++++
 .../spi/core/protocol/RemotingConnection.java   |  4 ++
 .../artemis/spi/core/remoting/Connection.java   |  4 ++
 .../core/remoting/impl/invm/InVMConnection.java | 17 ++++++
 .../core/server/impl/LiveOnlyActivation.java    |  5 +-
 .../core/server/impl/ScaleDownHandler.java      |  2 +-
 .../remoting/impl/invm/InVMConnectionTest.java  | 58 ++++++++++++++++++++
 .../impl/netty/NettyConnectionTest.java         | 57 +++++++++++++++++++
 10 files changed, 194 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6818762d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
index 966167f..5db3e6b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Executor;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Connector;
 import org.jboss.logging.Logger;
 
@@ -372,9 +373,9 @@ public final class Topology {
       return topology.get(nodeID);
    }
 
-   public synchronized TopologyMemberImpl getMember(final TransportConfiguration configuration) {
+   public synchronized TopologyMemberImpl getMember(final RemotingConnection rc) {
       for (TopologyMemberImpl member : topology.values()) {
-         if (member.isMember(configuration)) {
+         if (member.isMember(rc)) {
             return member;
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6818762d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
index c82a939..3cbabac 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
@@ -105,12 +105,16 @@ public final class TopologyMemberImpl implements TopologyMember {
       return connector;
    }
 
+   /**
+    * We only need to check if the connection point to the same node,
+    * don't need to compare the whole params map.
+    * @param connection The connection to the target node
+    * @return true if the connection point to the same node
+    * as this member represents.
+    */
    @Override
    public boolean isMember(RemotingConnection connection) {
-      TransportConfiguration connectorConfig = connection.getTransportConnection() != null ? connection.getTransportConnection().getConnectorConfig() : null;
-
-      return isMember(connectorConfig);
-
+      return connection.isSameTarget(getConnector().getA(), getConnector().getB());
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6818762d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index de3a1ee..3f400dc 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -512,6 +512,45 @@ public class NettyConnection implements Connection {
    }
 
    @Override
+   public boolean isSameTarget(TransportConfiguration... configs) {
+      boolean result = false;
+      for (TransportConfiguration cfg : configs) {
+         if (cfg == null) {
+            continue;
+         }
+         if (NettyConnectorFactory.class.getName().equals(cfg.getFactoryClassName())) {
+            if (configuration.get(TransportConstants.PORT_PROP_NAME).equals(cfg.getParams().get(TransportConstants.PORT_PROP_NAME))) {
+               //port same, check host
+               Object hostParam = configuration.get(TransportConstants.HOST_PROP_NAME);
+               if (hostParam != null) {
+                  if (hostParam.equals(cfg.getParams().get(TransportConstants.HOST_PROP_NAME))) {
+                     result = true;
+                     break;
+                  } else {
+                     //check special 'localhost' case
+                     if (isLocalhost((String) configuration.get(TransportConstants.HOST_PROP_NAME)) && isLocalhost((String) cfg.getParams().get(TransportConstants.HOST_PROP_NAME))) {
+                        result = true;
+                        break;
+                     }
+                  }
+               } else if (cfg.getParams().get(TransportConstants.HOST_PROP_NAME) == null) {
+                  result = true;
+                  break;
+               }
+            }
+         }
+      }
+      return result;
+   }
+
+   //here we consider 'localhost' is equivalent to '127.0.0.1'
+   //other values of 127.0.0.x is not and the user makes sure
+   //not to mix use of 'localhost' and '127.0.0.x'
+   private boolean isLocalhost(String hostname) {
+      return "127.0.0.1".equals(hostname) || "localhost".equals(hostname);
+   }
+
+   @Override
    public final String toString() {
       return super.toString() + "[ID=" + getID() + ", local= " + channel.localAddress() + ", remote=" + channel.remoteAddress() + "]";
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6818762d/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
index a906a32..f7ed73a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
@@ -21,6 +21,7 @@ import java.util.List;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
@@ -244,4 +245,7 @@ public interface RemotingConnection extends BufferHandler {
     */
    String getTransportLocalAddress();
 
+   default boolean isSameTarget(TransportConfiguration... configs) {
+      return getTransportConnection().isSameTarget(configs);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6818762d/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index 63dbcfb..46e1534 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -149,4 +149,8 @@ public interface Connection {
     * @return
     */
    boolean isUsingProtocolHandling();
+
+   //returns true if one of the configs points to the same
+   //node as this connection does.
+   boolean isSameTarget(TransportConfiguration... configs);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6818762d/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index 889493e..46f7fea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -280,4 +280,21 @@ public class InVMConnection implements Connection {
       return "InVMConnection [serverID=" + serverID + ", id=" + id + "]";
    }
 
+   @Override
+   public boolean isSameTarget(TransportConfiguration... configs) {
+      boolean result = false;
+      for (TransportConfiguration cfg : configs) {
+         if (cfg == null) {
+            continue;
+         }
+         if (InVMConnectorFactory.class.getName().equals(cfg.getFactoryClassName())) {
+            //factory same, get id
+            if (serverID == (int) cfg.getParams().get(TransportConstants.SERVER_ID_PROP_NAME)) {
+               result = true;
+               break;
+            }
+         }
+      }
+      return result;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6818762d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
index b4bd024..c52dcf6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.cluster.ha.LiveOnlyPolicy;
 import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.jboss.logging.Logger;
 
 public class LiveOnlyActivation extends Activation {
@@ -109,8 +110,8 @@ public class LiveOnlyActivation extends Activation {
          connectToScaleDownTarget(liveOnlyPolicy.getScaleDownPolicy());
       }
 
-      TransportConfiguration tc = scaleDownClientSessionFactory == null ? null : scaleDownClientSessionFactory.getConnectorConfiguration();
-      String nodeID = tc == null ? null : scaleDownClientSessionFactory.getServerLocator().getTopology().getMember(tc).getNodeId();
+      RemotingConnection rc = scaleDownClientSessionFactory == null ? null : scaleDownClientSessionFactory.getConnection();
+      String nodeID = rc == null ? null : scaleDownClientSessionFactory.getServerLocator().getTopology().getMember(rc).getNodeId();
       if (remotingService != null) {
          remotingService.freeze(nodeID, null);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6818762d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index 49117f3..a1c2229 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -307,7 +307,7 @@ public class ScaleDownHandler {
    }
 
    private String getTargetNodeId(ClientSessionFactory sessionFactory) {
-      return sessionFactory.getServerLocator().getTopology().getMember(sessionFactory.getConnectorConfiguration()).getNodeId();
+      return sessionFactory.getServerLocator().getTopology().getMember(sessionFactory.getConnection()).getNodeId();
    }
 
    public void scaleDownTransactions(ClientSessionFactory sessionFactory,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6818762d/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/invm/InVMConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/invm/InVMConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/invm/InVMConnectionTest.java
new file mode 100644
index 0000000..e19b915
--- /dev/null
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/invm/InVMConnectionTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.unit.core.remoting.impl.invm;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection;
+import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
+import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class InVMConnectionTest {
+
+   @Test
+   public void testIsTargetNode() throws Exception {
+
+      int serverID = 0;
+      InVMConnection conn = new InVMConnection(serverID, null, null, null);
+
+      Map<String, Object> config0 = new HashMap<>();
+      config0.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
+      TransportConfiguration tf0 = new TransportConfiguration(InVMConnectorFactory.class.getName(), config0, "tf0");
+
+      Map<String, Object> config1 = new HashMap<>();
+      config1.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      TransportConfiguration tf1 = new TransportConfiguration(InVMConnectorFactory.class.getName(), config1, "tf1");
+
+      Map<String, Object> config2 = new HashMap<>();
+      config2.put(TransportConstants.SERVER_ID_PROP_NAME, 2);
+      TransportConfiguration tf2 = new TransportConfiguration(InVMConnectorFactory.class.getName(), config2, "tf2");
+
+      assertTrue(conn.isSameTarget(tf0));
+      assertFalse(conn.isSameTarget(tf1));
+      assertFalse(conn.isSameTarget(tf2));
+      assertTrue(conn.isSameTarget(tf0, tf1));
+      assertTrue(conn.isSameTarget(tf2, tf0));
+      assertFalse(conn.isSameTarget(tf2, tf1));
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6818762d/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
index 05ae1f6..c9c975c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -27,7 +28,9 @@ import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
 import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
@@ -84,6 +87,60 @@ public class NettyConnectionTest extends ActiveMQTestBase {
       conn.blockUntilWritable(0, 0, TimeUnit.NANOSECONDS);
    }
 
+   @Test
+   public void testIsTargetNode() throws Exception {
+      Map<String, Object> config = new HashMap<>();
+      config.put("host", "localhost");
+      config.put("port", "1234");
+
+      Map<String, Object> config1 = new HashMap<>();
+      config1.put("host", "localhost");
+      config1.put("port", "1234");
+      TransportConfiguration tf1 = new TransportConfiguration(NettyConnectorFactory.class.getName(), config1, "tf1");
+
+      Map<String, Object> config2 = new HashMap<>();
+      config2.put("host", "127.0.0.1");
+      config2.put("port", "1234");
+      TransportConfiguration tf2 = new TransportConfiguration(NettyConnectorFactory.class.getName(), config2, "tf2");
+
+      Map<String, Object> config3 = new HashMap<>();
+      config3.put("host", "otherhost");
+      config3.put("port", "1234");
+      TransportConfiguration tf3 = new TransportConfiguration(NettyConnectorFactory.class.getName(), config3, "tf3");
+
+      Map<String, Object> config4 = new HashMap<>();
+      config4.put("host", "127.0.0.1");
+      config4.put("port", "9999");
+      TransportConfiguration tf4 = new TransportConfiguration(NettyConnectorFactory.class.getName(), config4, "tf4");
+
+      Map<String, Object> config5 = new HashMap<>();
+      config5.put("host", "127.0.0.2");
+      config5.put("port", "1234");
+      TransportConfiguration tf5 = new TransportConfiguration(NettyConnectorFactory.class.getName(), config5, "tf5");
+
+      Map<String, Object> config6 = new HashMap<>();
+      config6.put("host", "127.0.0.2");
+      config6.put("port", "1234");
+      TransportConfiguration tf6 = new TransportConfiguration("some.other.FactoryClass", config6, "tf6");
+
+      Channel channel = createChannel();
+      NettyConnection conn = new NettyConnection(config, channel, new MyListener(), false, false);
+
+      assertTrue(conn.isSameTarget(tf1));
+      assertTrue(conn.isSameTarget(tf2));
+      assertTrue(conn.isSameTarget(tf1, tf2));
+      assertFalse(conn.isSameTarget(tf3));
+      assertTrue(conn.isSameTarget(tf3, tf1));
+      assertTrue(conn.isSameTarget(tf3, tf2));
+      assertTrue(conn.isSameTarget(tf1, tf3));
+      assertFalse(conn.isSameTarget(tf4));
+      assertFalse(conn.isSameTarget(tf5));
+      assertFalse(conn.isSameTarget(tf4, tf5));
+      assertFalse(conn.isSameTarget(tf6));
+      assertTrue(conn.isSameTarget(tf1, tf6));
+      assertTrue(conn.isSameTarget(tf6, tf2));
+   }
+
    private static EmbeddedChannel createChannel() {
       return new EmbeddedChannel(new ChannelInboundHandlerAdapter());
    }