You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/07/28 22:27:14 UTC

svn commit: r1151990 - in /cassandra/trunk/src/java/org/apache/cassandra: gms/ApplicationState.java gms/VersionedValue.java locator/Ec2MultiRegionSnitch.java net/OutboundTcpConnection.java net/OutboundTcpConnectionPool.java

Author: brandonwilliams
Date: Thu Jul 28 20:27:13 2011
New Revision: 1151990

URL: http://svn.apache.org/viewvc?rev=1151990&view=rev
Log:
Ec2 snitch with support for multiple regions.
Patch by Vijay Parthasarathy, reviewed by brandonwilliams for
CASSANDRA-2452

Added:
    cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java   (with props)
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
    cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=1151990&r1=1151989&r2=1151990&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java Thu Jul 28 20:27:13 2011
@@ -29,6 +29,7 @@ public enum ApplicationState
     DC,
     RACK,
     RELEASE_VERSION,
+    INTERNAL_IP,
     // pad to allow adding new states to existing cluster
     X1,
     X2,

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1151990&r1=1151989&r2=1151990&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java Thu Jul 28 20:27:13 2011
@@ -159,6 +159,11 @@ public class VersionedValue implements C
         {
             return new VersionedValue(FBUtilities.getReleaseVersionString());
         }
+
+        public VersionedValue internalIP(String private_ip)
+        {
+            return new VersionedValue(private_ip);
+        }
     }
 
     private static class VersionedValueSerializer implements ICompactSerializer<VersionedValue>

Added: cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java?rev=1151990&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java Thu Jul 28 20:27:13 2011
@@ -0,0 +1,103 @@
+package org.apache.cassandra.locator;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * 1) Snitch will automatically set the public IP by querying the AWS API
+ * 
+ * 2) Snitch will set the private IP as a Gossip application state.
+ * 
+ * 3) Snitch implements IEndpointStateChangeSubscriber and, for endpoints in
+ * the same region, resets the connection to communicate via private IP.
+ * 
+ * Extends Ec2Snitch to inherit its functionality and adds support for
+ * Multi-Region.
+ * 
+ * Operational: All the nodes in this cluster need to be able to communicate
+ * via public IPs (modify the Security group settings in AWS to allow this).
+ */
+public class Ec2MultiRegionSnitch extends Ec2Snitch implements IEndpointStateChangeSubscriber
+{
+    private static final String PUBLIC_IP_QUERY_URL = "http://169.254.169.254/latest/meta-data/public-ipv4";
+    private static final String PRIVATE_IP_QUERY_URL = "http://169.254.169.254/latest/meta-data/local-ipv4";
+    private final InetAddress public_ip;
+    private final String private_ip;
+
+    public Ec2MultiRegionSnitch() throws IOException, ConfigurationException
+    {
+        super();
+        public_ip = InetAddress.getByName(awsApiCall(PUBLIC_IP_QUERY_URL));
+        logger.info("EC2Snitch using publicIP as identifier: " + public_ip);
+        private_ip = awsApiCall(PRIVATE_IP_QUERY_URL);
+        // use the Public IP to broadcast Address to other nodes.
+        DatabaseDescriptor.setBroadcastAddress(public_ip);
+    }
+    
+    @Override
+    public void onJoin(InetAddress endpoint, EndpointState epState)
+    {
+        if (epState.getApplicationState(ApplicationState.INTERNAL_IP) != null)
+            reConnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP));
+    }
+
+    @Override
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+    {
+        if (state == ApplicationState.INTERNAL_IP)
+            reConnect(endpoint, value);
+    }
+
+    @Override
+    public void onAlive(InetAddress endpoint, EndpointState state)
+    {
+        if (state.getApplicationState(ApplicationState.INTERNAL_IP) != null)
+            reConnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP));
+    }
+
+    @Override
+    public void onDead(InetAddress endpoint, EndpointState state)
+    {
+        // do nothing
+    }
+
+    @Override
+    public void onRemove(InetAddress endpoint)
+    {
+        // do nothing.
+    }
+
+    private void reConnect(InetAddress endpoint, VersionedValue versionedValue)
+    {
+        if (!getDatacenter(endpoint).equals(getDatacenter(public_ip)))
+            return; // do nothing return back...
+        
+        try
+        {
+            InetAddress remoteIP = InetAddress.getByName(versionedValue.value);
+            MessagingService.instance().getConnectionPool(endpoint).reset(remoteIP);
+            logger.debug(String.format("Intiated reconnect to an Internal IP %s for the %s", remoteIP, endpoint));
+        } catch (UnknownHostException e)
+        {
+            logger.error("Error in getting the IP address resolved: ", e);
+        }
+    }
+    
+    public void gossiperStarting()
+    {
+        super.gossiperStarting();
+        Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(private_ip));
+        Gossiper.instance.register(this);
+    }
+}

Propchange: cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1151990&r1=1151989&r2=1151990&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Thu Jul 28 20:27:13 2011
@@ -44,10 +44,10 @@ public class OutboundTcpConnection exten
 {
     private static final Logger logger = LoggerFactory.getLogger(OutboundTcpConnection.class);
 
-    private static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0);
+    public static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0);
     private static final int OPEN_RETRY_DELAY = 100; // ms between retries
 
-    private final InetAddress endpoint;
+    private InetAddress endpoint;
     private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
     private DataOutputStream output;
     private Socket socket;
@@ -56,7 +56,12 @@ public class OutboundTcpConnection exten
     public OutboundTcpConnection(InetAddress remoteEp)
     {
         super("WRITE-" + remoteEp);
-        this.endpoint = remoteEp;
+        setEndPoint(remoteEp);        
+    }
+    
+    public void setEndPoint(InetAddress remoteEndPoint)
+    {
+        this.endpoint = remoteEndPoint;
     }
 
     public void write(ByteBuffer buffer)

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=1151990&r1=1151989&r2=1151990&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Thu Jul 28 20:27:13 2011
@@ -22,7 +22,7 @@ import java.net.InetAddress;
 
 import org.apache.cassandra.concurrent.Stage;
 
-class OutboundTcpConnectionPool
+public class OutboundTcpConnectionPool
 {
     public final OutboundTcpConnection cmdCon;
     public final OutboundTcpConnection ackCon;
@@ -52,4 +52,12 @@ class OutboundTcpConnectionPool
         for (OutboundTcpConnection con : new OutboundTcpConnection[] { cmdCon, ackCon })
             con.closeSocket();
     }
+    
+    public void reset(InetAddress remoteEP)
+    {
+        ackCon.setEndPoint(remoteEP);
+        ackCon.write(OutboundTcpConnection.CLOSE_SENTINEL);
+        cmdCon.setEndPoint(remoteEP);
+        cmdCon.write(OutboundTcpConnection.CLOSE_SENTINEL);
+    }
 }