You are viewing a plain text version of this content. The canonical version is the original message in the commits@cassandra.apache.org mailing-list archive.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/11 01:24:36 UTC

svn commit: r1134475 - in /cassandra/branches/cassandra-0.8: CHANGES.txt src/java/org/apache/cassandra/cql/QueryProcessor.java

Author: jbellis
Date: Fri Jun 10 23:24:36 2011
New Revision: 1134475

URL: http://svn.apache.org/viewvc?rev=1134475&view=rev
Log:
QueryProcessor handles wait-for-schema-agreement
patch by pyaskevich; reviewed by jbellis for CASSANDRA-2756

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1134475&r1=1134474&r2=1134475&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Jun 10 23:24:36 2011
@@ -9,6 +9,7 @@
    - ALTER COLUMNFAMILY (CASSANDRA-1709)
    - DROP INDEX (CASSANDRA-2617)
    - add SCHEMA/TABLE as aliases for KS/CF (CASSANDRA-2743)
+   - server handles wait-for-schema-agreement (CASSANDRA-2756)
  * add support for comparator parameters and a generic ReverseType
    (CASSANDRA-2355)
  * add CompositeType and DynamicCompositeType (CASSANDRA-2231)

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1134475&r1=1134474&r2=1134475&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java Fri Jun 10 23:24:36 2011
@@ -63,6 +63,8 @@ public class QueryProcessor
 {
     private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class);
 
+    private static final long timeLimitForSchemaAgreement = 10 * 1000;
+
     private static List<org.apache.cassandra.db.Row> getSlice(String keyspace, SelectStatement select)
     throws InvalidRequestException, TimedOutException, UnavailableException
     {
@@ -343,9 +345,9 @@ public class QueryProcessor
             throw new InvalidRequestException("No indexed columns present in by-columns clause with \"equals\" operator");
         }
     }
-    
+
     // Copypasta from o.a.c.thrift.CassandraDaemon
-    private static void applyMigrationOnStage(final Migration m) throws InvalidRequestException
+    private static void applyMigrationOnStage(final Migration m) throws SchemaDisagreementException, InvalidRequestException
     {
         Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new Callable<Object>()
         {
@@ -380,6 +382,8 @@ public class QueryProcessor
                 throw ex;
             }
         }
+
+        validateSchemaIsSettled();
     }
     
     public static void validateKey(ByteBuffer key) throws InvalidRequestException
@@ -463,13 +467,17 @@ public class QueryProcessor
     // Copypasta from CassandraServer (where it is private).
     private static void validateSchemaAgreement() throws SchemaDisagreementException
     {
-        // unreachable hosts don't count towards disagreement
-        Map<String, List<String>> versions = Maps.filterKeys(StorageProxy.describeSchemaVersions(),
-                                                             Predicates.not(Predicates.equalTo(StorageProxy.UNREACHABLE)));
-        if (versions.size() > 1)
+       if (describeSchemaVersions().size() > 1)
             throw new SchemaDisagreementException();
     }
 
+    private static Map<String, List<String>> describeSchemaVersions()
+    {
+        // unreachable hosts don't count towards disagreement
+        return Maps.filterKeys(StorageProxy.describeSchemaVersions(),
+                               Predicates.not(Predicates.equalTo(StorageProxy.UNREACHABLE)));
+    }
+
     public static CqlResult process(String queryString, ClientState clientState)
     throws RecognitionException, UnavailableException, InvalidRequestException, TimedOutException, SchemaDisagreementException
     {
@@ -940,4 +948,25 @@ public class QueryProcessor
         
         return statement;
     }
+
+    private static void validateSchemaIsSettled() throws SchemaDisagreementException
+    {
+        long limit = System.currentTimeMillis() + timeLimitForSchemaAgreement;
+
+        outer:
+        while (limit - System.currentTimeMillis() >= 0)
+        {
+            String currentVersionId = DatabaseDescriptor.getDefsVersion().toString();
+            for (String version : describeSchemaVersions().keySet())
+            {
+                if (!version.equals(currentVersionId))
+                    continue outer;
+            }
+
+            // schemas agree
+            return;
+        }
+
+        throw new SchemaDisagreementException();
+    }
 }