You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/01/12 19:07:54 UTC
svn commit: r1058258 - in
/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra: gms/
locator/
Author: brandonwilliams
Date: Wed Jan 12 18:07:53 2011
New Revision: 1058258
URL: http://svn.apache.org/viewvc?rev=1058258&view=rev
Log:
ec2 snitch. Patch by Jon Hermes, Matt Dennis, and brandonwilliams.
Reviewed by brandonwilliams for CASSANDRA-1654
Added:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/Ec2Snitch.java
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/EndpointState.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=1058258&r1=1058257&r2=1058258&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java Wed Jan 12 18:07:53 2011
@@ -26,6 +26,8 @@ public enum ApplicationState
STATUS,
LOAD,
SCHEMA,
+ DC,
+ RACK,
// pad to allow adding new states to existing cluster
X1,
X2,
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/EndpointState.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/EndpointState.java?rev=1058258&r1=1058257&r2=1058258&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/EndpointState.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/EndpointState.java Wed Jan 12 18:07:53 2011
@@ -36,6 +36,8 @@ import org.cliffc.high_scale_lib.NonBloc
public class EndpointState
{
+ protected static Logger logger = LoggerFactory.getLogger(EndpointState.class);
+
private final static ICompactSerializer<EndpointState> serializer_ = new EndpointStateSerializer();
volatile HeartBeatState hbState_;
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1058258&r1=1058257&r2=1058258&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java Wed Jan 12 18:07:53 2011
@@ -433,7 +433,7 @@ public class Gossiper implements IFailur
}
}
- EndpointState getEndpointStateForEndpoint(InetAddress ep)
+ public EndpointState getEndpointStateForEndpoint(InetAddress ep)
{
return endpointStateMap_.get(ep);
}
@@ -848,6 +848,9 @@ public class Gossiper implements IFailur
endpointStateMap_.put(localEndpoint_, localState);
}
+ //notify snitches that Gossiper is about to start
+ DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
+
scheduledGossipTask = StorageService.scheduledTasks.scheduleWithFixedDelay(new GossipTask(),
Gossiper.intervalInMillis_,
Gossiper.intervalInMillis_,
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1058258&r1=1058257&r2=1058258&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java Wed Jan 12 18:07:53 2011
@@ -132,6 +132,16 @@ public class VersionedValue implements C
+ VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
}
+ public VersionedValue datacenter(String dcId)
+ {
+ return new VersionedValue(dcId);
+ }
+
+ public VersionedValue rack(String rackId)
+ {
+ return new VersionedValue(rackId);
+ }
+
}
private static class VersionedValueSerializer implements ICompactSerializer<VersionedValue>
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=1058258&r1=1058257&r2=1058258&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Wed Jan 12 18:07:53 2011
@@ -37,4 +37,9 @@ public abstract class AbstractEndpointSn
{
return a1.getHostAddress().compareTo(a2.getHostAddress());
}
+
+ public void gossiperStarting()
+ {
+ //noop by default
+ }
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1058258&r1=1058257&r2=1058258&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Wed Jan 12 18:07:53 2011
@@ -105,6 +105,12 @@ public class DynamicEndpointSnitch exten
}
}
+ @Override
+ public void gossiperStarting()
+ {
+ subsnitch.gossiperStarting();
+ }
+
public String getRack(InetAddress endpoint)
{
return subsnitch.getRack(endpoint);
@@ -302,5 +308,4 @@ class AdaptiveLatencyTracker extends Abs
}
return log;
}
-
}
Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/Ec2Snitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/Ec2Snitch.java?rev=1058258&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/Ec2Snitch.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/Ec2Snitch.java Wed Jan 12 18:07:53 2011
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A snitch that assumes an EC2 region is a DC and an EC2 availability_zone
+ * is a rack. The local node's zone is read from the EC2 instance metadata
+ * service when the snitch is constructed; the DC and rack of other nodes are
+ * read from the DC and RACK application states they publish via gossip.
+ */
+public class Ec2Snitch extends AbstractNetworkTopologySnitch
+{
+    protected static Logger logger = LoggerFactory.getLogger(Ec2Snitch.class);
+
+    /** Instance metadata URL that returns this node's availability zone. */
+    private static final String ZONE_QUERY_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone";
+
+    /** Reported for endpoints that have not gossiped a DC (yet). */
+    private static final String DEFAULT_DC = "UNKNOWN-DC";
+
+    /** Reported for endpoints that have not gossiped a rack (yet). */
+    private static final String DEFAULT_RACK = "UNKNOWN-RACK";
+
+    protected String ec2zone;
+    protected String ec2region;
+
+    /**
+     * Looks up this node's availability zone and derives the region (DC) and zone (rack) from it.
+     *
+     * @throws IOException if the metadata service cannot be reached or read
+     * @throws ConfigurationException if the metadata service does not answer 200 with a zone name
+     */
+    public Ec2Snitch() throws IOException, ConfigurationException
+    {
+        String azone = queryAvailabilityZone();
+
+        // Split "us-east-1a" or "asia-1a" into "us-east"/"1a" and "asia"/"1a".
+        String[] splits = azone.split("-");
+        ec2zone = splits[splits.length - 1];
+        ec2region = splits.length < 3 ? splits[0] : splits[0]+"-"+splits[1];
+        logger.info("EC2Snitch using region: " + ec2region + ", zone: " + ec2zone + ".");
+    }
+
+    /**
+     * Fetches the availability zone name from the metadata service.
+     * The connection is always disconnected and the response stream always closed.
+     */
+    private static String queryAvailabilityZone() throws IOException, ConfigurationException
+    {
+        HttpURLConnection conn = (HttpURLConnection) new URL(ZONE_QUERY_URL).openConnection();
+        try
+        {
+            conn.setRequestMethod("GET");
+            if (conn.getResponseCode() != 200)
+                throw new ConfigurationException("Ec2Snitch was unable to find region/zone data. Not an ec2 node?");
+
+            // Read to EOF rather than sizing a buffer from getContentLength(), which is -1 when unknown.
+            InputStream in = conn.getInputStream();
+            try
+            {
+                ByteArrayOutputStream body = new ByteArrayOutputStream();
+                byte[] chunk = new byte[64];
+                int read;
+                while ((read = in.read(chunk)) != -1)
+                    body.write(chunk, 0, read);
+
+                String azone = body.toString("UTF-8").trim();
+                if (azone.length() == 0)
+                    throw new ConfigurationException("Ec2Snitch received an empty availability zone from the metadata service.");
+                return azone;
+            }
+            finally
+            {
+                in.close();
+            }
+        }
+        finally
+        {
+            conn.disconnect();
+        }
+    }
+
+    /** Returns the local zone for this node, otherwise the rack the endpoint gossiped. */
+    public String getRack(InetAddress endpoint)
+    {
+        // InetAddress has to be compared by value; == only matches the very same object.
+        if (FBUtilities.getLocalAddress().equals(endpoint))
+            return ec2zone;
+        return gossipedValue(endpoint, ApplicationState.RACK, DEFAULT_RACK);
+    }
+
+    /** Returns the local region for this node, otherwise the DC the endpoint gossiped. */
+    public String getDatacenter(InetAddress endpoint)
+    {
+        if (FBUtilities.getLocalAddress().equals(endpoint))
+            return ec2region;
+        return gossipedValue(endpoint, ApplicationState.DC, DEFAULT_DC);
+    }
+
+    /**
+     * Returns what the endpoint gossiped for the given state, or the fallback
+     * when gossip holds no state for the endpoint or the state lacks that key.
+     */
+    private static String gossipedValue(InetAddress endpoint, ApplicationState key, String fallback)
+    {
+        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        if (state == null)
+            return fallback;
+        VersionedValue value = state.getApplicationState(key);
+        return value == null ? fallback : value.value;
+    }
+
+    /** Publishes this node's region and zone as its DC and RACK application states. */
+    @Override
+    public void gossiperStarting()
+    {
+        // Share EC2 info via gossip. We have to wait until Gossiper is initialized though.
+        logger.info("Ec2Snitch adding ApplicationState ec2region=" + ec2region + " ec2zone=" + ec2zone);
+        Gossiper.instance.addLocalApplicationState(ApplicationState.DC, StorageService.valueFactory.datacenter(ec2region));
+        Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.valueFactory.rack(ec2zone));
+    }
+}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/IEndpointSnitch.java?rev=1058258&r1=1058257&r2=1058258&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/IEndpointSnitch.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/IEndpointSnitch.java Wed Jan 12 18:07:53 2011
@@ -54,4 +54,9 @@ public interface IEndpointSnitch
* compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
*/
public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
+
+ /**
+ * called after Gossiper instance exists immediately before it starts gossiping
+ */
+ public void gossiperStarting();
}
\ No newline at end of file