You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/06/14 18:22:05 UTC

git commit: Allow GPFS to reconnect to the internal IP. Patch by Chris Burroughs, reviewed by brandonwilliams for CASSANDRA-5630

Updated Branches:
  refs/heads/cassandra-1.2 30b093a71 -> 7eec23eb2


Allow GPFS to reconnect to the internal IP.
Patch by Chris Burroughs, reviewed by brandonwilliams for CASSANDRA-5630


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7eec23eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7eec23eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7eec23eb

Branch: refs/heads/cassandra-1.2
Commit: 7eec23eb27d1f6a75f55a0aeb89aaccde3a6bda0
Parents: 30b093a
Author: Brandon Williams <br...@apache.org>
Authored: Fri Jun 14 11:19:56 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Jun 14 11:21:52 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra-rackdc.properties                |  3 +
 .../locator/GossipingPropertyFileSnitch.java    | 72 +++++++++++++++++++-
 3 files changed, 74 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eec23eb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a063d69..a1fc8ce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -22,6 +22,7 @@
  * Evaluate now() function at execution time (CASSANDRA-5616)
  * Expose detailed read repair metrics (CASSANDRA-5618)
  * Correct blob literal + ReversedType parsing (CASSANDRA-5629)
+ * Allow GPFS to prefer the internal IP like EC2MRS (CASSANDRA-5630)
 Merged from 1.1:
  * Remove buggy thrift max message length option (CASSANDRA-5529)
  * Fix NPE in Pig's widerow mode (CASSANDRA-5488)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eec23eb/conf/cassandra-rackdc.properties
----------------------------------------------------------------------
diff --git a/conf/cassandra-rackdc.properties b/conf/cassandra-rackdc.properties
index be2e7d2..f0a0d55 100644
--- a/conf/cassandra-rackdc.properties
+++ b/conf/cassandra-rackdc.properties
@@ -22,3 +22,6 @@ rack=RAC1
 # Add a suffix to a datacenter name. Used by the Ec2Snitch and Ec2MultiRegionSnitch
 # to append a string to the EC2 region name.
 #dc_suffix=
+
+# Uncomment the following line to make this snitch prefer the internal ip when possible, as the Ec2MultiRegionSnitch does.
+# prefer_local=true

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eec23eb/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
index e08a27e..071cd09 100644
--- a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Map;
 
 import org.apache.cassandra.db.SystemTable;
@@ -29,9 +30,14 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.service.StorageService;
 
-public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch
+
+public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch implements IEndpointStateChangeSubscriber
 {
     private static final Logger logger = LoggerFactory.getLogger(GossipingPropertyFileSnitch.class);
 
@@ -41,6 +47,7 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch
     private Map<InetAddress, Map<String, String>> savedEndpoints;
     private String DEFAULT_DC = "UNKNOWN_DC";
     private String DEFAULT_RACK = "UNKNOWN_RACK";
+    private boolean preferLocal;
 
     public GossipingPropertyFileSnitch() throws ConfigurationException
     {
@@ -51,7 +58,7 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch
 
         myDC = myDC.trim();
         myRack = myRack.trim();
-
+        preferLocal = Boolean.parseBoolean(SnitchProperties.get("prefer_local", "false"));
         try
         {
             psnitch = new PropertyFileSnitch();
@@ -118,4 +125,65 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch
         }
         return epState.getApplicationState(ApplicationState.RACK).value;
     }
+
+    // IEndpointStateChangeSubscriber methods
+
+    public void onJoin(InetAddress endpoint, EndpointState epState)
+    {
+        if (preferLocal && epState.getApplicationState(ApplicationState.INTERNAL_IP) != null)
+            reConnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP));
+    }
+
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+    {
+        if (preferLocal && state == ApplicationState.INTERNAL_IP)
+            reConnect(endpoint, value);
+    }
+
+    public void onAlive(InetAddress endpoint, EndpointState state)
+    {
+        if (preferLocal && state.getApplicationState(ApplicationState.INTERNAL_IP) != null)
+            reConnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP));
+    }
+
+    public void onDead(InetAddress endpoint, EndpointState state)
+    {
+        // do nothing
+    }
+
+    public void onRestart(InetAddress endpoint, EndpointState state)
+    {
+        // do nothing
+    }
+
+    public void onRemove(InetAddress endpoint)
+    {
+        // do nothing.
+    }
+
+    private void reConnect(InetAddress endpoint, VersionedValue versionedValue)
+    {
+        if (!getDatacenter(endpoint).equals(myDC))
+            return; // do nothing return back...
+
+        try
+        {
+            InetAddress remoteIP = InetAddress.getByName(versionedValue.value);
+            MessagingService.instance().getConnectionPool(endpoint).reset(remoteIP);
+            logger.debug(String.format("Intiated reconnect to an Internal IP %s for the endpoint %s", remoteIP, endpoint));
+        }
+        catch (UnknownHostException e)
+        {
+            logger.error("Error in getting the IP address resolved", e);
+        }
+    }
+
+    @Override
+    public void gossiperStarting()
+    {
+        super.gossiperStarting();
+        Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP,
+                                                   StorageService.instance.valueFactory.internalIP(FBUtilities.getLocalAddress().getHostAddress()));
+        Gossiper.instance.register(this);
+    }
 }