You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2010/10/31 15:35:37 UTC
svn commit: r1029356 - in /cassandra/trunk: interface/
src/java/org/apache/cassandra/avro/ src/java/org/apache/cassandra/cql/
Author: eevans
Date: Sun Oct 31 14:35:36 2010
New Revision: 1029356
URL: http://svn.apache.org/viewvc?rev=1029356&view=rev
Log:
(mostly )working SELECT; (barely )working UPDATE
Patch by eevans
Added:
cassandra/trunk/src/java/org/apache/cassandra/cql/
cassandra/trunk/src/java/org/apache/cassandra/cql/CQLStatement.java
cassandra/trunk/src/java/org/apache/cassandra/cql/Client.java
cassandra/trunk/src/java/org/apache/cassandra/cql/Column.java
cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g
cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java
cassandra/trunk/src/java/org/apache/cassandra/cql/Row.java
cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java
cassandra/trunk/src/java/org/apache/cassandra/cql/SelectStatement.java
cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java
cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java
cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java
Modified:
cassandra/trunk/interface/cassandra.genavro
cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
Modified: cassandra/trunk/interface/cassandra.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=1029356&r1=1029355&r2=1029356&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Sun Oct 31 14:35:36 2010
@@ -375,4 +375,29 @@ protocol Cassandra {
KeyRange range,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException;
+
+ enum Compression { // scheme the client used to compress the query bytes
+ GZIP
+ }
+
+ enum CqlResultType { // ROWS: the result carries rows; VOID: the statement returns none
+ ROWS, VOID
+ }
+
+ record CqlRow { // one result row: the row key and its columns
+ bytes key;
+ array<Column> columns;
+ }
+
+ record CqlResult {
+ CqlResultType type;
+ union { array<CqlRow>, null } rows; // the union permits null when there are no rows
+ }
+
+ /**
+ * Executes a CQL (Cassandra Query Language) statement and returns a
+ * CqlResult containing the results. The query bytes are compressed as named by the compression argument.
+ */
+ CqlResult execute_cql_query(bytes query, Compression compression)
+ throws InvalidRequestException, UnavailableException, TimedOutException;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=1029356&r1=1029355&r2=1029356&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Sun Oct 31 14:35:36 2010
@@ -48,7 +48,7 @@ import static org.apache.cassandra.avro.
*/
public class AvroValidation
{
- static void validateKey(ByteBuffer key) throws InvalidRequestException
+ public static void validateKey(ByteBuffer key) throws InvalidRequestException
{
if (key == null || key.remaining() == 0)
throw newInvalidRequestException("Key may not be empty");
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=1029356&r1=1029355&r2=1029356&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Sun Oct 31 14:35:36 2010
@@ -21,6 +21,7 @@ package org.apache.cassandra.avro;
*/
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -34,7 +35,10 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+import org.antlr.runtime.RecognitionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
@@ -59,6 +63,7 @@ import org.apache.cassandra.config.Colum
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.cql.QueryProcessor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.MarshalException;
@@ -1161,4 +1166,47 @@ public class CassandraServer implements
}
return null;
}
+
+    /**
+     * Executes a CQL statement on behalf of an Avro client.
+     *
+     * The compressed query bytes are inflated, decoded as UTF-8 and handed to
+     * {@link QueryProcessor#process} together with the keyspace of the current
+     * session.
+     *
+     * NOTE: for {@code Compression.GZIP} the payload is inflated with a default
+     * {@link Inflater}, i.e. it must be a zlib stream as produced by
+     * {@code java.util.zip.Deflater}, not a gzip container.
+     *
+     * @param query the compressed query; only the buffer's remaining bytes are read
+     * @param compression the scheme the client used to compress the query
+     * @return the result produced by the query processor
+     * @throws InvalidRequestException if an argument is null, the compression is not
+     *         supported, or the query cannot be inflated, decoded or parsed
+     */
+    @Override
+    public CqlResult execute_cql_query(ByteBuffer query, Compression compression)
+    throws UnavailableException, InvalidRequestException, TimedOutException
+    {
+        if (query == null || compression == null)
+            throw newInvalidRequestException("Query and compression may not be null.");
+
+        String queryString = null;
+
+        // Decompress the query string.
+        try
+        {
+            switch (compression)
+            {
+                case GZIP:
+                    queryString = new String(inflateQuery(query), "UTF-8");
+                    break;
+                default:
+                    // Never hand a null query string to the parser.
+                    throw newInvalidRequestException("Unsupported compression: " + compression);
+            }
+        }
+        catch (DataFormatException e)
+        {
+            InvalidRequestException badInput = newInvalidRequestException("Error inflating query string.");
+            badInput.initCause(e);
+            throw badInput;
+        }
+        catch (UnsupportedEncodingException e)
+        {
+            throw newInvalidRequestException("Unknown query string encoding.");
+        }
+
+        try
+        {
+            return QueryProcessor.process(queryString, state().getKeyspace());
+        }
+        catch (RecognitionException e)
+        {
+            InvalidRequestException badQuery = newInvalidRequestException("Invalid or malformed CQL query string");
+            badQuery.initCause(e);
+            throw badQuery;
+        }
+    }
+
+    /**
+     * Inflates the remaining bytes of {@code compressed} without disturbing the
+     * buffer's position. The output grows as needed, so queries of any length
+     * are inflated completely.
+     *
+     * @throws DataFormatException if the stream is corrupt, truncated, or needs a preset dictionary
+     */
+    private static byte[] inflateQuery(ByteBuffer compressed) throws DataFormatException
+    {
+        // Copy through a duplicate: respects position/limit and works for non-array-backed buffers.
+        byte[] input = new byte[compressed.remaining()];
+        compressed.duplicate().get(input);
+
+        Inflater decompressor = new Inflater();
+        try
+        {
+            decompressor.setInput(input);
+            java.io.ByteArrayOutputStream inflated = new java.io.ByteArrayOutputStream();
+            byte[] chunk = new byte[1024];
+            while (!decompressor.finished())
+            {
+                int count = decompressor.inflate(chunk);
+                // No progress and nothing left to consume: bail out instead of spinning.
+                if (count == 0 && (decompressor.needsInput() || decompressor.needsDictionary()))
+                    throw new DataFormatException("Compressed query is truncated or requires a preset dictionary");
+                inflated.write(chunk, 0, count);
+            }
+            return inflated.toByteArray();
+        }
+        finally
+        {
+            // Release the native zlib resources even when inflation fails.
+            decompressor.end();
+        }
+    }
}
Added: cassandra/trunk/src/java/org/apache/cassandra/cql/CQLStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/CQLStatement.java?rev=1029356&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/CQLStatement.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/CQLStatement.java Sun Oct 31 14:35:36 2010
@@ -0,0 +1,13 @@
+package org.apache.cassandra.cql;
+
+/**
+ * Pairs a parsed statement object with a tag naming the kind of statement,
+ * so that callers know which concrete type to cast {@link #statement} to.
+ */
+public class CQLStatement
+{
+    /** The kind of statement held in {@link #statement}. */
+    public StatementType type;
+
+    /** The parsed statement itself. */
+    public Object statement;
+
+    /**
+     * @param type the kind of statement
+     * @param statement the parsed statement object
+     */
+    public CQLStatement(StatementType type, Object statement)
+    {
+        this.statement = statement;
+        this.type = type;
+    }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/cql/Client.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Client.java?rev=1029356&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Client.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Client.java Sun Oct 31 14:35:36 2010
@@ -0,0 +1,50 @@
+package org.apache.cassandra.cql;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.zip.Deflater;
+
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.cassandra.avro.Cassandra;
+import org.apache.cassandra.avro.Column;
+import org.apache.cassandra.avro.Compression;
+import org.apache.cassandra.avro.CqlResult;
+import org.apache.cassandra.avro.CqlRow;
+
+/**
+ * Minimal command-line client: connects to a local node over Avro/HTTP,
+ * sends one compressed CQL SELECT and prints the rows that come back.
+ */
+public class Client
+{
+    public static void main(String[] args) throws IOException
+    {
+        // Remote setup
+        String host = "localhost", keyspace = "Keyspace1";
+        int port = 9160;
+
+        HttpTransceiver tr = new HttpTransceiver(new URL("http", host, port, ""));
+        Cassandra client = (Cassandra)SpecificRequestor.getClient(Cassandra.class, tr);
+        client.set_keyspace(keyspace);
+
+        // Query compression. The server decodes the inflated bytes as UTF-8,
+        // so encode explicitly rather than with the platform charset.
+        String query = "SELECT FROM Standard2 USING CONSISTENCY.ONE WHERE KEY = \"eevans\" AND COL < \"age\" COLLIMIT 2 ASC;";
+        byte[] plain = query.getBytes("UTF-8");
+        byte[] compressed = deflate(plain);
+        System.out.println("Query compressed from " + plain.length + " bytes, to " + compressed.length + " bytes");
+
+        CqlResult res = client.execute_cql_query(ByteBuffer.wrap(compressed), Compression.GZIP);
+
+        switch (res.type)
+        {
+            case ROWS:
+                if (res.rows == null)
+                    break;
+                for (CqlRow row : res.rows)
+                {
+                    System.out.println("Key = " + utf8(row.key));
+                    for (Column col : row.columns)
+                        System.out.println(" Col = " + utf8(col.name) + "/" + utf8(col.value));
+                }
+                break;
+            default:
+                // Nothing to print for results that carry no rows.
+                break;
+        }
+
+    }
+
+    /** Deflates {@code input} completely, whatever its size, and returns exactly the compressed bytes. */
+    private static byte[] deflate(byte[] input)
+    {
+        Deflater compressor = new Deflater();
+        try
+        {
+            compressor.setInput(input);
+            compressor.finish();
+
+            java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
+            byte[] chunk = new byte[256];
+            while (!compressor.finished())
+                out.write(chunk, 0, compressor.deflate(chunk));
+            return out.toByteArray();
+        }
+        finally
+        {
+            compressor.end();
+        }
+    }
+
+    /** Decodes only the readable region of an array-backed buffer as UTF-8. */
+    private static String utf8(ByteBuffer buffer) throws IOException
+    {
+        return new String(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining(), "UTF-8");
+    }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/cql/Column.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Column.java?rev=1029356&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Column.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Column.java Sun Oct 31 14:35:36 2010
@@ -0,0 +1,23 @@
+package org.apache.cassandra.cql;
+
+/**
+ * An immutable name/value pair of terms, as written in a COLUMN(name, value)
+ * clause of a CQL statement.
+ */
+public class Column
+{
+    private final Term name, value;
+
+    /**
+     * @param name the term naming the column
+     * @param value the term holding the column's value
+     */
+    public Column(Term name, Term value)
+    {
+        this.value = value;
+        this.name = name;
+    }
+
+    /** @return the term naming the column */
+    public Term getName()
+    {
+        return this.name;
+    }
+
+    /** @return the term holding the column's value */
+    public Term getValue()
+    {
+        return this.value;
+    }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g?rev=1029356&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g Sun Oct 31 14:35:36 2010
@@ -0,0 +1,199 @@
+grammar Cql;
+
+options {
+ language = Java;
+}
+
+@header {
+ package org.apache.cassandra.cql;
+ import org.apache.cassandra.thrift.ConsistencyLevel;
+}
+
+@lexer::header {
+ package org.apache.cassandra.cql;
+}
+
+query returns [CQLStatement stmnt] // entry rule: wraps the parsed statement together with its StatementType
+ : selectStatement { $stmnt = new CQLStatement(StatementType.SELECT, $selectStatement.expr); }
+ | updateStatement { $stmnt = new CQLStatement(StatementType.UPDATE, $updateStatement.expr); }
+ ;
+
+/**
+ * SELECT FROM
+ *     <CF>
+ * USING
+ *     CONSISTENCY.ONE
+ * WHERE
+ *     KEY = "key1" AND KEY = "key2" AND
+ *     COL > 1L AND COL < 100L
+ * COLLIMIT 10 DESC;
+ *
+ * Numeric terms need the 'L' suffix (see the LONG token). Defaults when a
+ * clause is omitted: consistency ONE, no row or column limit, ascending order.
+ *
+ * The consistency level keyword is lexed case-insensitively, so its text is
+ * upper-cased before being mapped onto the ConsistencyLevel enum; valueOf()
+ * would otherwise reject e.g. "one".
+ */
+selectStatement returns [SelectStatement expr]
+    : {
+          int numRecords = Integer.MAX_VALUE;
+          int numColumns = Integer.MAX_VALUE;
+          boolean reversed = false;
+          ConsistencyLevel cLevel = ConsistencyLevel.ONE;
+      }
+      K_SELECT K_FROM? IDENT
+          (K_USING K_CONSISTENCY '.' K_LEVEL { cLevel = ConsistencyLevel.valueOf($K_LEVEL.text.toUpperCase(java.util.Locale.ENGLISH)); })?
+          K_WHERE selectExpression
+          (limit=(K_ROWLIMIT | K_COLLIMIT) value=INTEGER
+              {
+                  int count = Integer.parseInt($value.text);
+                  if ($limit.type == K_ROWLIMIT)
+                      numRecords = count;
+                  else
+                      numColumns = count;
+              }
+          )*
+          order=(K_ASC | K_DESC { reversed = true; })? ';'
+      {
+          // Assign the rule's return value (instead of returning from inside the
+          // action) so the generated rule method always runs to its normal end.
+          $expr = new SelectStatement($IDENT.text,
+                                      cLevel,
+                                      $selectExpression.expr,
+                                      numRecords,
+                                      numColumns,
+                                      reversed);
+      }
+    ;
+
+/**
+ * UPDATE
+ *     <CF>
+ * USING
+ *     CONSISTENCY.ONE
+ * WITH
+ *     ROW("key1", COL("col1", "val1"), ...) AND
+ *     ROW("key2", COL("col1", "val1"), ...) AND
+ *     ROW("key3", COLUMN("col1", "val1"), ...);
+ *
+ * The consistency level defaults to ONE. Its keyword is lexed
+ * case-insensitively, so the text is upper-cased before being mapped onto the
+ * ConsistencyLevel enum; valueOf() would otherwise reject e.g. "quorum".
+ */
+updateStatement returns [UpdateStatement expr]
+    : { ConsistencyLevel cLevel = ConsistencyLevel.ONE; }
+      K_UPDATE IDENT
+          (K_USING K_CONSISTENCY '.' K_LEVEL { cLevel = ConsistencyLevel.valueOf($K_LEVEL.text.toUpperCase(java.util.Locale.ENGLISH)); })?
+          K_WITH first=rowDef { $expr = new UpdateStatement($IDENT.text, first, cLevel); }
+          (K_AND next=rowDef { $expr.and(next); })* ';'
+    ;
+
+// A term is a quoted string or a long literal. TODO: date/time, utf8
+term returns [Term item]
+ : ( t=STRING_LITERAL | t=LONG )
+ { $item = new Term($t.text, $t.type); } // the token type is passed on so Term can derive its TermType
+ ;
+
+// A single comparison of KEY or COLUMN against a term.
+// Note: slices are inclusive so >= and >, and < and <= all have the same semantics.
+relation returns [Relation rel]
+    : kind=(K_KEY | K_COLUMN) type=('=' | '<' | '<=' | '>=' | '>') t=term
+      { $rel = new Relation($kind.text, $type.text, $t.item); } // assign the return value, as the other rules do
+    ;
+
+// relation [[AND relation] ...] -- every relation is folded into one SelectExpression
+selectExpression returns [SelectExpression expr]
+ : first=relation { $expr = new SelectExpression(first); }
+ (K_AND next=relation { $expr.and(next); })*
+ ;
+
+columnDef returns [Column column] // COLUMN(name, value), or COL(name, value)
+ : K_COLUMN '(' n=term ',' v=term ')' { $column = new Column($n.item, $v.item); }
+ ;
+
+rowDef returns [Row row] // ROW(key, columnDef [, columnDef ...]); at least one column is required
+ : K_ROW '(' key=term ',' first=columnDef { $row = new Row($key.item, first); }
+ (',' next=columnDef { $row.and(next); })* ')'
+ ;
+
+// Case-insensitive keywords, spelled letter by letter with the A..Z fragments
+K_SELECT: S E L E C T;
+K_FROM: F R O M;
+K_WHERE: W H E R E;
+K_AND: A N D;
+K_KEY: K E Y;
+K_COLUMN: C O L (U M N)?; // COL or COLUMN
+K_UPDATE: U P D A T E;
+K_WITH: W I T H;
+K_ROW: R O W;
+K_ROWLIMIT: R O W L I M I T;
+K_COLLIMIT: C O L L I M I T;
+K_ASC: A S C (E N D I N G)?; // ASC or ASCENDING
+K_DESC: D E S C (E N D I N G)?; // DESC or DESCENDING
+K_USING: U S I N G;
+K_CONSISTENCY: C O N S I S T E N C Y;
+K_LEVEL: ( Z E R O // consistency level names; the parser rules map the token text onto ConsistencyLevel
+ | O N E
+ | Q U O R U M
+ | A L L
+ | D C Q U O R U M
+ | D C Q U O R U M S Y N C
+ )
+ ;
+
+// Case-insensitive alpha characters: each fragment matches one letter in either case
+fragment A: ('a'|'A');
+fragment B: ('b'|'B');
+fragment C: ('c'|'C');
+fragment D: ('d'|'D');
+fragment E: ('e'|'E');
+fragment F: ('f'|'F');
+fragment G: ('g'|'G');
+fragment H: ('h'|'H');
+fragment I: ('i'|'I');
+fragment J: ('j'|'J');
+fragment K: ('k'|'K');
+fragment L: ('l'|'L');
+fragment M: ('m'|'M');
+fragment N: ('n'|'N');
+fragment O: ('o'|'O');
+fragment P: ('p'|'P');
+fragment Q: ('q'|'Q');
+fragment R: ('r'|'R');
+fragment S: ('s'|'S');
+fragment T: ('t'|'T');
+fragment U: ('u'|'U');
+fragment V: ('v'|'V');
+fragment W: ('w'|'W');
+fragment X: ('x'|'X');
+fragment Y: ('y'|'Y');
+fragment Z: ('z'|'Z');
+
+STRING_LITERAL
+ : '"'
+ { StringBuilder b = new StringBuilder(); }
+ ( c=~('"'|'\r'|'\n') { b.appendCodePoint(c);}
+ | '"' '"' { b.appendCodePoint('"');} // a doubled quote inside the literal yields a single quote character
+ )*
+ '"'
+ { setText(b.toString()); } // token text becomes the unquoted, unescaped content
+ ;
+
+fragment DIGIT
+ : '0'..'9'
+ ;
+
+fragment LETTER
+ : ('A'..'Z' | 'a'..'z')
+ ;
+
+INTEGER
+ : DIGIT+
+ ;
+
+LONG
+ : INTEGER 'L' { setText($INTEGER.text); } // token text is the digits only, without the 'L' suffix
+ ;
+
+IDENT
+ : LETTER (LETTER | DIGIT)*
+ ;
+
+WS
+ : (' ' | '\t' | '\n' | '\r')+ { $channel = HIDDEN; } // whitespace is kept off the parser's channel
+ ;
+
+COMMENT
+ : ('--' | '//') .* ('\n'|'\r') { $channel = HIDDEN; } // line comment; the rule requires a terminating \n or \r
+ ;
+
+MULTILINE_COMMENT
+ : '/*' .* '*/' { $channel = HIDDEN; }
+ ;
Added: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1029356&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Sun Oct 31 14:35:36 2010
@@ -0,0 +1,222 @@
+
+package org.apache.cassandra.cql;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.antlr.runtime.ANTLRStringStream;
+import org.antlr.runtime.CharStream;
+import org.antlr.runtime.CommonTokenStream;
+import org.antlr.runtime.RecognitionException;
+import org.antlr.runtime.TokenStream;
+import org.apache.cassandra.avro.Column;
+import org.apache.cassandra.avro.CqlResult;
+import org.apache.cassandra.avro.CqlResultType;
+import org.apache.cassandra.avro.CqlRow;
+import org.apache.cassandra.avro.InvalidRequestException;
+import org.apache.cassandra.avro.TimedOutException;
+import org.apache.cassandra.avro.UnavailableException;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.SliceByNamesReadCommand;
+import org.apache.cassandra.db.SliceFromReadCommand;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+
+import static org.apache.cassandra.avro.AvroValidation.validateKey;
+
+public class QueryProcessor
+{
+
+ public static Map<DecoratedKey<?>, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel cLevel)
+ throws UnavailableException, InvalidRequestException, TimedOutException
+ {
+ Map<DecoratedKey<?>, ColumnFamily> columnFamilyKeyMap = new HashMap<DecoratedKey<?>, ColumnFamily>();
+ List<org.apache.cassandra.db.Row> rows;
+
+ try
+ {
+ rows = StorageProxy.readProtocol(commands, cLevel);
+ }
+ catch (TimeoutException e)
+ {
+ throw new TimedOutException();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (org.apache.cassandra.thrift.UnavailableException e)
+ {
+ UnavailableException error = new UnavailableException();
+ error.initCause(e);
+ throw error;
+ }
+ catch (org.apache.cassandra.thrift.InvalidRequestException e)
+ {
+ InvalidRequestException error = new InvalidRequestException();
+ error.initCause(e);
+ throw error;
+ }
+
+ for (org.apache.cassandra.db.Row row: rows)
+ columnFamilyKeyMap.put(row.key, row.cf);
+
+ return columnFamilyKeyMap;
+ }
+
+ public static CqlResult process(String queryString, String keyspace)
+ throws RecognitionException, UnavailableException, InvalidRequestException, TimedOutException
+ {
+ CqlParser parser = getParser(queryString);
+ CQLStatement statement = parser.query();
+
+ CqlResult avroResult = new CqlResult();
+
+ switch (statement.type)
+ {
+ case SELECT:
+ SelectStatement select = (SelectStatement)statement.statement;
+
+ QueryPath queryPath = new QueryPath(select.getColumnFamily());
+ List<ReadCommand> commands = new ArrayList<ReadCommand>();
+
+ List<CqlRow> avroRows = new ArrayList<CqlRow>();
+ avroResult.type = CqlResultType.ROWS;
+
+ // It's a multiget...
+ if (!select.getKeyPredicates().isRange())
+ {
+
+ for (Term keyName : select.getKeyPredicates().getTerms())
+ {
+ byte[] key = keyName.getBytes(); // FIXME: surely not good enough
+ validateKey(key);
+
+ // ...of a list of column names
+ if (!select.getColumnPredicates().isRange())
+ {
+ Collection<byte[]> columnNames = new ArrayList<byte[]>();
+ for (Term column : select.getColumnPredicates().getTerms())
+ columnNames.add(column.getBytes()); // FIXME: surely not good enough
+
+ commands.add(new SliceByNamesReadCommand(keyspace, key, queryPath, columnNames));
+ }
+ // ...a range (slice) of column names
+ else
+ {
+ commands.add(new SliceFromReadCommand(keyspace,
+ key,
+ queryPath,
+ select.getColumnPredicates().getStart().getBytes(),
+ select.getColumnPredicates().getFinish().getBytes(),
+ select.reversed(),
+ select.getNumColumns()));
+ }
+
+ Map<DecoratedKey<?>, ColumnFamily> columnFamilies = readColumnFamily(commands,
+ select.getConsistencyLevel());
+ List<Column> avroColumns = new ArrayList<Column>();
+
+ for (ReadCommand cmd : commands)
+ {
+ ColumnFamily cf = columnFamilies.get(StorageService.getPartitioner().decorateKey(cmd.key));
+ // TODO: handle reversing order
+ for (IColumn column : cf.getSortedColumns())
+ {
+ Column avroColumn = new Column();
+ avroColumn.name = ByteBuffer.wrap(column.name());
+ avroColumn.value = ByteBuffer.wrap(column.value());
+ avroColumns.add(avroColumn);
+ }
+ }
+
+ // Create a new row, add the columns to it, and then add it to the list of rows
+ CqlRow avroRow = new CqlRow();
+ avroRow.key = ByteBuffer.wrap(key);
+ avroRow.columns = avroColumns;
+ avroRows.add(avroRow);
+ }
+ }
+ else // It is a range query (range of keys).
+ {
+
+ }
+
+ avroResult.rows = avroRows;
+ return avroResult;
+
+ case UPDATE:
+ UpdateStatement update = (UpdateStatement)statement.statement;
+ avroResult.type = CqlResultType.VOID;
+
+ List<RowMutation> rowMutations = new ArrayList<RowMutation>();
+
+ for (Row row : update.getRows())
+ {
+ RowMutation rm = new RowMutation(keyspace, row.getKey().getBytes());
+
+ for (org.apache.cassandra.cql.Column col : row.getColumns())
+ {
+ rm.add(new QueryPath(update.getColumnFamily(), null, col.getName().getBytes()),
+ col.getValue().getBytes(),
+ System.currentTimeMillis());
+ rowMutations.add(rm);
+ }
+ }
+
+ try
+ {
+ StorageProxy.mutate(rowMutations, update.getConsistencyLevel());
+ }
+ catch (org.apache.cassandra.thrift.UnavailableException e)
+ {
+ throw new UnavailableException();
+ }
+ catch (TimeoutException e)
+ {
+ throw new TimedOutException();
+ }
+
+ return avroResult;
+ }
+
+ return null; // We should never get here.
+ }
+
+ private static CqlParser getParser(String queryStr)
+ {
+ CharStream stream = new ANTLRStringStream(queryStr);
+ CqlLexer lexer = new CqlLexer(stream);
+ TokenStream tokenStream = new CommonTokenStream(lexer);
+ return new CqlParser(tokenStream);
+ }
+
+ public static void main(String[] args) throws RecognitionException
+ {
+ CqlParser parser = getParser("SElecT FRoM Standard1 where KEY > \"foo\" and key < \"fnord\" and COLUMN=\"bar\";");
+ CQLStatement statement = parser.query();
+
+ switch (statement.type)
+ {
+ case SELECT:
+ SelectStatement st = (SelectStatement)statement.statement;
+ System.out.println(st.getColumnFamily() + " " + st.getKeyPredicates().getStart().getText() +
+ " " + st.getColumnPredicates().getTerms() + " " + st.getKeyPredicates().isRange());
+ case UPDATE:
+ return;
+ }
+ }
+
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java?rev=1029356&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java Sun Oct 31 14:35:36 2010
@@ -0,0 +1,67 @@
+package org.apache.cassandra.cql;
+
+/**
+ * Relations encapsulate the relationship between an entity and a value. For
+ * example, KEY > 'start' or COLUMN = 'somecolumn'.
+ *
+ * @author eevans
+ *
+ */
+public class Relation
+{
+    /** What the value is compared to; COLUMN unless the constructor is told "KEY". */
+    public Entity entity = Entity.COLUMN;
+    /** How the entity relates to the value. */
+    public RelationType type;
+    /** The value being compared to the entity. */
+    public Term value;
+
+    /**
+     * Creates a new relation.
+     *
+     * @param entity the kind of relation this is; what the value is compared to.
+     * @param type the type of relation; how this entity relates to the value.
+     * @param value the value being compared to the entity.
+     */
+    public Relation(String entity, String type, Term value)
+    {
+        String kind = entity.toUpperCase();
+        if (kind.equals("KEY"))
+        {
+            this.entity = Entity.KEY;
+        }
+
+        this.value = value;
+        this.type = RelationType.forString(type);
+    }
+
+    /** @return true if this relation constrains the row key */
+    public boolean isKey()
+    {
+        return this.entity.equals(Entity.KEY);
+    }
+
+    /** @return true if this relation constrains a column */
+    public boolean isColumn()
+    {
+        return this.entity.equals(Entity.COLUMN);
+    }
+}
+
+/** The things a relation can constrain: the row key or a column. */
+enum Entity
+{
+    KEY,
+    COLUMN;
+}
+
+/** The comparison operators a relation can use. */
+enum RelationType
+{
+    EQ, LT, LTE, GTE, GT;
+
+    /**
+     * Maps an operator symbol onto its relation type.
+     *
+     * @param s one of "=", "<", "<=", ">=" or ">"
+     * @return the matching type, or null if the symbol is not recognized
+     */
+    public static RelationType forString(String s)
+    {
+        if (s.equals("="))
+        {
+            return EQ;
+        }
+        if (s.equals("<"))
+        {
+            return LT;
+        }
+        if (s.equals("<="))
+        {
+            return LTE;
+        }
+        if (s.equals(">="))
+        {
+            return GTE;
+        }
+        return s.equals(">") ? GT : null;
+    }
+}
\ No newline at end of file
Added: cassandra/trunk/src/java/org/apache/cassandra/cql/Row.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Row.java?rev=1029356&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Row.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Row.java Sun Oct 31 14:35:36 2010
@@ -0,0 +1,31 @@
+package org.apache.cassandra.cql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A row as written in a ROW(key, column, ...) clause: a key term plus the
+ * columns listed for it, in the order they were added.
+ */
+public class Row
+{
+    private final Term key;
+    private List<Column> columns;
+
+    /**
+     * @param key the row key
+     * @param firstColumn the first column of the row
+     */
+    public Row(Term key, Column firstColumn)
+    {
+        this.columns = new ArrayList<Column>();
+        this.key = key;
+        this.columns.add(firstColumn);
+    }
+
+    /** Appends another column to this row. */
+    public void and(Column col)
+    {
+        this.columns.add(col);
+    }
+
+    /** @return the row key */
+    public Term getKey()
+    {
+        return this.key;
+    }
+
+    /** @return the live list of columns added so far */
+    public List<Column> getColumns()
+    {
+        return this.columns;
+    }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java?rev=1029356&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java Sun Oct 31 14:35:36 2010
@@ -0,0 +1,114 @@
+package org.apache.cassandra.cql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * SelectExpressions encapsulate all of the predicates of a SELECT query.
+ *
+ * @author eevans
+ *
+ */
+public class SelectExpression
+{
+ private Predicates keys = new Predicates();
+ private Predicates columns = new Predicates();
+
+ public SelectExpression(Relation firstRelation)
+ {
+ and(firstRelation);
+ }
+
+ public void and(Relation relation)
+ {
+ if (relation.isKey())
+ {
+ if (relation.type.equals(RelationType.EQ))
+ keys.addTerm(relation.value);
+ else if ((relation.type.equals(RelationType.GT) || relation.type.equals(RelationType.GTE)))
+ keys.setStart(relation.value);
+ else if ((relation.type.equals(RelationType.LT) || relation.type.equals(RelationType.LTE)))
+ keys.setFinish(relation.value);
+ }
+ else // It's a column
+ {
+ if (relation.type.equals(RelationType.EQ))
+ columns.addTerm(relation.value);
+ else if ((relation.type.equals(RelationType.GT) || relation.type.equals(RelationType.GTE)))
+ columns.setStart(relation.value);
+ else if ((relation.type.equals(RelationType.LT) || relation.type.equals(RelationType.LTE)))
+ columns.setFinish(relation.value);
+ }
+ }
+
+ public Predicates getKeyPredicates()
+ {
+ return keys;
+ }
+
+ public Predicates getColumnPredicates()
+ {
+ return columns;
+ }
+}
+
+/**
+ * The constraints collected for one entity: either a list of discrete names
+ * or a start/finish range. The first call decides which; mixing the two
+ * afterwards is rejected.
+ */
+class Predicates
+{
+    private boolean initialized = false;
+    private List<Term> names = new ArrayList<Term>();
+    private Term start, finish;
+    private boolean isRange = false;
+
+    /** @return the range start, or an empty term if none was set */
+    Term getStart()
+    {
+        if (start != null)
+            return start;
+        return new Term();
+    }
+
+    void setStart(Term start)
+    {
+        claimMode(true);
+        this.start = start;
+    }
+
+    /** @return the range finish, or an empty term if none was set */
+    Term getFinish()
+    {
+        if (finish != null)
+            return finish;
+        return new Term();
+    }
+
+    void setFinish(Term finish)
+    {
+        claimMode(true);
+        this.finish = finish;
+    }
+
+    List<Term> getTerms()
+    {
+        return names;
+    }
+
+    void addTerm(Term name)
+    {
+        claimMode(false);
+        names.add(name);
+    }
+
+    boolean isRange()
+    {
+        return isRange;
+    }
+
+    /**
+     * Records whether these predicates describe a range or discrete names,
+     * refusing to switch once the mode has been established.
+     */
+    private void claimMode(boolean range)
+    {
+        // FIXME: propagate a proper exception
+        if (initialized && isRange() != range)
+            throw new RuntimeException("You cannot combine discreet names and range operators.");
+
+        initialized = true;
+        isRange = range;
+    }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/cql/SelectStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/SelectStatement.java?rev=1029356&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/SelectStatement.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/SelectStatement.java Sun Oct 31 14:35:36 2010
@@ -0,0 +1,66 @@
+package org.apache.cassandra.cql;
+
+import org.apache.cassandra.thrift.ConsistencyLevel;
+
+/**
+ * Encapsulates a completely parsed SELECT query, including the target
+ * column family, expression, result count, and ordering clause.
+ *
+ * @author eevans
+ *
+ */
+public class SelectStatement
+{
+    private final String columnFamily;
+    private final ConsistencyLevel cLevel;
+    private final SelectExpression expression;
+    private final int numRecords, numColumns;
+    private final boolean reverse;
+
+    /**
+     * @param columnFamily the column family to read from
+     * @param cLevel the consistency level to read at
+     * @param expression the key and column predicates
+     * @param numRecords the row limit
+     * @param numColumns the column limit
+     * @param reverse whether descending order was requested
+     */
+    public SelectStatement(String columnFamily, ConsistencyLevel cLevel, SelectExpression expression,
+                           int numRecords, int numColumns, boolean reverse)
+    {
+        this.reverse = reverse;
+        this.numColumns = numColumns;
+        this.numRecords = numRecords;
+        this.expression = expression;
+        this.cLevel = cLevel;
+        this.columnFamily = columnFamily;
+    }
+
+    /** @return the predicates on the row key, taken from the expression */
+    public Predicates getKeyPredicates()
+    {
+        return this.expression.getKeyPredicates();
+    }
+
+    /** @return the predicates on columns, taken from the expression */
+    public Predicates getColumnPredicates()
+    {
+        return this.expression.getColumnPredicates();
+    }
+
+    public String getColumnFamily()
+    {
+        return this.columnFamily;
+    }
+
+    /** @return true if descending order was requested */
+    public boolean reversed()
+    {
+        return this.reverse;
+    }
+
+    public ConsistencyLevel getConsistencyLevel()
+    {
+        return this.cLevel;
+    }
+
+    public int getNumRecords()
+    {
+        return this.numRecords;
+    }
+
+    public int getNumColumns()
+    {
+        return this.numColumns;
+    }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java?rev=1029356&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java Sun Oct 31 14:35:36 2010
@@ -0,0 +1,6 @@
+package org.apache.cassandra.cql;
+
+/** The kinds of CQL statement the parser produces. */
+public enum StatementType
+{
+    SELECT,
+    UPDATE;
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java?rev=1029356&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java Sun Oct 31 14:35:36 2010
@@ -0,0 +1,91 @@
+package org.apache.cassandra.cql;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Represents a term processed from a CQL query statement. Terms are things
+ * like strings, numbers, UUIDs, etc.
+ *
+ * @author eevans
+ *
+ */
+public class Term
+{
+    private final String text;
+    private final TermType type;
+
+    /**
+     * Create new Term instance from a string, and an integer that corresponds
+     * with the token ID from CQLParser.
+     *
+     * @param text the text representation of the term.
+     * @param type the term's type as an integer token ID.
+     */
+    public Term(String text, int type)
+    {
+        this.text = text;
+        this.type = TermType.forInt(type);
+    }
+
+    /** Creates an empty string term. */
+    protected Term()
+    {
+        this.text = "";
+        this.type = TermType.STRING;
+    }
+
+    /**
+     * Get the text that was parsed to create this term.
+     *
+     * @return the string term as parsed from a CQL statement.
+     */
+    public String getText()
+    {
+        return text;
+    }
+
+    /**
+     * Get the typed value, serialized to a byte[].
+     *
+     * String terms are encoded as UTF-8, the same charset the query string is
+     * decoded with, so the bytes do not depend on the platform default charset.
+     *
+     * @return the serialized value, or null if the term's type is not handled
+     */
+    public byte[] getBytes()
+    {
+        switch (type)
+        {
+            case STRING:
+                return text.getBytes(java.nio.charset.Charset.forName("UTF-8"));
+            case LONG:
+                return FBUtilities.toByteArray(Long.parseLong(text));
+        }
+
+        // FIXME: handle scenario that should never happen
+        return null;
+    }
+
+    /**
+     * Get the term's type.
+     *
+     * @return the type
+     */
+    public TermType getType()
+    {
+        return type;
+    }
+
+}
+
+/** The value types a term can have. */
+enum TermType
+{
+    STRING, LONG;
+
+    /**
+     * Maps a parser token ID onto a term type.
+     *
+     * @param type a token ID from CqlParser
+     * @return STRING or LONG for the matching token IDs, otherwise null
+     */
+    static TermType forInt(int type)
+    {
+        // FIXME: handled scenario that should never occur.
+        return type == CqlParser.STRING_LITERAL ? STRING
+             : type == CqlParser.LONG ? LONG
+             : null;
+    }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1029356&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java Sun Oct 31 14:35:36 2010
@@ -0,0 +1,40 @@
+package org.apache.cassandra.cql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.thrift.ConsistencyLevel;
+
+/**
+ * A parsed UPDATE statement: the target column family, the consistency level
+ * to write at, and the rows to write in the order they were added.
+ */
+public class UpdateStatement
+{
+    private String columnFamily;
+    private List<Row> rows = new ArrayList<Row>();
+    private ConsistencyLevel cLevel;
+
+    /**
+     * @param columnFamily the column family to write to
+     * @param first the first row of the statement
+     * @param cLevel the consistency level to write at
+     */
+    public UpdateStatement(String columnFamily, Row first, ConsistencyLevel cLevel)
+    {
+        this.cLevel = cLevel;
+        this.columnFamily = columnFamily;
+        and(first);
+    }
+
+    /** Appends another row to the statement. */
+    public void and(Row row)
+    {
+        this.rows.add(row);
+    }
+
+    /** @return the live list of rows added so far */
+    public List<Row> getRows()
+    {
+        return this.rows;
+    }
+
+    public ConsistencyLevel getConsistencyLevel()
+    {
+        return this.cLevel;
+    }
+
+    public String getColumnFamily()
+    {
+        return this.columnFamily;
+    }
+}