You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/11 01:24:36 UTC
svn commit: r1134475 - in /cassandra/branches/cassandra-0.8: CHANGES.txt
src/java/org/apache/cassandra/cql/QueryProcessor.java
Author: jbellis
Date: Fri Jun 10 23:24:36 2011
New Revision: 1134475
URL: http://svn.apache.org/viewvc?rev=1134475&view=rev
Log:
QueryProcessor handles wait-for-schema-agreement
patch by pyaskevich; reviewed by jbellis for CASSANDRA-2756
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1134475&r1=1134474&r2=1134475&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Jun 10 23:24:36 2011
@@ -9,6 +9,7 @@
- ALTER COLUMNFAMILY (CASSANDRA-1709)
- DROP INDEX (CASSANDRA-2617)
- add SCHEMA/TABLE as aliases for KS/CF (CASSANDRA-2743)
+ - server handles wait-for-schema-agreement (CASSANDRA-2756)
* add support for comparator parameters and a generic ReverseType
(CASSANDRA-2355)
* add CompositeType and DynamicCompositeType (CASSANDRA-2231)
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1134475&r1=1134474&r2=1134475&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java Fri Jun 10 23:24:36 2011
@@ -63,6 +63,8 @@ public class QueryProcessor
{
private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class);
+ private static final long timeLimitForSchemaAgreement = 10 * 1000;
+
private static List<org.apache.cassandra.db.Row> getSlice(String keyspace, SelectStatement select)
throws InvalidRequestException, TimedOutException, UnavailableException
{
@@ -343,9 +345,9 @@ public class QueryProcessor
throw new InvalidRequestException("No indexed columns present in by-columns clause with \"equals\" operator");
}
}
-
+
// Copypasta from o.a.c.thrift.CassandraDaemon
- private static void applyMigrationOnStage(final Migration m) throws InvalidRequestException
+ private static void applyMigrationOnStage(final Migration m) throws SchemaDisagreementException, InvalidRequestException
{
Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new Callable<Object>()
{
@@ -380,6 +382,8 @@ public class QueryProcessor
throw ex;
}
}
+
+ validateSchemaIsSettled();
}
public static void validateKey(ByteBuffer key) throws InvalidRequestException
@@ -463,13 +467,17 @@ public class QueryProcessor
// Copypasta from CassandraServer (where it is private).
private static void validateSchemaAgreement() throws SchemaDisagreementException
{
- // unreachable hosts don't count towards disagreement
- Map<String, List<String>> versions = Maps.filterKeys(StorageProxy.describeSchemaVersions(),
- Predicates.not(Predicates.equalTo(StorageProxy.UNREACHABLE)));
- if (versions.size() > 1)
+ if (describeSchemaVersions().size() > 1)
throw new SchemaDisagreementException();
}
+ private static Map<String, List<String>> describeSchemaVersions()
+ {
+ // unreachable hosts don't count towards disagreement
+ return Maps.filterKeys(StorageProxy.describeSchemaVersions(),
+ Predicates.not(Predicates.equalTo(StorageProxy.UNREACHABLE)));
+ }
+
public static CqlResult process(String queryString, ClientState clientState)
throws RecognitionException, UnavailableException, InvalidRequestException, TimedOutException, SchemaDisagreementException
{
@@ -940,4 +948,25 @@ public class QueryProcessor
return statement;
}
+
+ private static void validateSchemaIsSettled() throws SchemaDisagreementException
+ {
+ long limit = System.currentTimeMillis() + timeLimitForSchemaAgreement;
+
+ outer:
+ while (limit - System.currentTimeMillis() >= 0)
+ {
+ String currentVersionId = DatabaseDescriptor.getDefsVersion().toString();
+ for (String version : describeSchemaVersions().keySet())
+ {
+ if (!version.equals(currentVersionId))
+ continue outer;
+ }
+
+ // schemas agree
+ return;
+ }
+
+ throw new SchemaDisagreementException();
+ }
}