You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/30 16:42:09 UTC
svn commit: r831339 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
locator/DatacenterEndPointSnitch.java locator/DatacenterShardStategy.java
service/DatacenterQuorumResponseHandler.java service/StorageProxy.java
Author: jbellis
Date: Fri Oct 30 15:42:09 2009
New Revision: 831339
URL: http://svn.apache.org/viewvc?rev=831339&view=rev
Log:
fixes for DQ quorum code. patch by Vijay Parthasarathy; reviewed by jbellis for CASSANDRA-492
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java?rev=831339&r1=831338&r2=831339&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java Fri Oct 30 15:42:09 2009
@@ -49,7 +49,6 @@
public DatacenterEndPointSnitch() throws IOException,
ParserConfigurationException, SAXException
{
- super();
xmlUtils = new XMLUtils(DEFAULT_RACK_CONFIG_FILE);
reloadConfiguration();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=831339&r1=831338&r2=831339&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java Fri Oct 30 15:42:09 2009
@@ -48,11 +48,12 @@
*
* @param tokenToEndPointMap - Provided the endpoint map which will be mapped with the DC's
*/
- private void loadEndPoints(Map<Token, InetAddress> tokenToEndPointMap, Collection<Token> tokens) throws IOException
+ private synchronized void loadEndPoints(Map<Token, InetAddress> tokenToEndPointMap, Collection<Token> tokens) throws IOException
{
endPointSnitch = (DatacenterEndPointSnitch) StorageService.instance().getEndPointSnitch();
this.tokens = new ArrayList<Token>(tokens);
String localDC = endPointSnitch.getLocation(InetAddress.getLocalHost());
+ dcMap = new HashMap<String, List<Token>>();
for (Token token : this.tokens)
{
InetAddress endPoint = tokenToEndPointMap.get(token);
@@ -150,6 +151,7 @@
if ((replicas_ - 1) > foundCount)
{
forloopReturn.add(endPointOfIntrest);
+ foundCount++;
continue;
}
else
@@ -205,7 +207,7 @@
if (consistency_level == ConsistencyLevel.DCQUORUM)
{
List<InetAddress> endpoints = getLocalEndPoints();
- return new DatacenterQuorumResponseHandler<T>(endpoints, locQFactor, responseResolver);
+ return new DatacenterQuorumResponseHandler<T>(locQFactor, responseResolver);
}
else if (consistency_level == ConsistencyLevel.DCQUORUMSYNC)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=831339&r1=831338&r2=831339&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java Fri Oct 30 15:42:09 2009
@@ -4,9 +4,13 @@
package org.apache.cassandra.service;
import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.List;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.IEndPointSnitch;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.FBUtilities;
/**
* This class will basically will block for the replication factor which is
@@ -15,36 +19,47 @@
*/
public class DatacenterQuorumResponseHandler<T> extends QuorumResponseHandler<T>
{
- private final List<InetAddress> waitList;
private int blockFor;
+ private IEndPointSnitch endpointsnitch;
+ private InetAddress localEndpoint;
- public DatacenterQuorumResponseHandler(List<InetAddress> waitList, int blockFor, IResponseResolver<T> responseResolver)
+ public DatacenterQuorumResponseHandler(int blockFor, IResponseResolver<T> responseResolver)
throws InvalidRequestException
{
// Response is been managed by the map so the waitlist size really doesnt matter.
super(blockFor, responseResolver);
this.blockFor = blockFor;
- this.waitList = waitList;
+ endpointsnitch = DatabaseDescriptor.getEndPointSnitch();
+ localEndpoint = FBUtilities.getLocalAddress();
}
@Override
public void response(Message message)
{
+ // If done, look no further.
if (condition_.isSignaled())
{
return;
}
-
- if (waitList.contains(message.getFrom()))
+ // It is more efficient to check for the same datacenter than to compare arrays.
+ try
+ {
+ if (endpointsnitch.isInSameDataCenter(localEndpoint, message.getFrom()))
+ {
+ blockFor--;
+ }
+ }
+ catch (UnknownHostException e)
{
- blockFor--;
+ throw new RuntimeException(e);
}
responses_.add(message);
- // If done then the response count will be empty after removing
- // everything.
if (blockFor <= 0)
{
+ // Signal when quorum is received.
condition_.signal();
}
+ if (logger_.isDebugEnabled())
+ logger_.debug("Processed Message: " + message.toString());
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=831339&r1=831338&r2=831339&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Oct 30 15:42:09 2009
@@ -204,7 +204,6 @@
private static int determineBlockFor(int naturalTargets, int hintedTargets, int consistency_level)
{
- // TODO this is broken for DC quorum / DC quorum sync
int bootstrapTargets = hintedTargets - naturalTargets;
int blockFor;
if (consistency_level == ConsistencyLevel.ONE)
@@ -215,6 +214,11 @@
{
blockFor = (naturalTargets / 2) + 1 + bootstrapTargets;
}
+ else if (consistency_level == ConsistencyLevel.DCQUORUM || consistency_level == ConsistencyLevel.DCQUORUMSYNC)
+ {
+ // TODO this is broken
+ blockFor = naturalTargets;
+ }
else if (consistency_level == ConsistencyLevel.ALL)
{
blockFor = naturalTargets + bootstrapTargets;