You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/01/08 17:23:08 UTC

[1/2] git commit: Fix execution of LOCAL_QUORUM queries with SimpleStrategy

Updated Branches:
  refs/heads/cassandra-2.0 5fa605510 -> f8fd7db67


Fix execution of LOCAL_QUORUM queries with SimpleStrategy

patch by alexliu68; reviewed by slebresne for CASSANDRA-6545


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/32d7cb50
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/32d7cb50
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/32d7cb50

Branch: refs/heads/cassandra-2.0
Commit: 32d7cb5066050ef6123f50a25c6e9b4c9e180ea0
Parents: 2a7c20e
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 8 17:14:58 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 8 17:15:20 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ConsistencyLevel.java   | 60 +++++++++++++-------
 .../locator/AbstractReplicationStrategy.java    |  2 +-
 3 files changed, 42 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/32d7cb50/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5a85977..cba97d0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@
  * fsync compression metadata (CASSANDRA-6531)
  * Validate CF existence on execution for prepared statement (CASSANDRA-6535)
  * Add ability to throttle batchlog replay (CASSANDRA-6550)
+ * Fix executing LOCAL_QUORUM with SimpleStrategy (CASSANDRA-6545)
 
 
 1.2.13

http://git-wip-us.apache.org/repos/asf/cassandra/blob/32d7cb50/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 4d72767..3737c73 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -88,9 +88,16 @@ public enum ConsistencyLevel
         return codeIdx[code];
     }
 
+    private int quorumFor(Table table)
+    {
+        return (table.getReplicationStrategy().getReplicationFactor() / 2) + 1;
+    }
+
     private int localQuorumFor(Table table, String dc)
     {
-        return (((NetworkTopologyStrategy) table.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1;
+        return (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+             ? (((NetworkTopologyStrategy) table.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1
+             : quorumFor(table);
     }
 
     public int blockFor(Table table)
@@ -107,17 +114,24 @@ public enum ConsistencyLevel
             case THREE:
                 return 3;
             case QUORUM:
-                return (table.getReplicationStrategy().getReplicationFactor() / 2) + 1;
+                return quorumFor(table);
             case ALL:
                 return table.getReplicationStrategy().getReplicationFactor();
             case LOCAL_QUORUM:
                 return localQuorumFor(table, DatabaseDescriptor.getLocalDataCenter());
             case EACH_QUORUM:
-                NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
-                int n = 0;
-                for (String dc : strategy.getDatacenters())
-                    n += localQuorumFor(table, dc);
-                return n;
+                if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+                {
+                    NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
+                    int n = 0;
+                    for (String dc : strategy.getDatacenters())
+                        n += localQuorumFor(table, dc);
+                    return n;
+                }
+                else
+                {
+                    return quorumFor(table);
+                }
             default:
                 throw new UnsupportedOperationException("Invalid consistency level: " + toString());
         }
@@ -208,16 +222,20 @@ public enum ConsistencyLevel
                 // local hint is acceptable, and local node is always live
                 return true;
             case LOCAL_ONE:
-                    return countLocalEndpoints(liveEndpoints) >= 1;
+                return countLocalEndpoints(liveEndpoints) >= 1;
             case LOCAL_QUORUM:
                 return countLocalEndpoints(liveEndpoints) >= blockFor(table);
             case EACH_QUORUM:
-                for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+                if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
                 {
-                    if (entry.getValue() < localQuorumFor(table, entry.getKey()))
-                        return false;
+                    for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+                    {
+                        if (entry.getValue() < localQuorumFor(table, entry.getKey()))
+                            return false;
+                    }
+                    return true;
                 }
-                return true;
+                // Fallthough on purpose for SimpleStrategy
             default:
                 return Iterables.size(liveEndpoints) >= blockFor(table);
         }
@@ -250,14 +268,18 @@ public enum ConsistencyLevel
                 }
                 break;
             case EACH_QUORUM:
-                for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+                if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
                 {
-                    int dcBlockFor = localQuorumFor(table, entry.getKey());
-                    int dcLive = entry.getValue();
-                    if (dcLive < dcBlockFor)
-                        throw new UnavailableException(this, dcBlockFor, dcLive);
+                    for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+                    {
+                        int dcBlockFor = localQuorumFor(table, entry.getKey());
+                        int dcLive = entry.getValue();
+                        if (dcLive < dcBlockFor)
+                            throw new UnavailableException(this, dcBlockFor, dcLive);
+                    }
+                    break;
                 }
-                break;
+                // Fallthough on purpose for SimpleStrategy
             default:
                 int live = Iterables.size(liveEndpoints);
                 if (live < blockFor)
@@ -282,8 +304,6 @@ public enum ConsistencyLevel
 
     public void validateForWrite(String table) throws InvalidRequestException
     {
-        if(this == EACH_QUORUM)
-            requireNetworkTopologyStrategy(table);
     }
 
     public void validateCounterForWrite(CFMetaData metadata) throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/32d7cb50/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index a48bec9..e4dd422 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -134,7 +134,7 @@ public abstract class AbstractReplicationStrategy
             // block for in this context will be localnodes block.
             return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
         }
-        else if (consistency_level == ConsistencyLevel.EACH_QUORUM)
+        else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
         {
             return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
         }


[2/2] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0

Posted by sl...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
	src/java/org/apache/cassandra/db/ConsistencyLevel.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8fd7db6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8fd7db6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8fd7db6

Branch: refs/heads/cassandra-2.0
Commit: f8fd7db67e3ff7268bd2bc96bd1373d54664b7dd
Parents: 5fa6055 32d7cb5
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 8 17:22:55 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 8 17:22:55 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ConsistencyLevel.java   | 59 +++++++++++++-------
 .../locator/AbstractReplicationStrategy.java    |  2 +-
 3 files changed, 41 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fd7db6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index faf52a4,cba97d0..d0b63a0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -5,29 -8,10 +5,30 @@@ Merged from 1.2
   * fsync compression metadata (CASSANDRA-6531)
   * Validate CF existence on execution for prepared statement (CASSANDRA-6535)
   * Add ability to throttle batchlog replay (CASSANDRA-6550)
+  * Fix executing LOCAL_QUORUM with SimpleStrategy (CASSANDRA-6545)
  
  
 -1.2.13
 +2.0.4
 + * Allow removing snapshots of no-longer-existing CFs (CASSANDRA-6418)
 + * add StorageService.stopDaemon() (CASSANDRA-4268)
 + * add IRE for invalid CF supplied to get_count (CASSANDRA-5701)
 + * add client encryption support to sstableloader (CASSANDRA-6378)
 + * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468)
 + * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496)
 + * Fix assertion failure in filterColdSSTables (CASSANDRA-6483)
 + * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008)
 + * Fix cleanup ClassCastException (CASSANDRA-6462)
 + * Reduce gossip memory use by interning VersionedValue strings (CASSANDRA-6410)
 + * Allow specifying datacenters to participate in a repair (CASSANDRA-6218)
 + * Fix divide-by-zero in PCI (CASSANDRA-6403)
 + * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284)
 + * Add millisecond precision formats to the timestamp parser (CASSANDRA-6395)
 + * Expose a total memtable size metric for a CF (CASSANDRA-6391)
 + * cqlsh: handle symlinks properly (CASSANDRA-6425)
 + * Fix potential infinite loop when paging query with IN (CASSANDRA-6464)
 + * Fix assertion error in AbstractQueryPager.discardFirst (CASSANDRA-6447)
 + * Fix streaming older SSTable yields unnecessary tombstones (CASSANDRA-6527)
 +Merged from 1.2:
   * Improved error message on bad properties in DDL queries (CASSANDRA-6453)
   * Randomize batchlog candidates selection (CASSANDRA-6481)
   * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345, 6485)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fd7db6/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 0f6aba7,3737c73..6d04314
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@@ -89,12 -88,19 +89,19 @@@ public enum ConsistencyLeve
          return codeIdx[code];
      }
  
 -    private int quorumFor(Table table)
++    private int quorumFor(Keyspace keyspace)
+     {
 -        return (table.getReplicationStrategy().getReplicationFactor() / 2) + 1;
++        return (keyspace.getReplicationStrategy().getReplicationFactor() / 2) + 1;
+     }
+ 
 -    private int localQuorumFor(Table table, String dc)
 +    private int localQuorumFor(Keyspace keyspace, String dc)
      {
-         return (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1;
 -        return (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
 -             ? (((NetworkTopologyStrategy) table.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1
 -             : quorumFor(table);
++        return (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
++             ? (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1
++             : quorumFor(keyspace);
      }
  
 -    public int blockFor(Table table)
 +    public int blockFor(Keyspace keyspace)
      {
          switch (this)
          {
@@@ -108,17 -114,24 +115,24 @@@
              case THREE:
                  return 3;
              case QUORUM:
-                 return (keyspace.getReplicationStrategy().getReplicationFactor() / 2) + 1;
 -                return quorumFor(table);
++                return quorumFor(keyspace);
              case ALL:
 -                return table.getReplicationStrategy().getReplicationFactor();
 +                return keyspace.getReplicationStrategy().getReplicationFactor();
              case LOCAL_QUORUM:
 -                return localQuorumFor(table, DatabaseDescriptor.getLocalDataCenter());
 +                return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter());
              case EACH_QUORUM:
-                 NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
-                 int n = 0;
-                 for (String dc : strategy.getDatacenters())
-                     n += localQuorumFor(keyspace, dc);
-                 return n;
 -                if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
++                if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+                 {
 -                    NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
++                    NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
+                     int n = 0;
+                     for (String dc : strategy.getDatacenters())
 -                        n += localQuorumFor(table, dc);
++                        n += localQuorumFor(keyspace, dc);
+                     return n;
+                 }
+                 else
+                 {
 -                    return quorumFor(table);
++                    return quorumFor(keyspace);
+                 }
              default:
                  throw new UnsupportedOperationException("Invalid consistency level: " + toString());
          }
@@@ -211,16 -224,20 +225,20 @@@
              case LOCAL_ONE:
                  return countLocalEndpoints(liveEndpoints) >= 1;
              case LOCAL_QUORUM:
 -                return countLocalEndpoints(liveEndpoints) >= blockFor(table);
 +                return countLocalEndpoints(liveEndpoints) >= blockFor(keyspace);
              case EACH_QUORUM:
-                 for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
 -                if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
++                if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
                  {
-                     if (entry.getValue() < localQuorumFor(keyspace, entry.getKey()))
-                         return false;
 -                    for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
++                    for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
+                     {
 -                        if (entry.getValue() < localQuorumFor(table, entry.getKey()))
++                        if (entry.getValue() < localQuorumFor(keyspace, entry.getKey()))
+                             return false;
+                     }
+                     return true;
                  }
-                 return true;
+                 // Fallthough on purpose for SimpleStrategy
              default:
 -                return Iterables.size(liveEndpoints) >= blockFor(table);
 +                return Iterables.size(liveEndpoints) >= blockFor(keyspace);
          }
      }
  
@@@ -251,14 -268,18 +269,18 @@@
                  }
                  break;
              case EACH_QUORUM:
-                 for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
 -                if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
++                if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
                  {
-                     int dcBlockFor = localQuorumFor(keyspace, entry.getKey());
-                     int dcLive = entry.getValue();
-                     if (dcLive < dcBlockFor)
-                         throw new UnavailableException(this, dcBlockFor, dcLive);
 -                    for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
++                    for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
+                     {
 -                        int dcBlockFor = localQuorumFor(table, entry.getKey());
++                        int dcBlockFor = localQuorumFor(keyspace, entry.getKey());
+                         int dcLive = entry.getValue();
+                         if (dcLive < dcBlockFor)
+                             throw new UnavailableException(this, dcBlockFor, dcLive);
+                     }
+                     break;
                  }
-                 break;
+                 // Fallthough on purpose for SimpleStrategy
              default:
                  int live = Iterables.size(liveEndpoints);
                  if (live < blockFor)
@@@ -281,42 -302,8 +303,39 @@@
          }
      }
  
 -    public void validateForWrite(String table) throws InvalidRequestException
 +    public void validateForWrite(String keyspaceName) throws InvalidRequestException
 +    {
 +        switch (this)
 +        {
-             case EACH_QUORUM:
-                 requireNetworkTopologyStrategy(keyspaceName);
-                 break;
 +            case SERIAL:
 +            case LOCAL_SERIAL:
 +                throw new InvalidRequestException("You must use conditional updates for serializable writes");
 +        }
 +    }
 +
 +    // This is the same than validateForWrite really, but we include a slightly different error message for SERIAL/LOCAL_SERIAL
 +    public void validateForCasCommit(String keyspaceName) throws InvalidRequestException
 +    {
 +        switch (this)
 +        {
 +            case EACH_QUORUM:
 +                requireNetworkTopologyStrategy(keyspaceName);
 +                break;
 +            case SERIAL:
 +            case LOCAL_SERIAL:
 +                throw new InvalidRequestException(this + " is not supported as conditional update commit consistency. Use ANY if you mean \"make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads\"");
 +        }
 +    }
 +
 +    public void validateForCas() throws InvalidRequestException
      {
 +        if (!isSerialConsistency())
 +            throw new InvalidRequestException("Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL");
 +    }
 +
 +    public boolean isSerialConsistency()
 +    {
 +        return this == SERIAL || this == LOCAL_SERIAL;
      }
  
      public void validateCounterForWrite(CFMetaData metadata) throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fd7db6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index f4a2662,e4dd422..df33813
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@@ -132,20 -132,20 +132,20 @@@ public abstract class AbstractReplicati
          if (consistency_level.isDatacenterLocal())
          {
              // block for in this context will be localnodes block.
 -            return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
 +            return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
          }
-         else if (consistency_level == ConsistencyLevel.EACH_QUORUM)
+         else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
          {
 -            return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
 +            return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
          }
 -        return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
 +        return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
      }
  
 -    private Table getTable()
 +    private Keyspace getKeyspace()
      {
 -        if (table == null)
 -            table = Table.open(tableName);
 -        return table;
 +        if (keyspace == null)
 +            keyspace = Keyspace.open(keyspaceName);
 +        return keyspace;
      }
  
      /**