You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/30 16:42:09 UTC

svn commit: r831339 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: locator/DatacenterEndPointSnitch.java locator/DatacenterShardStategy.java service/DatacenterQuorumResponseHandler.java service/StorageProxy.java

Author: jbellis
Date: Fri Oct 30 15:42:09 2009
New Revision: 831339

URL: http://svn.apache.org/viewvc?rev=831339&view=rev
Log:
fixes for DQ quorum code.  patch by Vijay Parthasarathy; reviewed by jbellis for CASSANDRA-492

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java?rev=831339&r1=831338&r2=831339&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java Fri Oct 30 15:42:09 2009
@@ -49,7 +49,6 @@
     public DatacenterEndPointSnitch() throws IOException,
                                              ParserConfigurationException, SAXException
     {
-        super();
         xmlUtils = new XMLUtils(DEFAULT_RACK_CONFIG_FILE);
         reloadConfiguration();
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=831339&r1=831338&r2=831339&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java Fri Oct 30 15:42:09 2009
@@ -48,11 +48,12 @@
      *
      * @param tokenToEndPointMap - Provided the endpoint map which will be mapped with the DC's
      */
-    private void loadEndPoints(Map<Token, InetAddress> tokenToEndPointMap, Collection<Token> tokens) throws IOException
+    private synchronized void loadEndPoints(Map<Token, InetAddress> tokenToEndPointMap, Collection<Token> tokens) throws IOException
     {
         endPointSnitch = (DatacenterEndPointSnitch) StorageService.instance().getEndPointSnitch();
         this.tokens = new ArrayList<Token>(tokens);
         String localDC = endPointSnitch.getLocation(InetAddress.getLocalHost());
+        dcMap = new HashMap<String, List<Token>>();
         for (Token token : this.tokens)
         {
             InetAddress endPoint = tokenToEndPointMap.get(token);
@@ -150,6 +151,7 @@
                 if ((replicas_ - 1) > foundCount)
                 {
                     forloopReturn.add(endPointOfIntrest);
+                    foundCount++;
                     continue;
                 }
                 else
@@ -205,7 +207,7 @@
         if (consistency_level == ConsistencyLevel.DCQUORUM)
         {
             List<InetAddress> endpoints = getLocalEndPoints();
-            return new DatacenterQuorumResponseHandler<T>(endpoints, locQFactor, responseResolver);
+            return new DatacenterQuorumResponseHandler<T>(locQFactor, responseResolver);
         }
         else if (consistency_level == ConsistencyLevel.DCQUORUMSYNC)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=831339&r1=831338&r2=831339&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java Fri Oct 30 15:42:09 2009
@@ -4,9 +4,13 @@
 package org.apache.cassandra.service;
 
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.List;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.IEndPointSnitch;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * This class will basically will block for the replication factor which is
@@ -15,36 +19,47 @@
  */
 public class DatacenterQuorumResponseHandler<T> extends QuorumResponseHandler<T>
 {
-    private final List<InetAddress> waitList;
     private int blockFor;
+    private IEndPointSnitch endpointsnitch;
+    private InetAddress localEndpoint;
 
-    public DatacenterQuorumResponseHandler(List<InetAddress> waitList, int blockFor, IResponseResolver<T> responseResolver)
+    public DatacenterQuorumResponseHandler(int blockFor, IResponseResolver<T> responseResolver)
     throws InvalidRequestException
     {
         // Response is been managed by the map so the waitlist size really doesnt matter.
         super(blockFor, responseResolver);
         this.blockFor = blockFor;
-        this.waitList = waitList;
+        endpointsnitch = DatabaseDescriptor.getEndPointSnitch();
+        localEndpoint = FBUtilities.getLocalAddress();
     }
 
     @Override
     public void response(Message message)
     {
+        // If already done, look no further.
         if (condition_.isSignaled())
         {
             return;
         }
-
-        if (waitList.contains(message.getFrom()))
+            // Checking for the same datacenter is cheaper than searching a wait list.
+        try
+        {
+            if (endpointsnitch.isInSameDataCenter(localEndpoint, message.getFrom()))
+            {
+                blockFor--;
+            }
+        }
+        catch (UnknownHostException e)
         {
-            blockFor--;
+            throw new RuntimeException(e);
         }
         responses_.add(message);
-        // If done then the response count will be empty after removing
-        // everything.
         if (blockFor <= 0)
         {
+            // Signal when the quorum is received.
             condition_.signal();
         }
+        if (logger_.isDebugEnabled())
+            logger_.debug("Processed Message: " + message.toString());
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=831339&r1=831338&r2=831339&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Oct 30 15:42:09 2009
@@ -204,7 +204,6 @@
 
     private static int determineBlockFor(int naturalTargets, int hintedTargets, int consistency_level)
     {
-        // TODO this is broken for DC quorum / DC quorum sync
         int bootstrapTargets = hintedTargets - naturalTargets;
         int blockFor;
         if (consistency_level == ConsistencyLevel.ONE)
@@ -215,6 +214,11 @@
         {
             blockFor = (naturalTargets / 2) + 1 + bootstrapTargets;
         }
+        else if (consistency_level == ConsistencyLevel.DCQUORUM || consistency_level == ConsistencyLevel.DCQUORUMSYNC)
+        {
+            // TODO this is broken
+            blockFor = naturalTargets;
+        }
         else if (consistency_level == ConsistencyLevel.ALL)
         {
             blockFor = naturalTargets + bootstrapTargets;