You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/21 17:40:39 UTC
[1/4] git commit: use REQUEST_RESPONSE to determine whether session
is local to prevent false positives in RMVH forwarding case patch by jbellis;
reviewed by slebresne for CASSANDRA-5668
Updated Branches:
refs/heads/cassandra-1.2 fbe8a6eb2 -> e75e33fa6
refs/heads/trunk 140b0311d -> 36aae612a
use REQUEST_RESPONSE to determine whether session is local to prevent false positives in RMVH forwarding case
patch by jbellis; reviewed by slebresne for CASSANDRA-5668
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/18f3a79f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/18f3a79f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/18f3a79f
Branch: refs/heads/cassandra-1.2
Commit: 18f3a79fdd2414698817aa4c35d0f9b44d5a7e55
Parents: fbe8a6e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jun 21 10:39:27 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Jun 21 10:39:27 2013 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/net/OutboundTcpConnection.java | 3 ++-
.../org/apache/cassandra/tracing/ExpiredTraceState.java | 2 +-
src/java/org/apache/cassandra/tracing/TraceState.java | 4 +---
src/java/org/apache/cassandra/tracing/Tracing.java | 9 ++++-----
4 files changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/18f3a79f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index ee30d36..7077922 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -188,7 +188,8 @@ public class OutboundTcpConnection extends Thread
else
{
state.trace(message);
- Tracing.instance().stopIfNonLocal(state);
+ if (qm.message.verb == MessagingService.Verb.REQUEST_RESPONSE)
+ Tracing.instance().stopNonLocal(state);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/18f3a79f/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
index 6b4f90b..37a013b 100644
--- a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@ -29,7 +29,7 @@ public class ExpiredTraceState extends TraceState
{
public ExpiredTraceState(UUID sessionId)
{
- super(FBUtilities.getBroadcastAddress(), sessionId, true);
+ super(FBUtilities.getBroadcastAddress(), sessionId);
}
public int elapsed()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/18f3a79f/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index 25599c4..4d52f8f 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -45,16 +45,14 @@ public class TraceState
public final InetAddress coordinator;
public final Stopwatch watch;
public final ByteBuffer sessionIdBytes;
- public final boolean isLocallyOwned;
- public TraceState(InetAddress coordinator, UUID sessionId, boolean locallyOwned)
+ public TraceState(InetAddress coordinator, UUID sessionId)
{
assert coordinator != null;
assert sessionId != null;
this.coordinator = coordinator;
this.sessionId = sessionId;
- this.isLocallyOwned = locallyOwned;
sessionIdBytes = ByteBufferUtil.bytes(sessionId);
watch = new Stopwatch();
watch.start();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/18f3a79f/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index eb5bad9..c692436 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -144,17 +144,16 @@ public class Tracing
{
assert state.get() == null;
- TraceState ts = new TraceState(localAddress, sessionId, true);
+ TraceState ts = new TraceState(localAddress, sessionId);
state.set(ts);
sessions.put(sessionId, ts);
return sessionId;
}
- public void stopIfNonLocal(TraceState state)
+ public void stopNonLocal(TraceState state)
{
- if (!state.isLocallyOwned)
- sessions.remove(state.sessionId);
+ sessions.remove(state.sessionId);
}
/**
@@ -254,7 +253,7 @@ public class Tracing
}
else
{
- ts = new TraceState(message.from, sessionId, false);
+ ts = new TraceState(message.from, sessionId);
sessions.put(sessionId, ts);
return ts;
}
[3/4] git commit: Merge branch 'cassandra-1.2' into trunk
Posted by jb...@apache.org.
Merge branch 'cassandra-1.2' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/36aae612
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/36aae612
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/36aae612
Branch: refs/heads/trunk
Commit: 36aae612a0b248c2650ef979b9230e50410b7415
Parents: 140b031 18f3a79
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jun 21 10:39:36 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Jun 21 10:39:36 2013 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/net/OutboundTcpConnection.java | 3 ++-
.../org/apache/cassandra/tracing/ExpiredTraceState.java | 2 +-
src/java/org/apache/cassandra/tracing/TraceState.java | 4 +---
src/java/org/apache/cassandra/tracing/Tracing.java | 9 ++++-----
4 files changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36aae612/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36aae612/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36aae612/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
[2/4] git commit: use REQUEST_RESPONSE to determine whether session
is local to prevent false positives in RMVH forwarding case patch by jbellis;
reviewed by slebresne for CASSANDRA-5668
Posted by jb...@apache.org.
use REQUEST_RESPONSE to determine whether session is local to prevent false positives in RMVH forwarding case
patch by jbellis; reviewed by slebresne for CASSANDRA-5668
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/18f3a79f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/18f3a79f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/18f3a79f
Branch: refs/heads/trunk
Commit: 18f3a79fdd2414698817aa4c35d0f9b44d5a7e55
Parents: fbe8a6e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jun 21 10:39:27 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Jun 21 10:39:27 2013 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/net/OutboundTcpConnection.java | 3 ++-
.../org/apache/cassandra/tracing/ExpiredTraceState.java | 2 +-
src/java/org/apache/cassandra/tracing/TraceState.java | 4 +---
src/java/org/apache/cassandra/tracing/Tracing.java | 9 ++++-----
4 files changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/18f3a79f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index ee30d36..7077922 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -188,7 +188,8 @@ public class OutboundTcpConnection extends Thread
else
{
state.trace(message);
- Tracing.instance().stopIfNonLocal(state);
+ if (qm.message.verb == MessagingService.Verb.REQUEST_RESPONSE)
+ Tracing.instance().stopNonLocal(state);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/18f3a79f/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
index 6b4f90b..37a013b 100644
--- a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@ -29,7 +29,7 @@ public class ExpiredTraceState extends TraceState
{
public ExpiredTraceState(UUID sessionId)
{
- super(FBUtilities.getBroadcastAddress(), sessionId, true);
+ super(FBUtilities.getBroadcastAddress(), sessionId);
}
public int elapsed()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/18f3a79f/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index 25599c4..4d52f8f 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -45,16 +45,14 @@ public class TraceState
public final InetAddress coordinator;
public final Stopwatch watch;
public final ByteBuffer sessionIdBytes;
- public final boolean isLocallyOwned;
- public TraceState(InetAddress coordinator, UUID sessionId, boolean locallyOwned)
+ public TraceState(InetAddress coordinator, UUID sessionId)
{
assert coordinator != null;
assert sessionId != null;
this.coordinator = coordinator;
this.sessionId = sessionId;
- this.isLocallyOwned = locallyOwned;
sessionIdBytes = ByteBufferUtil.bytes(sessionId);
watch = new Stopwatch();
watch.start();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/18f3a79f/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index eb5bad9..c692436 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -144,17 +144,16 @@ public class Tracing
{
assert state.get() == null;
- TraceState ts = new TraceState(localAddress, sessionId, true);
+ TraceState ts = new TraceState(localAddress, sessionId);
state.set(ts);
sessions.put(sessionId, ts);
return sessionId;
}
- public void stopIfNonLocal(TraceState state)
+ public void stopNonLocal(TraceState state)
{
- if (!state.isLocallyOwned)
- sessions.remove(state.sessionId);
+ sessions.remove(state.sessionId);
}
/**
@@ -254,7 +253,7 @@ public class Tracing
}
else
{
- ts = new TraceState(message.from, sessionId, false);
+ ts = new TraceState(message.from, sessionId);
sessions.put(sessionId, ts);
return ts;
}
[4/4] git commit: refactor reconnecting snitches patch by jasobrown;
reviewed by jbellis for CASSANDRA-5681
Posted by jb...@apache.org.
refactor reconnecting snitches
patch by jasobrown; reviewed by jbellis for CASSANDRA-5681
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e75e33fa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e75e33fa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e75e33fa
Branch: refs/heads/cassandra-1.2
Commit: e75e33fa6dc5e2a3fe061d747cc98679a65ef960
Parents: 18f3a79
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jun 21 10:40:31 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Jun 21 10:40:31 2013 -0500
----------------------------------------------------------------------
.../cassandra/locator/Ec2MultiRegionSnitch.java | 71 +---------------
.../locator/GossipingPropertyFileSnitch.java | 63 +-------------
.../locator/ReconnectableSnitchHelper.java | 88 ++++++++++++++++++++
3 files changed, 94 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e75e33fa/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java b/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
index 9317941..bd5e091 100644
--- a/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
+++ b/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
@@ -19,16 +19,11 @@ package org.apache.cassandra.locator;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.UnknownHostException;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
-import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
/**
@@ -36,16 +31,13 @@ import org.apache.cassandra.service.StorageService;
*
* 2) Snitch will set the private IP as a Gossip application state.
*
- * 3) Snitch implements IESCS and will reset the connection if it is within the
+ * 3) Uses a helper class that implements IESCS and will reset the public IP connection if it is within the
* same region to communicate via private IP.
*
- * Implements Ec2Snitch to inherit its functionality and extend it for
- * Multi-Region.
- *
* Operational: All the nodes in this cluster needs to be able to (modify the
* Security group settings in AWS) communicate via Public IP's.
*/
-public class Ec2MultiRegionSnitch extends Ec2Snitch implements IEndpointStateChangeSubscriber
+public class Ec2MultiRegionSnitch extends Ec2Snitch
{
private static final String PUBLIC_IP_QUERY_URL = "http://169.254.169.254/latest/meta-data/public-ipv4";
private static final String PRIVATE_IP_QUERY_URL = "http://169.254.169.254/latest/meta-data/local-ipv4";
@@ -62,67 +54,10 @@ public class Ec2MultiRegionSnitch extends Ec2Snitch implements IEndpointStateCha
DatabaseDescriptor.setBroadcastAddress(localPublicAddress);
}
- public void onJoin(InetAddress endpoint, EndpointState epState)
- {
- if (epState.getApplicationState(ApplicationState.INTERNAL_IP) != null)
- reconnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP));
- }
-
- public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
- {
- if (state == ApplicationState.INTERNAL_IP)
- reconnect(endpoint, value);
- }
-
- public void onAlive(InetAddress endpoint, EndpointState state)
- {
- if (state.getApplicationState(ApplicationState.INTERNAL_IP) != null)
- reconnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP));
- }
-
- public void onDead(InetAddress endpoint, EndpointState state)
- {
- // do nothing
- }
-
- public void onRestart(InetAddress endpoint, EndpointState state)
- {
- // do nothing
- }
-
- public void onRemove(InetAddress endpoint)
- {
- // do nothing.
- }
-
- private void reconnect(InetAddress publicAddress, VersionedValue localAddressValue)
- {
- try
- {
- reconnect(publicAddress, InetAddress.getByName(localAddressValue.value));
- }
- catch (UnknownHostException e)
- {
- logger.error("Error in getting the IP address resolved: ", e);
- }
- }
-
- private void reconnect(InetAddress publicAddress, InetAddress localAddress)
- {
- if (getDatacenter(publicAddress).equals(getDatacenter(localPublicAddress))
- && MessagingService.instance().getVersion(publicAddress) == MessagingService.current_version
- && !MessagingService.instance().getConnectionPool(publicAddress).endPoint().equals(localAddress))
- {
- MessagingService.instance().getConnectionPool(publicAddress).reset(localAddress);
- logger.debug(String.format("Intiated reconnect to an Internal IP %s for the %s", localAddress, publicAddress));
- }
- }
-
- @Override
public void gossiperStarting()
{
super.gossiperStarting();
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(localPrivateAddress));
- Gossiper.instance.register(this);
+ Gossiper.instance.register(new ReconnectableSnitchHelper(this, ec2region, true));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e75e33fa/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
index 071cd09..e00239e 100644
--- a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.locator;
import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.Map;
import org.apache.cassandra.db.SystemTable;
@@ -30,14 +29,11 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
-import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.service.StorageService;
-public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch implements IEndpointStateChangeSubscriber
+public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch// implements IEndpointStateChangeSubscriber
{
private static final Logger logger = LoggerFactory.getLogger(GossipingPropertyFileSnitch.class);
@@ -47,7 +43,7 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch i
private Map<InetAddress, Map<String, String>> savedEndpoints;
private String DEFAULT_DC = "UNKNOWN_DC";
private String DEFAULT_RACK = "UNKNOWN_RACK";
- private boolean preferLocal;
+ private final boolean preferLocal;
public GossipingPropertyFileSnitch() throws ConfigurationException
{
@@ -126,64 +122,11 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch i
return epState.getApplicationState(ApplicationState.RACK).value;
}
- // IEndpointStateChangeSubscriber methods
-
- public void onJoin(InetAddress endpoint, EndpointState epState)
- {
- if (preferLocal && epState.getApplicationState(ApplicationState.INTERNAL_IP) != null)
- reConnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP));
- }
-
- public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
- {
- if (preferLocal && state == ApplicationState.INTERNAL_IP)
- reConnect(endpoint, value);
- }
-
- public void onAlive(InetAddress endpoint, EndpointState state)
- {
- if (preferLocal && state.getApplicationState(ApplicationState.INTERNAL_IP) != null)
- reConnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP));
- }
-
- public void onDead(InetAddress endpoint, EndpointState state)
- {
- // do nothing
- }
-
- public void onRestart(InetAddress endpoint, EndpointState state)
- {
- // do nothing
- }
-
- public void onRemove(InetAddress endpoint)
- {
- // do nothing.
- }
-
- private void reConnect(InetAddress endpoint, VersionedValue versionedValue)
- {
- if (!getDatacenter(endpoint).equals(myDC))
- return; // do nothing return back...
-
- try
- {
- InetAddress remoteIP = InetAddress.getByName(versionedValue.value);
- MessagingService.instance().getConnectionPool(endpoint).reset(remoteIP);
- logger.debug(String.format("Intiated reconnect to an Internal IP %s for the endpoint %s", remoteIP, endpoint));
- }
- catch (UnknownHostException e)
- {
- logger.error("Error in getting the IP address resolved", e);
- }
- }
-
- @Override
public void gossiperStarting()
{
super.gossiperStarting();
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP,
StorageService.instance.valueFactory.internalIP(FBUtilities.getLocalAddress().getHostAddress()));
- Gossiper.instance.register(this);
+ Gossiper.instance.register(new ReconnectableSnitchHelper(this, myDC, preferLocal));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e75e33fa/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
new file mode 100644
index 0000000..adec953
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
@@ -0,0 +1,88 @@
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.net.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sidekick helper for snitches that want to reconnect from one IP addr for a node to another.
+ * Typically, this is for situations like EC2 where a node will have a public address and a private address,
+ * where we connect on the public, discover the private, and reconnect on the private.
+ */
+public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
+{
+ private static final Logger logger = LoggerFactory.getLogger(ReconnectableSnitchHelper.class);
+ private final IEndpointSnitch snitch;
+ private final String localDc;
+ private final boolean preferLocal;
+
+ public ReconnectableSnitchHelper(IEndpointSnitch snitch, String localDc, boolean preferLocal)
+ {
+ this.snitch = snitch;
+ this.localDc = localDc;
+ this.preferLocal = preferLocal;
+ }
+
+ private void reconnect(InetAddress publicAddress, VersionedValue localAddressValue)
+ {
+ try
+ {
+ reconnect(publicAddress, InetAddress.getByName(localAddressValue.value));
+ }
+ catch (UnknownHostException e)
+ {
+ logger.error("Error in getting the IP address resolved: ", e);
+ }
+ }
+
+ private void reconnect(InetAddress publicAddress, InetAddress localAddress)
+ {
+ if (snitch.getDatacenter(publicAddress).equals(localDc)
+ && MessagingService.instance().getVersion(publicAddress) == MessagingService.current_version
+ && !MessagingService.instance().getConnectionPool(publicAddress).endPoint().equals(localAddress))
+ {
+ MessagingService.instance().getConnectionPool(publicAddress).reset(localAddress);
+ logger.debug(String.format("Intiated reconnect to an Internal IP %s for the %s", localAddress, publicAddress));
+ }
+ }
+
+ public void onJoin(InetAddress endpoint, EndpointState epState)
+ {
+ if (preferLocal && epState.getApplicationState(ApplicationState.INTERNAL_IP) != null)
+ reconnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP));
+ }
+
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+ {
+ if (preferLocal && state == ApplicationState.INTERNAL_IP)
+ reconnect(endpoint, value);
+ }
+
+ public void onAlive(InetAddress endpoint, EndpointState state)
+ {
+ if (preferLocal && state.getApplicationState(ApplicationState.INTERNAL_IP) != null)
+ reconnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP));
+ }
+
+ public void onDead(InetAddress endpoint, EndpointState state)
+ {
+ // do nothing.
+ }
+
+ public void onRemove(InetAddress endpoint)
+ {
+ // do nothing.
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState state)
+ {
+ // do nothing.
+ }
+}