You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC
svn commit: r749218 [11/34] - in /incubator/cassandra: branches/ dist/
nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/
trunk/src/org/apache/ trunk/src/org/apache/cassandra/
trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...
Added: incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/Cql__.g
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/Cql__.g?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/Cql__.g (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/Cql__.g Mon Mar 2 07:57:22 2009
@@ -0,0 +1,100 @@
+lexer grammar Cql; // NOTE(review): looks like the lexer half ANTLR derives from the combined Cql.g named in the $ANTLR src markers below - confirm
+@header {
+ package com.facebook.infrastructure.cql.compiler.parse;
+ } // NOTE(review): header package is com.facebook..., but this file is added under org/apache/cassandra/cql/compiler/parse - confirm intended
+ // Literal tokens T47..T53: presumably auto-named by ANTLR for punctuation quoted directly in the parser rules - TODO confirm against Cql.g
+T47 : '=' ;
+T48 : '(' ;
+T49 : ')' ;
+T50 : '[' ;
+T51 : ']' ;
+T52 : '.' ;
+T53 : '?' ;
+ // Keywords. The literals are upper-case, so only upper-case spellings match.
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 247
+K_BY: 'BY';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 248
+K_DELETE: 'DELETE';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 249
+K_EXPLAIN: 'EXPLAIN';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 250
+K_FROM: 'FROM';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 251
+K_GET: 'GET';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 252
+K_IN: 'IN';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 253
+K_LIMIT: 'LIMIT';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 254
+K_OFFSET: 'OFFSET';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 255
+K_ORDER: 'ORDER';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 256
+K_PLAN: 'PLAN';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 257
+K_SELECT: 'SELECT';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 258
+K_SET: 'SET';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 259
+K_WHERE: 'WHERE';
+
+// private syntactic rules: fragments are building blocks for other lexer rules and do not produce tokens themselves
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 262
+fragment
+Letter
+ : 'a'..'z'
+ | 'A'..'Z'
+ ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 268
+fragment
+Digit
+ : '0'..'9'
+ ;
+
+// syntactic elements: an identifier starts with a letter, followed by any mix of letters, digits and underscores
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 274
+Identifier
+ : Letter ( Letter | Digit | '_')*
+ ;
+
+//
+// Literals
+//
+
+// strings: escape a single quote ' by repeating it '' (SQL style); adjacent quoted runs are matched as one token
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 283
+StringLiteral
+ : '\'' (~'\'')* '\'' ( '\'' (~'\'')* '\'' )*
+ ;
+
+// integer literals: one or more decimal digits, no sign
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 288
+IntegerLiteral
+ : Digit+
+ ;
+
+//
+// miscellaneous syntactic elements
+//
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 295
+WS
+ : (' '|'\r'|'\t'|'\n') {skip();} // whitespace: matched one character at a time and discarded
+ ;
+ // comments: line comments start with two dashes, block comments are C-style; both are put on the HIDDEN channel rather than skipped
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 299
+COMMENT
+ : '--' (~('\n'|'\r'))* { $channel=HIDDEN; }
+ | '/*' (options {greedy=false;} : .)* '*/' { $channel=HIDDEN; }
+ ;
+ // remaining named punctuation tokens
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 304
+ASSOC: '=>';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 305
+COMMA: ',';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 306
+LEFT_BRACE: '{';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 307
+RIGHT_BRACE: '}';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 308
+SEMICOLON: ';';
Added: incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseError.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseError.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseError.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseError.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.compiler.parse;
+
+import org.antlr.runtime.*;
+
+public class ParseError { // one recognition error plus the recognizer and token names needed to render it as text
+ private BaseRecognizer br; // supplies getErrorHeader()/getErrorMessage() in getMessage()
+ private RecognitionException re; // the exception being described
+ private String[] tokenNames; // handed unchanged to getErrorMessage()
+ // Stores the three arguments by reference; nothing is copied or null-checked.
+ public ParseError(BaseRecognizer br, RecognitionException re, String[] tokenNames) {
+ this.br = br;
+ this.re = re;
+ this.tokenNames = tokenNames;
+ }
+ // Returns the recognizer given to the constructor.
+ public BaseRecognizer getBaseRecognizer() {
+ return br;
+ }
+ // Returns the recognition exception given to the constructor.
+ public RecognitionException getRecognitionException() {
+ return re;
+ }
+ // Returns the same array instance given to the constructor (not a copy).
+ public String[] getTokenNames() {
+ return tokenNames;
+ }
+ // Returns the recognizer's error header and error message for this exception, separated by one space.
+ public String getMessage() {
+ return br.getErrorHeader(re) + " " + br.getErrorMessage(re, tokenNames);
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseException.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseException.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseException.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.compiler.parse;
+
+/**
+ * Exception from the CQL Parser
+ */
+
+import java.util.ArrayList;
+
+public class ParseException extends Exception {
+ // Carries either a list of ParseErrors or a plain message, depending on which constructor was used.
+ private static final long serialVersionUID = 1L;
+ ArrayList<ParseError> errors = null; // package-private; stays null when the message constructor is used
+ // Builds the exception from collected parse errors; the list is stored by reference, not copied.
+ public ParseException(ArrayList<ParseError> errors)
+ {
+ super();
+ this.errors = errors;
+ }
+ // Builds the exception from a plain message; errors stays null.
+ public ParseException(String message)
+ {
+ super(message);
+ }
+ // Returns each ParseError's message followed by a newline, or Exception's own message when no error list was given.
+ public String getMessage() {
+ // No error list: fall back to the message passed to the constructor.
+ if (errors == null)
+ return super.getMessage();
+ // An empty (non-null) list produces an empty string.
+ StringBuilder sb = new StringBuilder();
+ for(ParseError err: errors) {
+ sb.append(err.getMessage());
+ sb.append("\n");
+ }
+
+ return sb.toString();
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.compiler.sem;
+
+
+/**
+ * Exception from the CQL SemanticAnalyzer
+ */
+
+public class SemanticException extends Exception // checked exception; declared by the SemanticPhase compile methods
+{
+ private static final long serialVersionUID = 1L;
+ // The four constructors below forward their arguments unchanged to the matching Exception constructors.
+ public SemanticException()
+ {
+ super();
+ }
+ // message: text later returned by getMessage()
+ public SemanticException(String message)
+ {
+ super(message);
+ }
+ // cause: underlying throwable, kept for getCause()
+ public SemanticException(Throwable cause)
+ {
+ super(cause);
+ }
+ // message and cause together
+ public SemanticException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.compiler.sem;
+
+import java.util.Map;
+
+import org.antlr.runtime.tree.CommonTree;
+
+import org.apache.cassandra.cql.common.*;
+import org.apache.cassandra.cql.compiler.common.*;
+import org.apache.cassandra.cql.compiler.parse.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql.common.ColumnMapExpr;
+import org.apache.cassandra.cql.common.ColumnRangeQueryRSD;
+import org.apache.cassandra.cql.common.ConstantOperand;
+import org.apache.cassandra.cql.common.ExplainPlan;
+import org.apache.cassandra.cql.common.OperandDef;
+import org.apache.cassandra.cql.common.Pair;
+import org.apache.cassandra.cql.common.Plan;
+import org.apache.cassandra.cql.common.QueryPlan;
+import org.apache.cassandra.cql.common.RowSourceDef;
+import org.apache.cassandra.cql.common.SetColumnMap;
+import org.apache.cassandra.cql.common.SetSuperColumnMap;
+import org.apache.cassandra.cql.common.SetUniqueKey;
+import org.apache.cassandra.cql.common.SuperColumnMapExpr;
+import org.apache.cassandra.cql.common.SuperColumnRangeQueryRSD;
+import org.apache.cassandra.cql.common.UniqueKeyQueryRSD;
+import org.apache.cassandra.cql.common.Utils;
+import org.apache.cassandra.cql.compiler.common.CompilerErrorMsg;
+import org.apache.cassandra.cql.compiler.parse.CqlParser;
+import org.apache.log4j.Logger;
+
+//
+// Note: This class is CQL related work in progress.
+//
+// Currently, this phase combines both semantic analysis and code-gen.
+// I expect that as my ideas get refined/cleared up, I'll be drawing
+// a clearer distinction between the semantic analysis phase and code-gen.
+//
+public class SemanticPhase
+{
+ private final static Logger logger_ = Logger.getLogger(SemanticPhase.class);
+
+ // Dispatches on the root AST node type and returns the compiled Plan. Current code-gen also happens in this phase!
+ public static Plan doSemanticAnalysis(CommonTree ast) throws SemanticException
+ {
+ Plan plan = null;
+ // plan stays null for node types whose compile step is still a stub (DELETE, SELECT), so callers may get null back.
+ logger_.debug("AST: " + ast.toStringTree());
+
+ switch (ast.getType())
+ {
+ case CqlParser.A_GET:
+ plan = compileGet(ast);
+ break;
+ case CqlParser.A_SET:
+ plan = compileSet(ast);
+ break;
+ case CqlParser.A_DELETE:
+ compileDelete(ast); // stub: returns nothing, plan stays null
+ break;
+ case CqlParser.A_SELECT:
+ compileSelect(ast); // stub: returns nothing, plan stays null
+ break;
+ case CqlParser.A_EXPLAIN_PLAN:
+ // Case: EXPLAIN PLAN <stmt>
+ // first, generate a plan for <stmt> (child 0, compiled recursively)
+ // and then, wrap it with a special ExplainPlan plan
+ // whose execution will result in an explain plan rather
+ // than a normal execution of the statement.
+ plan = doSemanticAnalysis((CommonTree)(ast.getChild(0)));
+ plan = new ExplainPlan(plan);
+ break;
+ default:
+ // Unhandled AST node. Raise an internal error.
+ throw new SemanticException(CompilerErrorMsg.INTERNAL_ERROR.getMsg(ast, "Unknown Node Type: " + ast.getType()));
+ }
+ return plan;
+ }
+
+ /**
+ * Given an A_COLUMN_ACCESS node (child 0 = table, child 1 = column family), checks that the
+ * table and column family are known and that the number of column dimensions is allowed for
+ * the family's type. Returns the family's CFMetaData; throws SemanticException otherwise.
+ */
+ private static CFMetaData getColumnFamilyInfo(CommonTree ast) throws SemanticException
+ {
+ assert(ast.getType() == CqlParser.A_COLUMN_ACCESS); // only checked when JVM assertions are enabled
+
+ CommonTree columnFamilyNode = (CommonTree)(ast.getChild(1));
+ CommonTree tableNode = (CommonTree)(ast.getChild(0));
+
+ String columnFamily = columnFamilyNode.getText();
+ String table = tableNode.getText();
+ // A null map is treated as "no such table".
+ Map<String, CFMetaData> columnFamilies = DatabaseDescriptor.getTableMetaData(table);
+ if (columnFamilies == null)
+ {
+ throw new SemanticException(CompilerErrorMsg.INVALID_TABLE.getMsg(ast, table));
+ }
+
+ CFMetaData cfMetaData = columnFamilies.get(columnFamily);
+ if (cfMetaData == null)
+ {
+ throw new SemanticException(CompilerErrorMsg.INVALID_COLUMN_FAMILY.getMsg(ast, columnFamily, table));
+ }
+
+ // Once you have drilled down to a row using a rowKey, a super column
+ // map can be indexed only 2 further levels deep; and a column map may
+ // be indexed up to 1 level deep. (Any other columnType value is not limited here.)
+ int dimensions = numColumnDimensions(ast);
+ if (("Super".equals(cfMetaData.columnType) && (dimensions > 2)) ||
+ ("Standard".equals(cfMetaData.columnType) && dimensions > 1))
+ {
+ throw new SemanticException(CompilerErrorMsg.TOO_MANY_DIMENSIONS.getMsg(ast, cfMetaData.columnType));
+ }
+
+ return cfMetaData;
+ }
+
+ private static String getRowKey(CommonTree ast) // ast: an A_COLUMN_ACCESS node
+ {
+ assert(ast.getType() == CqlParser.A_COLUMN_ACCESS);
+ return Utils.unescapeSQLString(ast.getChild(2).getText()); // child 2 holds the row key text (children 0 and 1 are table and column family)
+ }
+
+ private static int numColumnDimensions(CommonTree ast) // number of column specifiers that follow the row key
+ {
+ // Skip over table name, column family and rowKey (children 0..2); the remaining children are column dimensions
+ return ast.getChildCount() - 3;
+ }
+
+ // Returns the pos'th (0-based index) column specifier in the astNode, passed through Utils.unescapeSQLString
+ private static String getColumn(CommonTree ast, int pos)
+ {
+ // Skip over table name, column family and rowKey, which occupy children 0..2
+ return Utils.unescapeSQLString(ast.getChild(pos + 3).getText());
+ }
+
+ // Compile a GET statement into a QueryPlan over one row source, chosen by column family type and dimension count
+ private static Plan compileGet(CommonTree ast) throws SemanticException
+ {
+ int childCount = ast.getChildCount();
+ assert(childCount == 1);
+
+ CommonTree columnFamilySpec = (CommonTree)ast.getChild(0);
+ assert(columnFamilySpec.getType() == CqlParser.A_COLUMN_ACCESS);
+ // Validates table, column family and dimension count; throws SemanticException on failure.
+ CFMetaData cfMetaData = getColumnFamilyInfo(columnFamilySpec);
+ ConstantOperand rowKey = new ConstantOperand(getRowKey(columnFamilySpec));
+ int dimensionCnt = numColumnDimensions(columnFamilySpec);
+
+ RowSourceDef rwsDef;
+ if ("Super".equals(cfMetaData.columnType))
+ {
+ if (dimensionCnt > 2)
+ {
+ // We don't expect this case to arise, since Cql.g grammar disallows this
+ // and getColumnFamilyInfo() above already rejects it; therefore, raise this case as an "internal error".
+ throw new SemanticException(CompilerErrorMsg.INTERNAL_ERROR.getMsg(columnFamilySpec));
+ }
+
+ if (dimensionCnt == 2)
+ {
+ // Case: table.super_cf[<rowKey>][<superColumnKey>][<columnKey>]
+ ConstantOperand superColumnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));
+ ConstantOperand columnKey = new ConstantOperand(getColumn(columnFamilySpec, 1));
+ rwsDef = new UniqueKeyQueryRSD(cfMetaData, rowKey, superColumnKey, columnKey);
+ }
+ else if (dimensionCnt == 1)
+ {
+ // Case: table.super_cf[<rowKey>][<superColumnKey>]
+ ConstantOperand superColumnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));
+ rwsDef = new ColumnRangeQueryRSD(cfMetaData, rowKey, superColumnKey, -1, Integer.MAX_VALUE); // NOTE(review): -1 / Integer.MAX_VALUE look like "no offset / no limit" - confirm against ColumnRangeQueryRSD
+ }
+ else
+ {
+ // Case: table.super_cf[<rowKey>]
+ rwsDef = new SuperColumnRangeQueryRSD(cfMetaData, rowKey, -1, Integer.MAX_VALUE);
+ }
+ }
+ else // Standard Column Family (also taken for any columnType other than "Super")
+ {
+ if (dimensionCnt == 1)
+ {
+ // Case: table.standard_cf[<rowKey>][<columnKey>]
+ ConstantOperand columnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));
+ rwsDef = new UniqueKeyQueryRSD(cfMetaData, rowKey, columnKey);
+ }
+ else
+ {
+ // Case: table.standard_cf[<rowKey>]
+ logger_.assertLog((dimensionCnt == 0), "invalid dimensionCnt: " + dimensionCnt); // log4j logs an error when the condition is false; execution continues
+ rwsDef = new ColumnRangeQueryRSD(cfMetaData, rowKey, -1, Integer.MAX_VALUE);
+ }
+ }
+ return new QueryPlan(rwsDef);
+ }
+
+ private static OperandDef getSimpleExpr(CommonTree ast) throws SemanticException // converts a StringLiteral node into a ConstantOperand
+ {
+ int type = ast.getType();
+
+ // for now, the only simple expressions supported are of string type; any other node type is rejected with INVALID_TYPE
+ if (type != CqlParser.StringLiteral)
+ {
+ throw new SemanticException(CompilerErrorMsg.INVALID_TYPE.getMsg(ast));
+ }
+ return new ConstantOperand(Utils.unescapeSQLString(ast.getText()));
+ }
+
+ private static ColumnMapExpr getColumnMapExpr(CommonTree ast) throws SemanticException // converts an A_COLUMN_MAP_VALUE node into a ColumnMapExpr
+ {
+ int type = ast.getType();
+ if (type != CqlParser.A_COLUMN_MAP_VALUE)
+ {
+ throw new SemanticException(CompilerErrorMsg.INVALID_TYPE.getMsg(ast));
+ }
+ // Each child is one entry: child 0 is the column key, child 1 the column value; both must be string literals (see getSimpleExpr).
+ int size = ast.getChildCount();
+ ColumnMapExpr result = new ColumnMapExpr();
+ for (int idx = 0; idx < size; idx++)
+ {
+ CommonTree entryNode = (CommonTree)(ast.getChild(idx));
+ OperandDef columnKey = getSimpleExpr((CommonTree)(entryNode.getChild(0)));
+ OperandDef columnValue = getSimpleExpr((CommonTree)(entryNode.getChild(1)));
+
+ Pair<OperandDef, OperandDef> entry = new Pair<OperandDef, OperandDef>(columnKey, columnValue);
+ result.add(entry);
+ }
+ return result;
+ }
+
+ private static SuperColumnMapExpr getSuperColumnMapExpr(CommonTree ast) throws SemanticException // converts an A_SUPERCOLUMN_MAP_VALUE node into a SuperColumnMapExpr
+ {
+ int type = ast.getType();
+ if (type != CqlParser.A_SUPERCOLUMN_MAP_VALUE)
+ {
+ throw new SemanticException(CompilerErrorMsg.INVALID_TYPE.getMsg(ast));
+ }
+ int size = ast.getChildCount();
+ SuperColumnMapExpr result = new SuperColumnMapExpr();
+ for (int idx = 0; idx < size; idx++)
+ {
+ CommonTree entryNode = (CommonTree)(ast.getChild(idx));
+ OperandDef superColumnKey = getSimpleExpr((CommonTree)(entryNode.getChild(0))); // child 0: super column key, must be a string literal
+ ColumnMapExpr columnMapExpr = getColumnMapExpr((CommonTree)(entryNode.getChild(1))); // child 1: nested column map
+
+ Pair<OperandDef, ColumnMapExpr> entry = new Pair<OperandDef, ColumnMapExpr>(superColumnKey, columnMapExpr);
+ result.add(entry);
+ }
+ return result;
+ }
+
+ // compile a SET statement: child 0 is the column access being assigned, child 1 the value to store
+ private static Plan compileSet(CommonTree ast) throws SemanticException
+ {
+ int childCount = ast.getChildCount();
+ assert(childCount == 2);
+
+ CommonTree columnFamilySpec = (CommonTree)ast.getChild(0);
+ assert(columnFamilySpec.getType() == CqlParser.A_COLUMN_ACCESS);
+ // Validates table, column family and dimension count; throws SemanticException on failure.
+ CFMetaData cfMetaData = getColumnFamilyInfo(columnFamilySpec);
+ ConstantOperand rowKey = new ConstantOperand(getRowKey(columnFamilySpec));
+ int dimensionCnt = numColumnDimensions(columnFamilySpec);
+
+ CommonTree valueNode = (CommonTree)(ast.getChild(1));
+ // The node type required of valueNode (string literal, column map or super column map) depends on the branch taken below.
+ Plan plan = null;
+ if ("Super".equals(cfMetaData.columnType))
+ {
+ if (dimensionCnt == 2)
+ {
+ // Case: set table.super_cf['key']['supercolumn']['column'] = 'value'
+ OperandDef value = getSimpleExpr(valueNode);
+ ConstantOperand superColumnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));
+ ConstantOperand columnKey = new ConstantOperand(getColumn(columnFamilySpec, 1));
+ plan = new SetUniqueKey(cfMetaData, rowKey, superColumnKey, columnKey, value);
+ }
+ else if (dimensionCnt == 1)
+ {
+ // Case: set table.super_cf['key']['supercolumn'] = <column_map>;
+ ColumnMapExpr columnMapExpr = getColumnMapExpr(valueNode);
+ ConstantOperand superColumnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));
+ plan = new SetColumnMap(cfMetaData, rowKey, superColumnKey, columnMapExpr);
+ }
+ else
+ {
+ // Case: set table.super_cf['key'] = <super_column_map>;
+ logger_.assertLog(dimensionCnt == 0, "invalid dimensionCnt: " + dimensionCnt); // log4j logs an error when the condition is false; execution continues
+ SuperColumnMapExpr superColumnMapExpr = getSuperColumnMapExpr(valueNode);
+ plan = new SetSuperColumnMap(cfMetaData, rowKey, superColumnMapExpr);
+ }
+ }
+ else // Standard column family (also taken for any columnType other than "Super")
+ {
+ if (dimensionCnt == 1)
+ {
+ // Case: set table.standard_cf['key']['column'] = 'value'
+ OperandDef value = getSimpleExpr(valueNode);
+ ConstantOperand columnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));
+ plan = new SetUniqueKey(cfMetaData, rowKey, columnKey, value);
+ }
+ else
+ {
+ // Case: set table.standard_cf['key'] = <column_map>;
+ logger_.assertLog(dimensionCnt == 0, "invalid dimensionCnt: " + dimensionCnt);
+ ColumnMapExpr columnMapExpr = getColumnMapExpr(valueNode);
+ plan = new SetColumnMap(cfMetaData, rowKey, columnMapExpr);
+ }
+ }
+ return plan;
+ }
+
+ private static void compileSelect(CommonTree ast) throws SemanticException
+ {
+ // stub; tbd. SELECT currently compiles to nothing, so doSemanticAnalysis returns a null plan for it.
+ }
+ private static void compileDelete(CommonTree ast) throws SemanticException
+ {
+ // stub; tbd. DELETE currently compiles to nothing, so doSemanticAnalysis returns a null plan for it.
+ }
+}
\ No newline at end of file
Added: incubator/cassandra/trunk/src/org/apache/cassandra/cql/driver/CqlDriver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/driver/CqlDriver.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/driver/CqlDriver.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/driver/CqlDriver.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql.driver;
+
+import org.apache.cassandra.cql.compiler.common.*;
+import org.apache.cassandra.cql.compiler.parse.*;
+import org.apache.cassandra.cql.compiler.sem.*;
+import org.apache.cassandra.cql.common.*;
+import com.facebook.thrift.*;
+
+import org.apache.cassandra.cql.common.CqlResult;
+import org.apache.cassandra.cql.common.Plan;
+import org.apache.cassandra.cql.compiler.common.CqlCompiler;
+import org.apache.cassandra.cql.compiler.parse.ParseException;
+import org.apache.cassandra.cql.compiler.sem.SemanticException;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+// Server side driver class for CQL
+public class CqlDriver
+{
+ private final static Logger logger_ = Logger.getLogger(CqlDriver.class);
+
+ // Execute a CQL Statement: compile it, run the plan, and turn any exception into a CqlResult carrying errorTxt. Returns null if the compiler yields no plan.
+ public static CqlResult executeQuery(String query) throws TException
+ {
+ CqlCompiler compiler = new CqlCompiler();
+
+ try
+ {
+ logger_.debug("Compiling CQL query ...");
+ Plan plan = compiler.compileQuery(query);
+ if (plan != null)
+ {
+ logger_.debug("Executing CQL query ...");
+ return plan.execute();
+ }
+ }
+ catch (Exception e) // covers compilation as well as plan.execute()
+ {
+ CqlResult result = new CqlResult(null);
+ result.errorTxt = e.getMessage();
+ // Exact class match (not instanceof): subclasses such as NullPointerException are treated as internal errors.
+ Class<? extends Exception> excpClass = e.getClass();
+ if ((excpClass != SemanticException.class)
+ && (excpClass != ParseException.class)
+ && (excpClass != RuntimeException.class))
+ {
+ result.errorTxt = "CQL Internal Error: " + result.errorTxt;
+ result.errorCode = 1; // failure; NOTE(review): not set for Semantic/Parse/RuntimeException - confirm that is intended
+ logger_.error(LogUtil.throwableToString(e));
+ }
+
+ return result;
+ }
+ // Reached only when the compiler returned a null plan.
+ return null;
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql.execution;
+
+/**
+ * List of error message templates used by CQL's Execution Layer; format one with getMsg(...)
+ **/
+public enum RuntimeErrorMsg
+{
+ // Error messages with String.format() style format specifiers
+ GENERIC_ERROR("CQL Execution Error"), // no format specifier
+ INTERNAL_ERROR("CQL Internal Error: %s"), // one %s argument
+ IMPLEMENTATION_RESTRICTION("Implementation Restriction: %s"), // one %s argument
+ NO_DATA_FOUND("No data found") // no format specifier
+ ;
+
+ private String mesg; // format template for this constant
+ RuntimeErrorMsg(String mesg)
+ {
+ this.mesg = mesg;
+ }
+
+ // Returns the formatted error message. Templates with %s need a matching argument (String.format throws otherwise); surplus arguments are ignored.
+ public String getMsg(Object... args)
+ {
+ // note: mesg itself might contain other format specifiers...
+ return String.format(mesg, args);
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+abstract class AbstractColumnFactory // base for the factories that build IColumn instances; one shared instance per column type
+{
+ private static Map<String, AbstractColumnFactory> columnFactory_ = new HashMap<String, AbstractColumnFactory>();
+ // NOTE(review): keys here come from ColumnFamily.getColumnType(...), while getColumnFactory() looks up the literals "Standard"/"Super" - confirm they are the same strings
+ static
+ {
+ columnFactory_.put(ColumnFamily.getColumnType("Standard"),new ColumnFactory());
+ columnFactory_.put(ColumnFamily.getColumnType("Super"),new SuperColumnFactory());
+ }
+ // Returns the registered factory for columnType.
+ static AbstractColumnFactory getColumnFactory(String columnType)
+ {
+ /* Look up the shared factory: null or "Standard" selects the standard entry, any other value the super entry. */
+ if ( columnType == null || columnType.equals("Standard") )
+ return columnFactory_.get("Standard");
+ else
+ return columnFactory_.get("Super");
+ }
+ // Factory methods implemented by ColumnFactory and SuperColumnFactory.
+ public abstract IColumn createColumn(String name);
+ public abstract IColumn createColumn(String name, byte[] value);
+ public abstract IColumn createColumn(String name, byte[] value, long timestamp);
+ public abstract ICompactSerializer2<IColumn> createColumnSerializer();
+}
+
+class ColumnFactory extends AbstractColumnFactory // builds plain Column instances; each method forwards to the matching Column constructor
+{
+ public IColumn createColumn(String name)
+ {
+ return new Column(name);
+ }
+ // name plus value
+ public IColumn createColumn(String name, byte[] value)
+ {
+ return new Column(name, value);
+ }
+ // name, value and timestamp
+ public IColumn createColumn(String name, byte[] value, long timestamp)
+ {
+ return new Column(name, value, timestamp);
+ }
+ // Returns whatever Column.serializer() provides.
+ public ICompactSerializer2<IColumn> createColumnSerializer()
+ {
+ return Column.serializer();
+ }
+}
+
+class SuperColumnFactory extends AbstractColumnFactory // builds SuperColumn instances from names of the form <super column name>:<column name>
+{
+ static String[] getSuperColumnAndColumn(String cName) // splits cName on ':' and returns the parts in order
+ {
+ StringTokenizer st = new StringTokenizer(cName, ":"); // StringTokenizer never returns empty tokens, so "a::b" yields two parts and ":" yields none
+ String[] values = new String[st.countTokens()];
+ int i = 0;
+ while ( st.hasMoreElements() )
+ {
+ values[i++] = (String)st.nextElement();
+ }
+ return values;
+ }
+ // Accepts one part (super column only) or two parts; with two, a value-less Column is added under the super column.
+ public IColumn createColumn(String name)
+ {
+ String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
+ if ( values.length == 0 || values.length > 2 )
+ throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
+ IColumn superColumn = new SuperColumn(values[0]);
+ if(values.length == 2)
+ {
+ IColumn subColumn = new Column(values[1]);
+ superColumn.addColumn(values[1], subColumn);
+ }
+ return superColumn;
+ }
+ // Requires exactly two parts; returns a new SuperColumn containing one Column that holds value.
+ public IColumn createColumn(String name, byte[] value)
+ {
+ String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
+ if ( values.length != 2 )
+ throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
+ IColumn superColumn = new SuperColumn(values[0]);
+ IColumn subColumn = new Column(values[1], value);
+ superColumn.addColumn(values[1], subColumn);
+ return superColumn;
+ }
+ // Same as the two-argument form, with the timestamp passed on to the Column.
+ public IColumn createColumn(String name, byte[] value, long timestamp)
+ {
+ String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
+ if ( values.length != 2 )
+ throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
+ IColumn superColumn = new SuperColumn(values[0]);
+ IColumn subColumn = new Column(values[1], value, timestamp);
+ superColumn.addColumn(values[1], subColumn);
+ return superColumn;
+ }
+ // Returns whatever SuperColumn.serializer() provides.
+ public ICompactSerializer2<IColumn> createColumnSerializer()
+ {
+ return SuperColumn.serializer();
+ }
+}
\ No newline at end of file
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.log4j.Logger;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * In-memory staging area that maps row keys to already-serialized byte
+ * buffers and writes them out through an SSTable when flushed.
+ */
+public class BinaryMemtable implements MemtableMBean
+{
+    private static Logger logger_ = Logger.getLogger( BinaryMemtable.class );
+    /* Accounted size at which isThresholdViolated() starts reporting true */
+    private int threshold_ = 512*1024*1024;
+    /* Running total of key lengths (chars) plus buffer lengths (bytes) held in the map */
+    private AtomicInteger currentSize_ = new AtomicInteger(0);
+
+    /* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
+    private String table_;
+    private String cfName_;
+    /* Set once this memtable has been submitted for flushing; read and written under lock_ in put() */
+    private boolean isFrozen_ = false;
+    private Map<String, byte[]> columnFamilies_ = new NonBlockingHashMap<String, byte[]>();
+    /* Lock and Condition for notifying new clients about Memtable switches */
+    Lock lock_ = new ReentrantLock();
+    Condition condition_;
+
+    /**
+     * @param table name of the table this memtable belongs to
+     * @param cfName name of the column family this memtable belongs to
+     */
+    BinaryMemtable(String table, String cfName) throws IOException
+    {
+        condition_ = lock_.newCondition();
+        table_ = table;
+        cfName_ = cfName;
+    }
+
+    /**
+     * NOTE(review): despite its name this reports the current accounted size,
+     * not threshold_. Left unchanged because monitoring clients may rely on
+     * the reported value -- confirm the intent before altering it.
+     */
+    public int getMemtableThreshold()
+    {
+        return currentSize_.get();
+    }
+
+    /**
+     * Adjusts the accounted size by the difference between two sizes.
+     *
+     * @param oldSize size that was previously accounted for
+     * @param newSize size that replaces it
+     */
+    void resolveSize(int oldSize, int newSize)
+    {
+        currentSize_.addAndGet(newSize - oldSize);
+    }
+
+
+    /**
+     * @return true once the accounted size reaches threshold_ or the map
+     *         holds more than 50000 keys
+     */
+    boolean isThresholdViolated()
+    {
+        if (currentSize_.get() >= threshold_ || columnFamilies_.size() > 50000)
+        {
+            logger_.debug("CURRENT SIZE:" + currentSize_.get());
+            return true;
+        }
+        return false;
+    }
+
+    String getColumnFamily()
+    {
+        return cfName_;
+    }
+
+    /*
+     * This version is used by the external clients to put data into
+     * the memtable. This version will respect the threshold and flush
+     * the memtable to disk when the size exceeds the threshold.
+     *
+     * When the threshold is violated the first caller (under lock_) marks
+     * this memtable frozen, submits it to the BinaryMemtableManager and asks
+     * the ColumnFamilyStore to switch memtables; later callers hand the
+     * write to the ColumnFamilyStore via applyBinary().
+     */
+    void put(String key, byte[] buffer) throws IOException
+    {
+        if (isThresholdViolated() )
+        {
+            lock_.lock();
+            try
+            {
+                ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+                if (!isFrozen_)
+                {
+                    isFrozen_ = true;
+                    BinaryMemtableManager.instance().submit(cfStore.getColumnFamilyName(), this);
+                    cfStore.switchBinaryMemtable(key, buffer);
+                }
+                else
+                {
+                    cfStore.applyBinary(key, buffer);
+                }
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        else
+        {
+            resolve(key, buffer);
+        }
+    }
+
+    /*
+     * Stores the buffer under the key and keeps the size accounting in step.
+     * A replaced entry only contributes the difference between the old and
+     * the new buffer; previously an overwrite counted the key and the whole
+     * new buffer again, inflating the size and triggering premature flushes.
+     */
+    private void resolve(String key, byte[] buffer)
+    {
+        byte[] previous = columnFamilies_.put(key, buffer);
+        if ( previous == null )
+        {
+            currentSize_.addAndGet(buffer.length + key.length());
+        }
+        else
+        {
+            resolveSize(previous.length, buffer.length);
+        }
+    }
+
+
+    /*
+     * Writes every non-empty entry, in sorted key order, through a new
+     * SSTable, registers the resulting file and bloom filter with the
+     * ColumnFamilyStore and then empties the map. Does nothing when the
+     * map is empty.
+     */
+    void flush() throws IOException
+    {
+        if ( columnFamilies_.size() == 0 )
+            return;
+        ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+        String directory = DatabaseDescriptor.getDataFileLocation();
+        String filename = cfStore.getNextFileName();
+
+        /*
+         * Use the SSTable to write the contents of the memtable map
+         * to disk.
+         */
+        SSTable ssTable = new SSTable(directory, filename);
+        /* The backing map is unordered, so sort the keys before appending */
+        List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
+        Collections.sort(keys);
+        /* Use this BloomFilter to decide if a key exists in a SSTable */
+        BloomFilter bf = new BloomFilter(keys.size(), 8);
+        for ( String key : keys )
+        {
+            byte[] bytes = columnFamilies_.get(key);
+            if ( bytes.length > 0 )
+            {
+                /* Now write the key and value to disk */
+                ssTable.append(key, bytes);
+                bf.fill(key);
+            }
+        }
+        ssTable.close(bf);
+        cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
+        columnFamilies_.clear();
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtableManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtableManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtableManager.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Process-wide singleton that queues BinaryMemtable instances for flushing
+ * on a dedicated executor.
+ */
+public class BinaryMemtableManager
+{
+    /*
+     * volatile is required for the check-lock-check initialization in
+     * instance(): without it another thread may observe a non-null
+     * reference to a not yet fully constructed manager.
+     */
+    private static volatile BinaryMemtableManager instance_;
+    private static Lock lock_ = new ReentrantLock();
+    private static Logger logger_ = Logger.getLogger(BinaryMemtableManager.class);
+
+    /**
+     * @return the shared manager, created on first use
+     */
+    static BinaryMemtableManager instance()
+    {
+        if ( instance_ == null )
+        {
+            lock_.lock();
+            try
+            {
+                if ( instance_ == null )
+                    instance_ = new BinaryMemtableManager();
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        return instance_;
+    }
+
+    /* Task that flushes one memtable when run by the executor */
+    class BinaryMemtableFlusher implements Runnable
+    {
+        private BinaryMemtable memtable_;
+
+        BinaryMemtableFlusher(BinaryMemtable memtable)
+        {
+            memtable_ = memtable;
+        }
+
+        public void run()
+        {
+            try
+            {
+                memtable_.flush();
+            }
+            catch (IOException e)
+            {
+                /*
+                 * A failed flush means the memtable contents did not reach
+                 * disk; report it at error level so it is not hidden when
+                 * debug logging is disabled.
+                 */
+                logger_.error( LogUtil.throwableToString(e) );
+            }
+        }
+    }
+
+    /* presumably core and maximum pool size of 1 with an unbounded work queue -- confirm against DebuggableThreadPoolExecutor */
+    private ExecutorService flusher_ = new DebuggableThreadPoolExecutor( 1,
+                                                                         1,
+                                                                         Integer.MAX_VALUE,
+                                                                         TimeUnit.SECONDS,
+                                                                         new LinkedBlockingQueue<Runnable>(),
+                                                                         new ThreadFactoryImpl("BINARY-MEMTABLE-FLUSHER-POOL")
+                                                                         );
+
+    /**
+     * Submit memtables to be flushed to disk.
+     *
+     * @param cfName column family name; not used by this implementation
+     * @param memtbl memtable whose flush() is scheduled on the executor
+     */
+    void submit(String cfName, BinaryMemtable memtbl)
+    {
+        flusher_.submit( new BinaryMemtableFlusher(memtbl) );
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryVerbHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.RowMutationVerbHandler.RowMutationContext;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Verb handler that deserializes a RowMutationMessage from the first element
+ * of the message body and loads the mutation into a per-thread row.
+ */
+public class BinaryVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(BinaryVerbHandler.class);
+    /*
+     * We use this so that we can reuse the same row mutation context for the mutation.
+     * A plain ThreadLocal is used on purpose: with InheritableThreadLocal a
+     * thread spawned by a handler thread would start out with the parent's
+     * context object and both threads would then reset and fill the very
+     * same buffer and row.
+     */
+    private static ThreadLocal<RowMutationContext> tls_ = new ThreadLocal<RowMutationContext>();
+
+    /**
+     * @param message message whose body's first element is the serialized
+     *        RowMutationMessage as a byte array
+     */
+    public void doVerb(Message message)
+    {
+        byte[] bytes = (byte[])message.getMessageBody()[0];
+        /* Obtain a Row Mutation Context from TLS */
+        RowMutationContext rowMutationCtx = tls_.get();
+        if ( rowMutationCtx == null )
+        {
+            rowMutationCtx = new RowMutationContext();
+            tls_.set(rowMutationCtx);
+        }
+        rowMutationCtx.buffer_.reset(bytes, bytes.length);
+
+        try
+        {
+            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(rowMutationCtx.buffer_);
+            RowMutation rm = rmMsg.getRowMutation();
+            rowMutationCtx.row_.key(rm.key());
+            rm.load(rowMutationCtx.row_);
+
+        }
+        catch ( Exception e )
+        {
+            /* The mutation was not applied; make that visible outside debug logging */
+            logger_.warn(LogUtil.throwableToString(e));
+        }
+    }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployMessage.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployMessage.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployMessage.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * Value object pairing a callout name with the script that implements it,
+ * together with helpers to serialize it into a Message.
+ */
+public class CalloutDeployMessage
+{
+    /* Shared serializer instance for this message type */
+    private static ICompactSerializer<CalloutDeployMessage> serializer_ = new CalloutDeployMessageSerializer();
+
+    /* Name of the callout */
+    private String callout_;
+    /* The actual procedure */
+    private String script_;
+
+    /**
+     * @param callout name of the callout
+     * @param script source of the procedure
+     */
+    public CalloutDeployMessage(String callout, String script)
+    {
+        callout_ = callout;
+        script_ = script;
+    }
+
+    /**
+     * @return the serializer shared by all CalloutDeployMessage instances
+     */
+    public static ICompactSerializer<CalloutDeployMessage> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * Serializes the given deploy message and wraps the bytes as the single
+     * body element of a new Message addressed with the callout deploy verb.
+     *
+     * @param cdMessage the deploy message to wrap
+     * @return the Message carrying the serialized bytes
+     * @throws IOException if serialization fails
+     */
+    public static Message getCalloutDeployMessage(CalloutDeployMessage cdMessage) throws IOException
+    {
+        ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        serializer_.serialize(cdMessage, new DataOutputStream(sink));
+        return new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.calloutDeployVerbHandler_, new Object[]{sink.toByteArray()});
+    }
+
+    String getCallout()
+    {
+        return callout_;
+    }
+
+    String getScript()
+    {
+        return script_;
+    }
+}
+
+/**
+ * Wire format: the callout name followed by the script, each written with
+ * writeUTF and read back with readUTF in the same order.
+ */
+class CalloutDeployMessageSerializer implements ICompactSerializer<CalloutDeployMessage>
+{
+    public void serialize(CalloutDeployMessage cdMessage, DataOutputStream dos) throws IOException
+    {
+        final String calloutName = cdMessage.getCallout();
+        dos.writeUTF(calloutName);
+        final String scriptBody = cdMessage.getScript();
+        dos.writeUTF(scriptBody);
+    }
+
+    public CalloutDeployMessage deserialize(DataInputStream dis) throws IOException
+    {
+        /* Arguments are evaluated left to right: name first, then script */
+        return new CalloutDeployMessage(dis.readUTF(), dis.readUTF());
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployVerbHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Verb handler that decodes a CalloutDeployMessage from the first element of
+ * the message body and registers the callout with the CalloutManager.
+ */
+public class CalloutDeployVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(CalloutDeployVerbHandler.class);
+
+    /**
+     * @param message message whose body's first element is the serialized
+     *        CalloutDeployMessage as a byte array
+     */
+    public void doVerb(Message message)
+    {
+        byte[] payload = (byte[])message.getMessageBody()[0];
+        DataInputBuffer input = new DataInputBuffer();
+        input.reset(payload, payload.length);
+        try
+        {
+            CalloutDeployMessage deployment = CalloutDeployMessage.serializer().deserialize(input);
+            /* save the callout to callout cache and to disk. */
+            CalloutManager manager = CalloutManager.instance();
+            manager.addCallout( deployment.getCallout(), deployment.getScript() );
+        }
+        catch ( IOException ioFailure )
+        {
+            logger_.warn(LogUtil.throwableToString(ioFailure));
+        }
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutManager.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.script.Bindings;
+import javax.script.Invocable;
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.Compilable;
+import javax.script.CompiledScript;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.procedures.GroovyScriptRunner;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Singleton that compiles Groovy callout scripts, caches the compiled form
+ * by callout name and mirrors the script sources as files in the configured
+ * callout location.
+ */
+public class CalloutManager
+{
+    private final static Logger logger_ = Logger.getLogger(CalloutManager.class);
+    /* File extension used for every script file written or read by this class */
+    private static final String extn_ = ".groovy";
+    /* Used to lock the factory for creation of CalloutManager instance */
+    private static Lock createLock_ = new ReentrantLock();
+    /*
+     * An instance of the CalloutManager. volatile is required for the
+     * check-lock-check initialization in instance(): without it another
+     * thread may see a reference to a partially constructed manager.
+     */
+    private static volatile CalloutManager instance_;
+
+    /**
+     * @return the shared manager, created on first use
+     */
+    public static CalloutManager instance()
+    {
+        if ( instance_ == null )
+        {
+            CalloutManager.createLock_.lock();
+            try
+            {
+                if ( instance_ == null )
+                {
+                    instance_ = new CalloutManager();
+                }
+            }
+            finally
+            {
+                CalloutManager.createLock_.unlock();
+            }
+        }
+        return instance_;
+    }
+
+    /* Map containing the name of callout as key and the callout script as value */
+    private Map<String, CompiledScript> calloutCache_ = new HashMap<String, CompiledScript>();
+    /* The Groovy Script compiler instance */
+    private Compilable compiler_;
+    /* The Groovy script invokable instance */
+    private Invocable invokable_;
+
+    private CalloutManager()
+    {
+        ScriptEngineManager scriptManager = new ScriptEngineManager();
+        /* getEngineByName() yields null when no "groovy" engine is available */
+        ScriptEngine groovyEngine = scriptManager.getEngineByName("groovy");
+        compiler_ = (Compilable)groovyEngine;
+        invokable_ = (Invocable)groovyEngine;
+    }
+
+    /**
+     * Compile the script and cache the compiled script. Does nothing when
+     * no compiler is available.
+     * @param scriptId name under which the compiled script is cached
+     * @param script to be compiled
+     * @throws ScriptException
+     */
+    private void compileAndCache(String scriptId, String script) throws ScriptException
+    {
+        if ( compiler_ != null )
+        {
+            CompiledScript compiledScript = compiler_.compile(script);
+            calloutCache_.put(scriptId, compiledScript);
+        }
+    }
+
+    /**
+     * Builds the path of the script file that backs the given callout.
+     * NOTE(review): the callout name is used verbatim in the path; names
+     * arriving over the network should be validated by the caller.
+     */
+    private static String scriptPath(String callout)
+    {
+        return DatabaseDescriptor.getCalloutLocation() + System.getProperty("file.separator") + callout + extn_;
+    }
+
+    /**
+     * Reads the whole file and decodes it with the platform default charset
+     * (the same charset addCallout uses when writing). The stream is always
+     * closed, and reading loops until the file is exhausted instead of
+     * trusting available() and a single read() call.
+     */
+    private static String readScript(File file) throws IOException
+    {
+        FileInputStream fis = new FileInputStream(file);
+        try
+        {
+            byte[] bytes = new byte[(int)file.length()];
+            int filled = 0;
+            while ( filled < bytes.length )
+            {
+                int count = fis.read(bytes, filled, bytes.length - filled);
+                if ( count < 0 )
+                    break;
+                filled += count;
+            }
+            return new String(bytes, 0, filled);
+        }
+        finally
+        {
+            fis.close();
+        }
+    }
+
+    /**
+     * Invoked on start up to load all the stored callouts, compile
+     * and cache them. Only regular files ending in the callout extension
+     * are considered; scripts that fail to compile are logged and skipped.
+     *
+     * @throws IOException
+     */
+    void onStart() throws IOException
+    {
+        String location = DatabaseDescriptor.getCalloutLocation();
+        if ( location == null )
+            return;
+
+        File directory = new File(location);
+
+        if ( !directory.exists() )
+            directory.mkdir();
+
+        File[] files = directory.listFiles();
+        /* listFiles() returns null when the path is not a listable directory */
+        if ( files == null )
+            return;
+
+        for ( File file : files )
+        {
+            String f = file.getName();
+            /* Skip sub-directories and files this class would never have written */
+            if ( !file.isFile() || !f.endsWith(extn_) )
+                continue;
+            /*
+             * Get the callout name from the file by stripping the extension.
+             * A literal suffix strip is used because String.split() would
+             * treat the leading '.' of the extension as a regex wildcard.
+             */
+            String callout = f.substring(0, f.length() - extn_.length());
+            String script = readScript(file);
+            /* cache the callout after compiling it */
+            try
+            {
+                compileAndCache(callout, script);
+            }
+            catch ( ScriptException ex )
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+            }
+        }
+    }
+
+    /**
+     * Store the callout in cache and write it out
+     * to disk. A script that fails to compile is logged but still written.
+     * @param callout the name of the callout
+     * @param script actual implementation of the callout
+     * @throws IOException if the script file cannot be written
+     */
+    public void addCallout(String callout, String script) throws IOException
+    {
+        /* cache the callout after compiling it */
+        try
+        {
+            compileAndCache(callout, script);
+        }
+        catch ( ScriptException ex )
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+        }
+        /* save the script to disk */
+        String scriptFile = scriptPath(callout);
+        File file = new File(scriptFile);
+        if ( file.exists() )
+        {
+            logger_.debug("Deleting the old script file ...");
+            file.delete();
+        }
+        FileOutputStream fos = new FileOutputStream(scriptFile);
+        try
+        {
+            fos.write(script.getBytes());
+        }
+        finally
+        {
+            /* Release the file handle even when the write fails */
+            fos.close();
+        }
+    }
+
+    /**
+     * Remove the registered callout and delete the
+     * script on the disk.
+     * @param callout to be removed
+     */
+    public void removeCallout(String callout)
+    {
+        /* remove the script from cache */
+        calloutCache_.remove(callout);
+        /* Must use the same extension addCallout wrote, otherwise the file survives */
+        File file = new File(scriptPath(callout));
+        file.delete();
+    }
+
+    /**
+     * Execute the specified callout.
+     * @param callout to be executed.
+     * @param args arguments to be passed to the callouts; bound under the name "args".
+     * @return the script's result, or null when the callout is unknown or
+     *         its evaluation fails
+     */
+    public Object executeCallout(String callout, Object ... args)
+    {
+        Object result = null;
+        CompiledScript script = calloutCache_.get(callout);
+        if ( script != null )
+        {
+            try
+            {
+                Bindings binding = new SimpleBindings();
+                binding.put("args", args);
+                result = script.eval(binding);
+            }
+            catch(ScriptException ex)
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+            }
+        }
+        return result;
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * A simple (non-super) column: a name, a byte array value, a timestamp and
+ * a deletion marker. Operations that only make sense for super columns
+ * throw UnsupportedOperationException.
+ */
+public final class Column implements IColumn, Serializable
+{
+    private static Logger logger_ = Logger.getLogger(Column.class);
+    private static ICompactSerializer2<IColumn> serializer_;
+    /* Separator placed between name and timestamp in digest() */
+    private final static String seperator_ = ":";
+    static
+    {
+        serializer_ = new ColumnSerializer();
+    }
+
+    static ICompactSerializer2<IColumn> serializer()
+    {
+        return serializer_;
+    }
+
+    private String name_;
+    private byte[] value_ = new byte[0];
+    private long timestamp_ = 0;
+
+    /* Created lazily by delete(); null means "never marked for delete" */
+    private transient AtomicBoolean isMarkedForDelete_;
+
+    /* CTOR for JAXB */
+    Column()
+    {
+    }
+
+    Column(String name)
+    {
+        name_ = name;
+    }
+
+    Column(String name, byte[] value)
+    {
+        this(name, value, 0);
+    }
+
+    Column(String name, byte[] value, long timestamp)
+    {
+        this(name);
+        value_ = value;
+        timestamp_ = timestamp;
+    }
+
+    public String name()
+    {
+        return name_;
+    }
+
+    public byte[] value()
+    {
+        return value_;
+    }
+
+    public byte[] value(String key)
+    {
+        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+    }
+
+    public Collection<IColumn> getSubColumns()
+    {
+        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+    }
+
+    public IColumn getSubColumn( String columnName )
+    {
+        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+    }
+
+    public int getObjectCount()
+    {
+        return 1;
+    }
+
+    public long timestamp()
+    {
+        return timestamp_;
+    }
+
+    public long timestamp(String key)
+    {
+        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+    }
+
+    public boolean isMarkedForDelete()
+    {
+        return (isMarkedForDelete_ != null) ? isMarkedForDelete_.get() : false;
+    }
+
+    public int size()
+    {
+        /*
+         * Size of a column is =
+         *   size of a name (UtfPrefix + length of the string)
+         * + 1 byte to indicate if the column has been deleted
+         * + 8 bytes for timestamp
+         * + 4 bytes which basically indicates the size of the byte array
+         * + entire byte array.
+        */
+
+        /*
+         * We store the string as UTF-8 encoded, so when we calculate the length, it
+         * should be converted to UTF-8.
+         */
+        return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name_) + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + value_.length;
+    }
+
+    /*
+     * This returns the size of the column when serialized.
+     * @see org.apache.cassandra.db.IColumn#serializedSize()
+    */
+    public int serializedSize()
+    {
+        return size();
+    }
+
+    public void addColumn(String name, IColumn column)
+    {
+        throw new UnsupportedOperationException("This operation is not supported for simple columns.");
+    }
+
+    /* Marks the column as deleted and drops its value */
+    public void delete()
+    {
+        if ( isMarkedForDelete_ == null )
+            isMarkedForDelete_ = new AtomicBoolean(true);
+        else
+            isMarkedForDelete_.set(true);
+        value_ = new byte[0];
+    }
+
+    /* Adopts the other column's value and timestamp when it is strictly newer */
+    public void repair(IColumn column)
+    {
+        if( timestamp() < column.timestamp() )
+        {
+            value_ = column.value();
+            timestamp_ = column.timestamp();
+        }
+    }
+
+    /* Returns a copy of the other column when it is strictly newer, otherwise null */
+    public IColumn diff(IColumn column)
+    {
+        IColumn columnDiff = null;
+        if( timestamp() < column.timestamp() )
+        {
+            columnDiff = new Column(column.name(),column.value(),column.timestamp());
+        }
+        return columnDiff;
+    }
+
+    /*
+     * Resolve the column by comparing timestamps:
+     * returns true when the incoming column is at least as new as this one
+     * and false otherwise. This method itself does not copy anything from
+     * the incoming column.
+     */
+    public boolean putColumn(IColumn column)
+    {
+        if ( !(column instanceof Column))
+            throw new UnsupportedOperationException("Only Column objects should be put here");
+        if( !name_.equals(column.name()))
+            throw new IllegalArgumentException("The name should match the name of the current column or super column");
+        if(timestamp_ <= column.timestamp())
+        {
+            return true;
+        }
+        return false;
+    }
+
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append(name_);
+        sb.append(":");
+        sb.append(isMarkedForDelete());
+        sb.append(":");
+        sb.append(timestamp());
+        sb.append(":");
+        sb.append(value().length);
+        sb.append(":");
+        /* NOTE(review): this appends the array's identity string, not its contents */
+        sb.append(value());
+        sb.append(":");
+        return sb.toString();
+    }
+
+    /* Digest is the bytes of "<name>:<timestamp>" in the platform default charset */
+    public byte[] digest()
+    {
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append(name_);
+        stringBuilder.append(seperator_);
+        stringBuilder.append(timestamp_);
+        return stringBuilder.toString().getBytes();
+    }
+
+    /**
+     * This method is basically implemented for Writable interface
+     * for M/R. Reads the fields in the order write() emits them and stores
+     * them in this column; previously the timestamp and value were read
+     * from the stream but then thrown away.
+     */
+    public void readFields(DataInput in) throws IOException
+    {
+        name_ = in.readUTF();
+        boolean delete = in.readBoolean();
+        long ts = in.readLong();
+        int size = in.readInt();
+        byte[] value = new byte[size];
+        in.readFully(value);
+        timestamp_ = ts;
+        value_ = value;
+        if ( delete )
+        {
+            delete();
+        }
+        else if ( isMarkedForDelete_ != null )
+        {
+            /* The instance may be reused; clear a marker left by an earlier read */
+            isMarkedForDelete_.set(false);
+        }
+    }
+
+    /**
+     * This method is basically implemented for Writable interface
+     * for M/R. Writes name, delete marker, timestamp, value length and value.
+     */
+    public void write(DataOutput out) throws IOException
+    {
+        out.writeUTF(name_);
+        out.writeBoolean(isMarkedForDelete());
+        out.writeLong(timestamp_);
+        out.writeInt(value().length);
+        out.write(value());
+    }
+
+}
+
+/**
+ * Serializer for simple columns. Wire layout: name (writeUTF), delete
+ * marker (boolean), timestamp (long), value length (int), value bytes.
+ */
+class ColumnSerializer implements ICompactSerializer2<IColumn>
+{
+    public void serialize(IColumn column, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(column.name());
+        dos.writeBoolean(column.isMarkedForDelete());
+        dos.writeLong(column.timestamp());
+        dos.writeInt(column.value().length);
+        dos.write(column.value());
+    }
+
+    /*
+     * Skips exactly count bytes unless the stream ends first. A single
+     * skip() call may skip fewer bytes than requested, which would leave
+     * the stream positioned in the middle of a column.
+     */
+    private static void skipFully(DataInputStream dis, int count) throws IOException
+    {
+        int remaining = count;
+        while ( remaining > 0 )
+        {
+            int skipped = dis.skipBytes(remaining);
+            if ( skipped > 0 )
+            {
+                remaining -= skipped;
+            }
+            else if ( dis.read() < 0 )
+            {
+                /* End of stream: nothing left to skip */
+                return;
+            }
+            else
+            {
+                /* read() consumed one byte on our behalf */
+                --remaining;
+            }
+        }
+    }
+
+    /* Reads everything after the name and rebuilds the column */
+    private IColumn defreeze(DataInputStream dis, String name) throws IOException
+    {
+        IColumn column = null;
+        boolean delete = dis.readBoolean();
+        long ts = dis.readLong();
+        int size = dis.readInt();
+        byte[] value = new byte[size];
+        dis.readFully(value);
+        column = new Column(name, value, ts);
+        if ( delete )
+            column.delete();
+        return column;
+    }
+
+    public IColumn deserialize(DataInputStream dis) throws IOException
+    {
+        String name = dis.readUTF();
+        return defreeze(dis, name);
+    }
+
+    /**
+     * Here we need to get the column and apply the filter.
+     * Returns null when the stream has no bytes available or when the
+     * filter rejects the column; a rejected column is skipped in the stream.
+     */
+    public IColumn deserialize(DataInputStream dis, IFilter filter) throws IOException
+    {
+        if ( dis.available() == 0 )
+            return null;
+
+        String name = dis.readUTF();
+        IColumn column = new Column(name);
+        column = filter.filter(column, dis);
+        if ( column != null )
+        {
+            column = defreeze(dis, name);
+        }
+        else
+        {
+            /* Skip a boolean and the timestamp */
+            skipFully(dis, DBConstants.boolSize_ + DBConstants.tsSize_);
+            int size = dis.readInt();
+            skipFully(dis, size);
+        }
+        return column;
+    }
+
+    /**
+     * We know the name of the column here so just return it.
+     * Filter is pretty much useless in this call and is ignored,
+     * except that an IdentityFilter is marked done once the column is found.
+     * Returns null when the stream has no bytes available or the next
+     * column has a different name (that column is skipped).
+     */
+    public IColumn deserialize(DataInputStream dis, String columnName, IFilter filter) throws IOException
+    {
+        if ( dis.available() == 0 )
+            return null;
+        IColumn column = null;
+        String name = dis.readUTF();
+        if ( name.equals(columnName) )
+        {
+            column = defreeze(dis, name);
+            if( filter instanceof IdentityFilter )
+            {
+                /*
+                 * If this is being called with identity filter
+                 * since a column name is passed in we know
+                 * that this is a final call
+                 * Hence if the column is found set the filter to done
+                 * so that we do not look for the column in further files
+                 */
+                IdentityFilter f = (IdentityFilter)filter;
+                f.setDone();
+            }
+        }
+        else
+        {
+            /* Skip a boolean and the timestamp */
+            skipFully(dis, DBConstants.boolSize_ + DBConstants.tsSize_);
+            int size = dis.readInt();
+            skipFully(dis, size);
+        }
+        return column;
+    }
+
+    public void skip(DataInputStream dis) throws IOException
+    {
+        /* read the column name */
+        dis.readUTF();
+        /* boolean indicating if the column is deleted */
+        dis.readBoolean();
+        /* timestamp associated with the column */
+        dis.readLong();
+        /* size of the column */
+        int size = dis.readInt();
+        skipFully(dis, size);
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ColumnComparatorFactory
+{
+ public static enum ComparatorType
+ {
+ NAME,
+ TIMESTAMP
+ }
+
+ private static Comparator<IColumn> nameComparator_ = new ColumnNameComparator();
+ private static Comparator<IColumn> timestampComparator_ = new ColumnTimestampComparator();
+
+ public static Comparator<IColumn> getComparator(ComparatorType comparatorType)
+ {
+ Comparator<IColumn> columnComparator = timestampComparator_;
+
+ switch(comparatorType)
+ {
+ case NAME:
+ columnComparator = nameComparator_;
+ break;
+
+ case TIMESTAMP:
+
+ default:
+ columnComparator = timestampComparator_;
+ break;
+ }
+
+ return columnComparator;
+ }
+
+ public static Comparator<IColumn> getComparator(int comparatorTypeInt)
+ {
+ ComparatorType comparatorType = ComparatorType.NAME;
+
+ if(comparatorTypeInt == ComparatorType.NAME.ordinal())
+ {
+ comparatorType = ComparatorType.NAME;
+ }
+ else if(comparatorTypeInt == ComparatorType.TIMESTAMP.ordinal())
+ {
+ comparatorType = ComparatorType.TIMESTAMP;
+ }
+ return getComparator(comparatorType);
+ }
+
+ public static void main(String[] args)
+ {
+ IColumn col1 = new Column("Column-9");
+ IColumn col2 = new Column("Column-10");
+ System.out.println("Result of compare: " + getComparator(ComparatorType.NAME).compare(col1, col2));
+ }
+}
+
+ /**
+  * Base class of the column comparators: a serializable Comparator over
+  * IColumn that remembers which ComparatorType it was created for.
+  */
+ abstract class AbstractColumnComparator implements Comparator<IColumn>, Serializable
+ {
+     /* ordering this comparator was constructed with */
+     protected ColumnComparatorFactory.ComparatorType comparatorType_;
+
+     /**
+      * @param type ordering the concrete subclass implements
+      */
+     public AbstractColumnComparator(ColumnComparatorFactory.ComparatorType type)
+     {
+         this.comparatorType_ = type;
+     }
+
+     /** @return the ordering passed to the constructor */
+     ColumnComparatorFactory.ComparatorType getComparatorType()
+     {
+         return this.comparatorType_;
+     }
+ }
+
+ /**
+  * Orders columns by timestamp, latest first; columns with equal
+  * timestamps are ordered by name.
+  */
+ class ColumnTimestampComparator extends AbstractColumnComparator
+ {
+     ColumnTimestampComparator()
+     {
+         super(ColumnComparatorFactory.ComparatorType.TIMESTAMP);
+     }
+
+     /**
+      * Compares by timestamp in inverse order so the latest column sorts
+      * first; if the time-stamps are the same then sorts by names.
+      *
+      * @return -1, 0 or 1
+      */
+     public int compare(IColumn column1, IColumn column2)
+     {
+         long ts2 = column2.timestamp();
+         long ts1 = column1.timestamp();
+         if ( ts1 != ts2 )
+         {
+             /*
+              * Compare directly rather than subtracting: the difference of
+              * two longs can overflow and flip the sign of the result.
+              * Inverse sort by time to get the latest first.
+              */
+             return ( ts2 > ts1 ) ? 1 : -1;
+         }
+
+         int byName = column1.name().compareTo(column2.name());
+         if ( byName > 0 )
+         {
+             return 1;
+         }
+         if ( byName < 0 )
+         {
+             return -1;
+         }
+         return 0;
+     }
+ }
+
+ /**
+  * Orders columns by name; columns with equal names are ordered by
+  * timestamp, latest first.
+  */
+ class ColumnNameComparator extends AbstractColumnComparator
+ {
+     ColumnNameComparator()
+     {
+         super(ColumnComparatorFactory.ComparatorType.NAME);
+     }
+
+     /**
+      * Compares by name; if the names are the same then sorts by
+      * time-stamps in inverse order so the latest column comes first.
+      *
+      * @return -1, 0 or 1
+      */
+     public int compare(IColumn column1, IColumn column2)
+     {
+         int byName = column1.name().compareTo(column2.name());
+         if ( byName != 0 )
+         {
+             return ( byName > 0 ) ? 1 : -1;
+         }
+
+         long ts2 = column2.timestamp();
+         long ts1 = column1.timestamp();
+         if ( ts1 == ts2 )
+         {
+             return 0;
+         }
+         /*
+          * Compare directly rather than subtracting: the difference of two
+          * longs can overflow and flip the sign of the result.
+          * Inverse sort by time to get the latest first.
+          */
+         return ( ts2 > ts1 ) ? 1 : -1;
+     }
+ }