You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/04/03 20:22:58 UTC
[2/3] activemq-artemis git commit: ARTEMIS-1779 Small refactoring to
logic on BridgeImpl::nodeUp logic
ARTEMIS-1779 Small refactoring to logic on BridgeImpl::nodeUp logic
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f3e1ab33
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f3e1ab33
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f3e1ab33
Branch: refs/heads/master
Commit: f3e1ab337c954c1b73d94f90baad45416f5d9d42
Parents: 262990f
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Apr 3 08:58:39 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Apr 3 16:22:13 2018 -0400
----------------------------------------------------------------------
.../core/server/cluster/impl/BridgeImpl.java | 45 ++++++++++----------
.../cluster/impl/ClusterConnectionBridge.java | 15 +++++++
2 files changed, 38 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f3e1ab33/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 16724bf..5e0eb17 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -93,7 +93,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private final SimpleString name;
- private final Queue queue;
+ protected final Queue queue;
private final Filter filter;
@@ -1010,6 +1010,27 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
scheduleRetryConnectFixedTimeout(timeout);
}
+
+ // To be called by the topology update
+ // This logic will be updated on the cluster connection
+ protected void nodeUP(TopologyMember member, boolean last) {
+ ClientSessionInternal sessionToUse = session;
+ RemotingConnection connectionToUse = sessionToUse != null ? sessionToUse.getConnection() : null;
+
+ if (member != null && this.targetNodeID != null && this.targetNodeID.equals(member.getNodeId())) {
+ // this could be an update of the topology say after a backup started
+ BridgeImpl.this.targetNode = member;
+ } else {
+ // we don't need synchronization here, but we need to make sure we won't get a NPE on races
+ if (connectionToUse != null && member.isMember(connectionToUse)) {
+ this.targetNode = member;
+ this.targetNodeID = member.getNodeId();
+ }
+ }
+
+ }
+
+
// Inner classes -------------------------------------------------
protected void scheduleRetryConnectFixedTimeout(final long milliseconds) {
@@ -1159,27 +1180,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
// ClusterListener
@Override
public void nodeUP(TopologyMember member, boolean last) {
- if (BridgeImpl.this.queue.isInternalQueue() && member != null && BridgeImpl.this.targetNodeID != null && !BridgeImpl.this.targetNodeID.equals(member.getNodeId())) {
- //A ClusterConnectionBridge (identified by holding an internal queue)
- //never re-connects to another node here. It only connects to its original
- //target node (from the ClusterConnection) or its backups. That's why
- //we put a return here.
- return;
- }
- ClientSessionInternal sessionToUse = session;
- RemotingConnection connectionToUse = sessionToUse != null ? sessionToUse.getConnection() : null;
-
- if (member != null && BridgeImpl.this.targetNodeID != null && BridgeImpl.this.targetNodeID.equals(member.getNodeId())) {
- // this could be an update of the topology say after a backup started
- BridgeImpl.this.targetNode = member;
- } else {
- // we don't need synchronization here, but we need to make sure we won't get a NPE on races
- if (connectionToUse != null && member.isMember(connectionToUse)) {
- BridgeImpl.this.targetNode = member;
- BridgeImpl.this.targetNodeID = member.getNodeId();
- }
- }
-
+ BridgeImpl.this.nodeUP(member, last);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f3e1ab33/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 88005aa..1a642fe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
@@ -352,6 +353,20 @@ public class ClusterConnectionBridge extends BridgeImpl {
return filterString;
}
+
+ @Override
+ protected void nodeUP(TopologyMember member, boolean last) {
+ if (member != null && targetNodeID != null && !this.targetNodeID.equals(member.getNodeId())) {
+ //A ClusterConnectionBridge (identified by holding an internal queue)
+ //never re-connects to another node here. It only connects to its original
+ //target node (from the ClusterConnection) or its backups. That's why
+ //we put a return here.
+ return;
+ }
+ super.nodeUP(member, last);
+ }
+
+
@Override
protected void afterConnect() throws Exception {
super.afterConnect();