You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/01/12 19:07:54 UTC

svn commit: r1058258 - in /cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra: gms/ locator/

Author: brandonwilliams
Date: Wed Jan 12 18:07:53 2011
New Revision: 1058258

URL: http://svn.apache.org/viewvc?rev=1058258&view=rev
Log:
ec2 snitch.  Patch by Jon Hermes, Matt Dennis, and brandonwilliams.
Reviewed by brandonwilliams for CASSANDRA-1654

Added:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/Ec2Snitch.java
Modified:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/EndpointState.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/IEndpointSnitch.java

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=1058258&r1=1058257&r2=1058258&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java Wed Jan 12 18:07:53 2011
@@ -26,6 +26,8 @@ public enum ApplicationState
     STATUS,
     LOAD,
     SCHEMA,
+    DC,
+    RACK,
     // pad to allow adding new states to existing cluster
     X1,
     X2,

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/EndpointState.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/EndpointState.java?rev=1058258&r1=1058257&r2=1058258&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/EndpointState.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/EndpointState.java Wed Jan 12 18:07:53 2011
@@ -36,6 +36,8 @@ import org.cliffc.high_scale_lib.NonBloc
 
 public class EndpointState
 {
+    protected static Logger logger = LoggerFactory.getLogger(EndpointState.class);
+
     private final static ICompactSerializer<EndpointState> serializer_ = new EndpointStateSerializer();
 
     volatile HeartBeatState hbState_;

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1058258&r1=1058257&r2=1058258&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java Wed Jan 12 18:07:53 2011
@@ -433,7 +433,7 @@ public class Gossiper implements IFailur
         }
     }
 
-    EndpointState getEndpointStateForEndpoint(InetAddress ep)
+    public EndpointState getEndpointStateForEndpoint(InetAddress ep)
     {
         return endpointStateMap_.get(ep);
     }
@@ -848,6 +848,9 @@ public class Gossiper implements IFailur
             endpointStateMap_.put(localEndpoint_, localState);
         }
 
+        //notify snitches that Gossiper is about to start
+        DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
+
         scheduledGossipTask = StorageService.scheduledTasks.scheduleWithFixedDelay(new GossipTask(),
                                                                                    Gossiper.intervalInMillis_,
                                                                                    Gossiper.intervalInMillis_,

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1058258&r1=1058257&r2=1058258&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java Wed Jan 12 18:07:53 2011
@@ -132,6 +132,16 @@ public class VersionedValue implements C
                                         + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
 
+        public VersionedValue datacenter(String dcId)
+        {
+            return new VersionedValue(dcId);
+        }
+
+        public VersionedValue rack(String rackId)
+        {
+            return new VersionedValue(rackId);
+        }
+
     }
 
     private static class VersionedValueSerializer implements ICompactSerializer<VersionedValue>

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=1058258&r1=1058257&r2=1058258&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Wed Jan 12 18:07:53 2011
@@ -37,4 +37,9 @@ public abstract class AbstractEndpointSn
     {
         return a1.getHostAddress().compareTo(a2.getHostAddress());
     }
+
+    public void gossiperStarting()
+    {
+        //noop by default
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1058258&r1=1058257&r2=1058258&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Wed Jan 12 18:07:53 2011
@@ -105,6 +105,12 @@ public class DynamicEndpointSnitch exten
         }
     }
 
+    @Override
+    public void gossiperStarting()
+    {
+        subsnitch.gossiperStarting();
+    }
+
     public String getRack(InetAddress endpoint)
     {
         return subsnitch.getRack(endpoint);
@@ -302,5 +308,4 @@ class AdaptiveLatencyTracker extends Abs
         }
         return log;
     }
-
 }

Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/Ec2Snitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/Ec2Snitch.java?rev=1058258&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/Ec2Snitch.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/Ec2Snitch.java Wed Jan 12 18:07:53 2011
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.io.DataInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A snitch that assumes an EC2 region is a DC and an EC2 availability_zone
+ * is a rack. The local node's availability zone is read from the EC2 instance
+ * metadata service when the snitch is constructed; other nodes' region and
+ * zone are read from the DC and RACK application states held by the Gossiper.
+ */
+public class Ec2Snitch extends AbstractNetworkTopologySnitch
+{
+    protected static Logger logger = LoggerFactory.getLogger(Ec2Snitch.class);
+
+    /** Instance metadata URL that returns the local availability zone. */
+    private static final String ZONE_QUERY_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone";
+
+    /** Last dash-separated part of the availability zone, e.g. "1a"; reported as the rack. */
+    protected String ec2zone;
+    /** Leading part of the availability zone, e.g. "us-east"; reported as the datacenter. */
+    protected String ec2region;
+
+    /**
+     * Looks up the local availability zone and derives the region and zone from it.
+     *
+     * @throws IOException if the metadata service cannot be reached or read
+     * @throws ConfigurationException if the service does not return usable
+     *         zone data, which usually means this is not an EC2 node
+     */
+    public Ec2Snitch() throws IOException, ConfigurationException
+    {
+        // Split "us-east-1a" or "asia-1a" into "us-east"/"1a" and "asia"/"1a".
+        String azone = fetchAvailabilityZone();
+        String[] splits = azone.split("-");
+        ec2zone = splits[splits.length - 1];
+        ec2region = splits.length < 3 ? splits[0] : splits[0]+"-"+splits[1];
+        logger.info("EC2Snitch using region: " + ec2region + ", zone: " + ec2zone + ".");
+    }
+
+    /**
+     * Fetches the raw availability zone string (e.g. "us-east-1a") from the
+     * metadata service, releasing the stream and connection on every path.
+     *
+     * @return the response body decoded as UTF-8
+     * @throws IOException if the request or the read fails
+     * @throws ConfigurationException if the response is not a 200 or does not
+     *         report its length
+     */
+    private static String fetchAvailabilityZone() throws IOException, ConfigurationException
+    {
+        HttpURLConnection conn = (HttpURLConnection) new URL(ZONE_QUERY_URL).openConnection();
+        DataInputStream d = null;
+        try
+        {
+            // Populate the region and zone by introspection, fail if 404 on metadata
+            conn.setRequestMethod("GET");
+            if (conn.getResponseCode() != 200)
+            {
+                throw new ConfigurationException("Ec2Snitch was unable to find region/zone data. Not an ec2 node?");
+            }
+
+            // getContentLength() is -1 when the length is unknown; report that
+            // clearly rather than failing with NegativeArraySizeException.
+            int cl = conn.getContentLength();
+            if (cl < 0)
+            {
+                throw new ConfigurationException("Ec2Snitch received region/zone data of unknown length.");
+            }
+
+            // Read through getInputStream() so we do not rely on the concrete
+            // type that getContent() returns for this content type.
+            byte[] b = new byte[cl];
+            d = new DataInputStream(conn.getInputStream());
+            d.readFully(b);
+            return new String(b, "UTF-8");
+        }
+        finally
+        {
+            try
+            {
+                if (d != null)
+                    d.close();
+            }
+            finally
+            {
+                conn.disconnect();
+            }
+        }
+    }
+
+    /**
+     * Returns the rack of the given endpoint: the local zone for this node,
+     * otherwise the RACK application state known to the Gossiper.
+     * NOTE(review): a remote endpoint with no gossip state or no RACK value
+     * may fail with a NullPointerException here -- confirm callers only
+     * pass endpoints that have gossiped.
+     */
+    public String getRack(InetAddress endpoint)
+    {
+        // Compare by value: an equal address need not be the same InetAddress instance.
+        if (endpoint.equals(FBUtilities.getLocalAddress()))
+            return ec2zone;
+        else
+            return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RACK).value;
+    }
+
+    /**
+     * Returns the datacenter of the given endpoint: the local region for this
+     * node, otherwise the DC application state known to the Gossiper.
+     * NOTE(review): a remote endpoint with no gossip state or no DC value
+     * may fail with a NullPointerException here -- confirm callers only
+     * pass endpoints that have gossiped.
+     */
+    public String getDatacenter(InetAddress endpoint)
+    {
+        // Compare by value: an equal address need not be the same InetAddress instance.
+        if (endpoint.equals(FBUtilities.getLocalAddress()))
+            return ec2region;
+        else
+            return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.DC).value;
+    }
+
+    /**
+     * Publishes the local region and zone as the DC and RACK application
+     * states so that other nodes can resolve this node's location.
+     */
+    @Override
+    public void gossiperStarting()
+    {
+        // Share EC2 info via gossip.  We have to wait until Gossiper is initialized though.
+        logger.info("Ec2Snitch adding ApplicationState ec2region=" + ec2region + " ec2zone=" + ec2zone);
+        Gossiper.instance.addLocalApplicationState(ApplicationState.DC, StorageService.valueFactory.datacenter(ec2region));
+        Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.valueFactory.rack(ec2zone));
+    }
+}

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/IEndpointSnitch.java?rev=1058258&r1=1058257&r2=1058258&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/IEndpointSnitch.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/IEndpointSnitch.java Wed Jan 12 18:07:53 2011
@@ -54,4 +54,9 @@ public interface IEndpointSnitch
      * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
      */
     public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
+
+    /**
+     * called after Gossiper instance exists immediately before it starts gossiping
+     */
+    public void gossiperStarting();
 }
\ No newline at end of file