You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/07/28 22:27:14 UTC
svn commit: r1151990 - in /cassandra/trunk/src/java/org/apache/cassandra:
gms/ApplicationState.java gms/VersionedValue.java
locator/Ec2MultiRegionSnitch.java net/OutboundTcpConnection.java
net/OutboundTcpConnectionPool.java
Author: brandonwilliams
Date: Thu Jul 28 20:27:13 2011
New Revision: 1151990
URL: http://svn.apache.org/viewvc?rev=1151990&view=rev
Log:
Ec2 snitch with support for multiple regions.
Patch by Vijay Parthasarathy, reviewed by brandonwilliams for
CASSANDRA-2452
Added:
cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java (with props)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=1151990&r1=1151989&r2=1151990&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java Thu Jul 28 20:27:13 2011
@@ -29,6 +29,7 @@ public enum ApplicationState
DC,
RACK,
RELEASE_VERSION,
+ INTERNAL_IP,
// pad to allow adding new states to existing cluster
X1,
X2,
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1151990&r1=1151989&r2=1151990&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java Thu Jul 28 20:27:13 2011
@@ -159,6 +159,11 @@ public class VersionedValue implements C
{
return new VersionedValue(FBUtilities.getReleaseVersionString());
}
+
+ public VersionedValue internalIP(String private_ip)
+ {
+ return new VersionedValue(private_ip);
+ }
}
private static class VersionedValueSerializer implements ICompactSerializer<VersionedValue>
Added: cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java?rev=1151990&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java Thu Jul 28 20:27:13 2011
@@ -0,0 +1,103 @@
+package org.apache.cassandra.locator;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * 1) Snitch will automatically set the public IP by querying the AWS API
+ *
+ * 2) Snitch will set the private IP as a Gossip application state.
+ *
+ * 3) Snitch implements IESCS and will reset the connection if it is within the
+ * same region to communicate via private IP.
+ *
+ * Implements Ec2Snitch to inherit its functionality and extend it for
+ * Multi-Region.
+ *
+ * Operational: All the nodes in this cluster needs to be able to (modify the
+ * Security group settings in AWS) communicate via Public IP's.
+ */
+public class Ec2MultiRegionSnitch extends Ec2Snitch implements IEndpointStateChangeSubscriber
+{
+ private static final String PUBLIC_IP_QUERY_URL = "http://169.254.169.254/latest/meta-data/public-ipv4";
+ private static final String PRIVATE_IP_QUERY_URL = "http://169.254.169.254/latest/meta-data/local-ipv4";
+ private final InetAddress public_ip;
+ private final String private_ip;
+
+ public Ec2MultiRegionSnitch() throws IOException, ConfigurationException
+ {
+ super();
+ public_ip = InetAddress.getByName(awsApiCall(PUBLIC_IP_QUERY_URL));
+ logger.info("EC2Snitch using publicIP as identifier: " + public_ip);
+ private_ip = awsApiCall(PRIVATE_IP_QUERY_URL);
+ // use the Public IP to broadcast Address to other nodes.
+ DatabaseDescriptor.setBroadcastAddress(public_ip);
+ }
+
+ @Override
+ public void onJoin(InetAddress endpoint, EndpointState epState)
+ {
+ if (epState.getApplicationState(ApplicationState.INTERNAL_IP) != null)
+ reConnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP));
+ }
+
+ @Override
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+ {
+ if (state == ApplicationState.INTERNAL_IP)
+ reConnect(endpoint, value);
+ }
+
+ @Override
+ public void onAlive(InetAddress endpoint, EndpointState state)
+ {
+ if (state.getApplicationState(ApplicationState.INTERNAL_IP) != null)
+ reConnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP));
+ }
+
+ @Override
+ public void onDead(InetAddress endpoint, EndpointState state)
+ {
+ // do nothing
+ }
+
+ @Override
+ public void onRemove(InetAddress endpoint)
+ {
+ // do nothing.
+ }
+
+ private void reConnect(InetAddress endpoint, VersionedValue versionedValue)
+ {
+ if (!getDatacenter(endpoint).equals(getDatacenter(public_ip)))
+ return; // do nothing return back...
+
+ try
+ {
+ InetAddress remoteIP = InetAddress.getByName(versionedValue.value);
+ MessagingService.instance().getConnectionPool(endpoint).reset(remoteIP);
+ logger.debug(String.format("Intiated reconnect to an Internal IP %s for the %s", remoteIP, endpoint));
+ } catch (UnknownHostException e)
+ {
+ logger.error("Error in getting the IP address resolved: ", e);
+ }
+ }
+
+ public void gossiperStarting()
+ {
+ super.gossiperStarting();
+ Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(private_ip));
+ Gossiper.instance.register(this);
+ }
+}
Propchange: cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1151990&r1=1151989&r2=1151990&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Thu Jul 28 20:27:13 2011
@@ -44,10 +44,10 @@ public class OutboundTcpConnection exten
{
private static final Logger logger = LoggerFactory.getLogger(OutboundTcpConnection.class);
- private static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0);
+ public static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0);
private static final int OPEN_RETRY_DELAY = 100; // ms between retries
- private final InetAddress endpoint;
+ private InetAddress endpoint;
private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
private DataOutputStream output;
private Socket socket;
@@ -56,7 +56,12 @@ public class OutboundTcpConnection exten
public OutboundTcpConnection(InetAddress remoteEp)
{
super("WRITE-" + remoteEp);
- this.endpoint = remoteEp;
+ setEndPoint(remoteEp);
+ }
+
+ public void setEndPoint(InetAddress remoteEndPoint)
+ {
+ this.endpoint = remoteEndPoint;
}
public void write(ByteBuffer buffer)
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=1151990&r1=1151989&r2=1151990&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Thu Jul 28 20:27:13 2011
@@ -22,7 +22,7 @@ import java.net.InetAddress;
import org.apache.cassandra.concurrent.Stage;
-class OutboundTcpConnectionPool
+public class OutboundTcpConnectionPool
{
public final OutboundTcpConnection cmdCon;
public final OutboundTcpConnection ackCon;
@@ -52,4 +52,12 @@ class OutboundTcpConnectionPool
for (OutboundTcpConnection con : new OutboundTcpConnection[] { cmdCon, ackCon })
con.closeSocket();
}
+
+ public void reset(InetAddress remoteEP)
+ {
+ ackCon.setEndPoint(remoteEP);
+ ackCon.write(OutboundTcpConnection.CLOSE_SENTINEL);
+ cmdCon.setEndPoint(remoteEP);
+ cmdCon.write(OutboundTcpConnection.CLOSE_SENTINEL);
+ }
}