You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC

svn commit: r749218 [11/34] - in /incubator/cassandra: branches/ dist/ nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/ trunk/src/org/apache/ trunk/src/org/apache/cassandra/ trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...

Added: incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/Cql__.g
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/Cql__.g?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/Cql__.g (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/Cql__.g Mon Mar  2 07:57:22 2009
@@ -0,0 +1,100 @@
+lexer grammar Cql; // NOTE(review): the @header package below is com.facebook.infrastructure.*, while this file is added under org/apache/cassandra/cql/compiler/parse -- confirm which is intended
+@header {
+            package com.facebook.infrastructure.cql.compiler.parse;
+        }
+
+T47 : '=' ; // T47..T53: single-literal tokens with numbered names (presumably auto-generated by ANTLR from literals in the parser rules -- confirm)
+T48 : '(' ;
+T49 : ')' ;
+T50 : '[' ;
+T51 : ']' ;
+T52 : '.' ;
+T53 : '?' ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 247
+K_BY:        'BY'; // every keyword literal (K_*) below is spelled in upper case
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 248
+K_DELETE:    'DELETE';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 249
+K_EXPLAIN:   'EXPLAIN';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 250
+K_FROM:      'FROM';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 251
+K_GET:       'GET';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 252
+K_IN:        'IN';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 253
+K_LIMIT:     'LIMIT';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 254
+K_OFFSET:    'OFFSET';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 255
+K_ORDER:     'ORDER';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 256
+K_PLAN:      'PLAN';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 257
+K_SELECT:    'SELECT';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 258
+K_SET:       'SET';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 259
+K_WHERE:     'WHERE';
+
+// private syntactic rules (fragments: building blocks for other lexer rules, never emitted as tokens themselves)
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 262
+fragment
+Letter
+    : 'a'..'z' 
+    | 'A'..'Z'
+    ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 268
+fragment
+Digit
+    : '0'..'9'
+    ;
+
+// syntactic elements
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 274
+Identifier
+    : Letter ( Letter | Digit | '_')* // a letter followed by any mix of letters, digits and underscores
+    ;
+
+//
+// Literals
+//
+
+// strings: single-quoted; a quote inside the string is written by doubling it ('' -- SQL style)
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 283
+StringLiteral
+    : '\'' (~'\'')* '\'' ( '\'' (~'\'')* '\'' )* // one quoted run, then any number of directly adjacent quoted runs (this is how '' is accepted)
+    ;
+
+// integer literals: one or more decimal digits, no sign
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 288
+IntegerLiteral
+    : Digit+
+    ;
+
+//
+// miscellaneous syntactic elements
+//
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 295
+WS
+    :  (' '|'\r'|'\t'|'\n') {skip();}  // whitespace: matched one character at a time and discarded via skip()
+    ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 299
+COMMENT 
+    : '--' (~('\n'|'\r'))*                     { $channel=HIDDEN; } // line comment: runs up to (not including) the end of line; sent to the hidden channel
+    | '/*' (options {greedy=false;} : .)* '*/' { $channel=HIDDEN; } // block comment: non-greedy, so it ends at the first */; sent to the hidden channel
+    ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 304
+ASSOC:        '=>';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 305
+COMMA:        ',';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 306
+LEFT_BRACE:   '{';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 307
+RIGHT_BRACE:  '}';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 308
+SEMICOLON:    ';';

Added: incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseError.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseError.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseError.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseError.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.compiler.parse;
+
+import org.antlr.runtime.*;
+/** Bundles a RecognitionException with the recognizer and token-name table needed to render it as text. */
+public class ParseError {
+  private BaseRecognizer br;          // recognizer used by getMessage() to format the error
+  private RecognitionException re;    // the recognition failure itself
+  private String[] tokenNames;        // passed through to br.getErrorMessage()
+  /** Stores the three references as given; nothing is copied or validated. */
+  public ParseError(BaseRecognizer br, RecognitionException re, String[] tokenNames) {
+    this.br = br;
+    this.re = re;
+    this.tokenNames = tokenNames;
+    }
+  /** Returns the recognizer supplied at construction. */
+  public BaseRecognizer getBaseRecognizer() {
+    return br;
+  }
+  /** Returns the recognition exception supplied at construction. */
+  public RecognitionException getRecognitionException() {
+    return re;
+  }
+  /** Returns the token-name array supplied at construction (the same array, not a copy). */
+  public String[] getTokenNames() {
+    return tokenNames;
+  }
+  /** Returns the recognizer's error header, one space, then the recognizer's error message for this exception. */
+  public String getMessage() {
+    return br.getErrorHeader(re) + " " + br.getErrorMessage(re, tokenNames);
+  }
+
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseException.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseException.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/parse/ParseException.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.compiler.parse;
+
+/**
+ * Exception from the CQL Parser
+ */
+
+import java.util.ArrayList;
+/** Checked exception from the CQL parser; built either from a list of ParseErrors or from a plain message. */
+public class ParseException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+    ArrayList<ParseError> errors = null; // stays null when the String constructor is used
+    /** Creates an exception whose message is derived from the given errors (the list is stored, not copied). */
+    public ParseException(ArrayList<ParseError> errors)
+    {
+      super();
+      this.errors = errors;
+    }
+    /** Creates an exception carrying only a plain message. */
+    public ParseException(String message)
+    {
+        super(message);
+    }
+    /** Returns the plain message when there is no error list; otherwise every error's message, each followed by "\n". */
+    public String getMessage() {
+
+      if (errors == null)
+          return super.getMessage();
+
+      StringBuilder sb = new StringBuilder();
+      for(ParseError err: errors) {
+        sb.append(err.getMessage());
+        sb.append("\n");
+      }
+
+      return sb.toString();
+    }
+
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.compiler.sem;
+
+
+/**
+ * Exception from the CQL SemanticAnalyzer
+ */
+
+public class SemanticException extends Exception
+{
+    private static final long serialVersionUID = 1L;
+    // Each constructor below simply forwards to the java.lang.Exception constructor with the same parameters.
+    public SemanticException()
+    {
+        super();
+    }
+    // message only
+    public SemanticException(String message)
+    {
+        super(message);
+    }
+    // cause only
+    public SemanticException(Throwable cause)
+    {
+        super(cause);
+    }
+    // message and cause
+    public SemanticException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.compiler.sem;
+
+import java.util.Map;
+
+import org.antlr.runtime.tree.CommonTree;
+
+import org.apache.cassandra.cql.common.*;
+import org.apache.cassandra.cql.compiler.common.*;
+import org.apache.cassandra.cql.compiler.parse.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql.common.ColumnMapExpr;
+import org.apache.cassandra.cql.common.ColumnRangeQueryRSD;
+import org.apache.cassandra.cql.common.ConstantOperand;
+import org.apache.cassandra.cql.common.ExplainPlan;
+import org.apache.cassandra.cql.common.OperandDef;
+import org.apache.cassandra.cql.common.Pair;
+import org.apache.cassandra.cql.common.Plan;
+import org.apache.cassandra.cql.common.QueryPlan;
+import org.apache.cassandra.cql.common.RowSourceDef;
+import org.apache.cassandra.cql.common.SetColumnMap;
+import org.apache.cassandra.cql.common.SetSuperColumnMap;
+import org.apache.cassandra.cql.common.SetUniqueKey;
+import org.apache.cassandra.cql.common.SuperColumnMapExpr;
+import org.apache.cassandra.cql.common.SuperColumnRangeQueryRSD;
+import org.apache.cassandra.cql.common.UniqueKeyQueryRSD;
+import org.apache.cassandra.cql.common.Utils;
+import org.apache.cassandra.cql.compiler.common.CompilerErrorMsg;
+import org.apache.cassandra.cql.compiler.parse.CqlParser;
+import org.apache.log4j.Logger;
+
+//
+// Note: This class is CQL-related work in progress.
+//
+// Currently, this phase combines both semantic analysis and code-gen.
+// I expect that as my ideas get refined/cleared up, I'll be drawing
+// a clearer distinction between the semantic analysis phase and code-gen.
+//
+public class SemanticPhase
+{
+    private final static Logger logger_ = Logger.getLogger(SemanticPhase.class);    
+
+    // Entry point: dispatches on the AST root's type and returns the resulting Plan (code-gen currently happens here too).
+    public static Plan doSemanticAnalysis(CommonTree ast) throws SemanticException
+    {
+        Plan plan = null;
+
+        logger_.debug("AST: " + ast.toStringTree());
+
+        switch (ast.getType())
+        {
+        case CqlParser.A_GET:
+            plan = compileGet(ast);
+            break;
+        case CqlParser.A_SET:
+            plan = compileSet(ast);
+            break;
+        case CqlParser.A_DELETE:
+            compileDelete(ast); // stub returning void: plan stays null, so null is returned for DELETE
+            break;
+        case CqlParser.A_SELECT:
+            compileSelect(ast); // stub returning void: plan stays null, so null is returned for SELECT
+            break;
+        case CqlParser.A_EXPLAIN_PLAN:
+            // Case: EXPLAIN PLAN <stmt>
+            // first, generate a plan for <stmt> (child 0 of this node)
+            // and then wrap it in a special ExplainPlan plan
+            // whose execution will result in an explain plan rather
+            // than a normal execution of the statement.
+            plan = doSemanticAnalysis((CommonTree)(ast.getChild(0)));
+            plan = new ExplainPlan(plan);
+            break;
+        default:
+            // Unhandled AST node. Raise an internal error.
+            throw new SemanticException(CompilerErrorMsg.INTERNAL_ERROR.getMsg(ast, "Unknown Node Type: " + ast.getType()));
+        }
+        return plan;
+    }
+
+    /**
+     * Validates an A_COLUMN_ACCESS node (children: table, column family, row key, then column
+     * specifiers) and returns the column family's metadata. Throws SemanticException if the
+     * table or column family is unknown, or if more column dimensions are given than allowed.
+     */
+    private static CFMetaData getColumnFamilyInfo(CommonTree ast) throws SemanticException
+    {
+        assert(ast.getType() == CqlParser.A_COLUMN_ACCESS);
+
+        CommonTree columnFamilyNode = (CommonTree)(ast.getChild(1)); 
+        CommonTree tableNode = (CommonTree)(ast.getChild(0));
+
+        String columnFamily = columnFamilyNode.getText();
+        String table = tableNode.getText();
+
+        Map<String, CFMetaData> columnFamilies = DatabaseDescriptor.getTableMetaData(table);
+        if (columnFamilies == null)
+        {
+            throw new SemanticException(CompilerErrorMsg.INVALID_TABLE.getMsg(ast, table));
+        }
+
+        CFMetaData cfMetaData = columnFamilies.get(columnFamily);
+        if (cfMetaData == null)
+        {
+            throw new SemanticException(CompilerErrorMsg.INVALID_COLUMN_FAMILY.getMsg(ast, columnFamily, table));
+        }
+
+        // Once you have drilled down to a row using a rowKey, a super column
+        // map can be indexed only 2 further levels deep; and a column map may
+        // be indexed up to 1 level deep. (Other columnType values are not limited here.)
+        int dimensions = numColumnDimensions(ast);
+        if (("Super".equals(cfMetaData.columnType) && (dimensions > 2)) ||
+            ("Standard".equals(cfMetaData.columnType) && dimensions > 1))
+        {
+            throw new SemanticException(CompilerErrorMsg.TOO_MANY_DIMENSIONS.getMsg(ast, cfMetaData.columnType));
+        }
+
+        return cfMetaData; 
+    }
+    // Returns the row key of an A_COLUMN_ACCESS node: the text of child 2, passed through Utils.unescapeSQLString().
+    private static String getRowKey(CommonTree ast)
+    {
+        assert(ast.getType() == CqlParser.A_COLUMN_ACCESS);
+        return Utils.unescapeSQLString(ast.getChild(2).getText());
+    }
+    // Returns how many column specifiers follow the three leading children of the node (child count minus 3).
+    private static int numColumnDimensions(CommonTree ast)
+    {
+        // Skip over table name, column family and rowKey
+        return ast.getChildCount() - 3;
+    }
+
+    // Returns the pos'th (0-based) column specifier of the node, passed through Utils.unescapeSQLString().
+    private static String getColumn(CommonTree ast, int pos)
+    {
+        // Skip over table name, column family and rowKey (children 0..2)
+        return Utils.unescapeSQLString(ast.getChild(pos + 3).getText()); 
+    }
+
+    // Compile a GET statement: picks a RowSourceDef from the column family type and the number of column specifiers, and wraps it in a QueryPlan.
+    private static Plan compileGet(CommonTree ast) throws SemanticException
+    {
+        int childCount = ast.getChildCount();
+        assert(childCount == 1);
+
+        CommonTree columnFamilySpec = (CommonTree)ast.getChild(0);
+        assert(columnFamilySpec.getType() == CqlParser.A_COLUMN_ACCESS);
+
+        CFMetaData cfMetaData = getColumnFamilyInfo(columnFamilySpec);
+        ConstantOperand rowKey = new ConstantOperand(getRowKey(columnFamilySpec));
+        int dimensionCnt = numColumnDimensions(columnFamilySpec);
+
+        RowSourceDef rwsDef;
+        if ("Super".equals(cfMetaData.columnType))
+        {
+            if (dimensionCnt > 2)
+            {
+                // Not expected to arise: the Cql.g grammar is said to disallow it, and getColumnFamilyInfo()
+                // above already rejects it for "Super"; therefore raise it as an "internal error".
+                throw new SemanticException(CompilerErrorMsg.INTERNAL_ERROR.getMsg(columnFamilySpec));
+            }
+
+            if (dimensionCnt == 2)
+            {
+                // Case: table.super_cf[<rowKey>][<superColumnKey>][<columnKey>]
+                ConstantOperand superColumnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));                
+                ConstantOperand columnKey = new ConstantOperand(getColumn(columnFamilySpec, 1));
+                rwsDef = new UniqueKeyQueryRSD(cfMetaData, rowKey, superColumnKey, columnKey);
+            }
+            else if (dimensionCnt == 1)
+            {
+                // Case: table.super_cf[<rowKey>][<superColumnKey>]
+                ConstantOperand superColumnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));                
+                rwsDef = new ColumnRangeQueryRSD(cfMetaData, rowKey, superColumnKey, -1, Integer.MAX_VALUE); // NOTE(review): -1 / Integer.MAX_VALUE look like "unbounded" offset and limit (same arguments in the other range queries in this method) -- confirm against the RSD classes
+            }
+            else
+            {
+                // Case: table.super_cf[<rowKey>]             
+                rwsDef = new SuperColumnRangeQueryRSD(cfMetaData, rowKey, -1, Integer.MAX_VALUE);
+            }
+        }
+        else  // Standard Column Family (in fact: any columnType other than "Super")
+        {
+            if (dimensionCnt == 1)
+            {
+                // Case: table.standard_cf[<rowKey>][<columnKey>]
+                ConstantOperand columnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));
+                rwsDef = new UniqueKeyQueryRSD(cfMetaData, rowKey, columnKey);
+            }
+            else
+            {
+                // Case: table.standard_cf[<rowKey>]
+                logger_.assertLog((dimensionCnt == 0), "invalid dimensionCnt: " + dimensionCnt);
+                rwsDef = new ColumnRangeQueryRSD(cfMetaData, rowKey, -1, Integer.MAX_VALUE);
+            }
+        }
+        return new QueryPlan(rwsDef);
+    }
+    // Converts a StringLiteral node into a ConstantOperand holding its unescaped text; any other node type is rejected with INVALID_TYPE.
+    private static OperandDef  getSimpleExpr(CommonTree ast) throws SemanticException
+    {
+        int type = ast.getType();
+
+        // for now, the only simple expressions supported are of string type
+        if (type != CqlParser.StringLiteral)
+        {
+            throw new SemanticException(CompilerErrorMsg.INVALID_TYPE.getMsg(ast));
+        }
+        return new ConstantOperand(Utils.unescapeSQLString(ast.getText()));
+    }
+    // Builds a ColumnMapExpr from an A_COLUMN_MAP_VALUE node (any other node type is rejected with INVALID_TYPE).
+    private static ColumnMapExpr getColumnMapExpr(CommonTree ast) throws SemanticException
+    {
+        int type = ast.getType();
+        if (type != CqlParser.A_COLUMN_MAP_VALUE)
+        {
+            throw new SemanticException(CompilerErrorMsg.INVALID_TYPE.getMsg(ast));
+        }
+        
+        int size = ast.getChildCount();
+        ColumnMapExpr result = new ColumnMapExpr();
+        for (int idx = 0; idx < size; idx++)
+        {
+            CommonTree entryNode = (CommonTree)(ast.getChild(idx)); // one map entry: child 0 is the key, child 1 the value
+            OperandDef columnKey   = getSimpleExpr((CommonTree)(entryNode.getChild(0)));
+            OperandDef columnValue = getSimpleExpr((CommonTree)(entryNode.getChild(1)));            
+
+            Pair<OperandDef, OperandDef> entry = new Pair<OperandDef, OperandDef>(columnKey, columnValue);
+            result.add(entry);
+        }
+        return result;
+    }
+    // Builds a SuperColumnMapExpr from an A_SUPERCOLUMN_MAP_VALUE node (any other node type is rejected with INVALID_TYPE).
+    private static SuperColumnMapExpr getSuperColumnMapExpr(CommonTree ast) throws SemanticException
+    {
+        int type = ast.getType();        
+        if (type != CqlParser.A_SUPERCOLUMN_MAP_VALUE)
+        {
+            throw new SemanticException(CompilerErrorMsg.INVALID_TYPE.getMsg(ast));
+        }
+        int size = ast.getChildCount();
+        SuperColumnMapExpr result = new SuperColumnMapExpr();
+        for (int idx = 0; idx < size; idx++)
+        {
+            CommonTree entryNode = (CommonTree)(ast.getChild(idx)); // one entry: child 0 is the super column key, child 1 a column map
+            OperandDef    superColumnKey = getSimpleExpr((CommonTree)(entryNode.getChild(0)));
+            ColumnMapExpr columnMapExpr  = getColumnMapExpr((CommonTree)(entryNode.getChild(1)));            
+
+            Pair<OperandDef, ColumnMapExpr> entry = new Pair<OperandDef, ColumnMapExpr>(superColumnKey, columnMapExpr);
+            result.add(entry);
+        }
+        return result;
+    }
+
+    // Compile a SET statement: child 0 is the column access, child 1 the value; the plan type depends on the column family type and the number of column specifiers.
+    private static Plan compileSet(CommonTree ast) throws SemanticException
+    {
+        int childCount = ast.getChildCount();
+        assert(childCount == 2);
+
+        CommonTree columnFamilySpec = (CommonTree)ast.getChild(0);
+        assert(columnFamilySpec.getType() == CqlParser.A_COLUMN_ACCESS);
+
+        CFMetaData cfMetaData = getColumnFamilyInfo(columnFamilySpec);
+        ConstantOperand rowKey = new ConstantOperand(getRowKey(columnFamilySpec));
+        int dimensionCnt = numColumnDimensions(columnFamilySpec);
+
+        CommonTree  valueNode = (CommonTree)(ast.getChild(1));
+
+        Plan plan = null;
+        if ("Super".equals(cfMetaData.columnType))
+        {
+            if (dimensionCnt == 2)
+            {
+                // Case: set table.super_cf['key']['supercolumn']['column'] = 'value'
+                OperandDef value = getSimpleExpr(valueNode);
+                ConstantOperand superColumnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));
+                ConstantOperand columnKey = new ConstantOperand(getColumn(columnFamilySpec, 1));
+                plan = new SetUniqueKey(cfMetaData, rowKey, superColumnKey, columnKey, value);
+            }
+            else if (dimensionCnt == 1)
+            {
+                // Case: set table.super_cf['key']['supercolumn'] = <column_map>;
+                ColumnMapExpr columnMapExpr = getColumnMapExpr(valueNode);                
+                ConstantOperand superColumnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));
+                plan = new SetColumnMap(cfMetaData, rowKey, superColumnKey, columnMapExpr);
+            }
+            else
+            {
+                // Case: set table.super_cf['key'] = <super_column_map>;
+                logger_.assertLog(dimensionCnt == 0, "invalid dimensionCnt: " + dimensionCnt);
+                SuperColumnMapExpr superColumnMapExpr = getSuperColumnMapExpr(valueNode);                
+                plan = new SetSuperColumnMap(cfMetaData, rowKey, superColumnMapExpr);
+            }
+        }
+        else  // Standard column family (in fact: any columnType other than "Super")
+        {
+            if (dimensionCnt == 1)
+            {
+                // Case: set table.standard_cf['key']['column'] = 'value'
+                OperandDef value = getSimpleExpr(valueNode);                
+                ConstantOperand columnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));
+                plan = new SetUniqueKey(cfMetaData, rowKey, columnKey, value);
+            } 
+            else
+            {
+                // Case: set table.standard_cf['key'] = <column_map>;
+                logger_.assertLog(dimensionCnt == 0, "invalid dimensionCnt: " + dimensionCnt);
+                ColumnMapExpr columnMapExpr = getColumnMapExpr(valueNode);                
+                plan = new SetColumnMap(cfMetaData, rowKey, columnMapExpr);
+            }
+        }
+        return plan;
+    }
+    // Placeholder for SELECT compilation.
+    private static void compileSelect(CommonTree ast) throws SemanticException
+    {
+        // Stub, to be done: does nothing and produces no plan.
+    }
+    private static void compileDelete(CommonTree ast) throws SemanticException // placeholder for DELETE compilation
+    {
+        // Stub, to be done: does nothing and produces no plan.
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/trunk/src/org/apache/cassandra/cql/driver/CqlDriver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/driver/CqlDriver.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/driver/CqlDriver.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/driver/CqlDriver.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql.driver;
+
+import org.apache.cassandra.cql.compiler.common.*;
+import org.apache.cassandra.cql.compiler.parse.*;
+import org.apache.cassandra.cql.compiler.sem.*;
+import org.apache.cassandra.cql.common.*;
+import com.facebook.thrift.*;
+
+import org.apache.cassandra.cql.common.CqlResult;
+import org.apache.cassandra.cql.common.Plan;
+import org.apache.cassandra.cql.compiler.common.CqlCompiler;
+import org.apache.cassandra.cql.compiler.parse.ParseException;
+import org.apache.cassandra.cql.compiler.sem.SemanticException;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+// Server side driver class for CQL: compiles a query string into a Plan and executes it.
+public class CqlDriver 
+{
+    private final static Logger logger_ = Logger.getLogger(CqlDriver.class);
+    // Compiles and executes one CQL statement. Returns the plan's result, or null when the compiler
+    // produced no plan; any Exception is reported through CqlResult.errorTxt instead of being thrown.
+    public static CqlResult executeQuery(String query) throws TException 
+    {
+        CqlCompiler compiler = new CqlCompiler();
+
+        try
+        {
+            logger_.debug("Compiling CQL query ...");
+            Plan plan = compiler.compileQuery(query);
+            if (plan != null)
+            {
+                logger_.debug("Executing CQL query ...");            
+                return plan.execute();
+            }
+        }
+        catch (Exception e)
+        {
+            CqlResult result = new CqlResult(null);
+            result.errorTxt = e.getMessage();           
+            // Exact-class comparison: Semantic/Parse/Runtime exceptions pass through with their own message; anything else (including subclasses of those three) is treated as an internal error.
+            Class<? extends Exception> excpClass = e.getClass();
+            if ((excpClass != SemanticException.class)
+                && (excpClass != ParseException.class)
+                && (excpClass != RuntimeException.class))
+            {
+                result.errorTxt = "CQL Internal Error: " + result.errorTxt;
+                result.errorCode = 1; // failure
+                logger_.error(LogUtil.throwableToString(e));
+            }
+            // NOTE(review): errorCode is assigned only in the internal-error branch above; for the three pass-through classes it keeps whatever CqlResult initialises it to -- confirm that is intended.
+            return result;
+        }
+        // Reached only when compileQuery() returned a null plan.
+        return null;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql.execution;
+
+/**
+ * Error message templates for CQL's execution layer; getMsg() fills one in via String.format().
+ **/
+public enum RuntimeErrorMsg
+{
+    // Error messages with String.format() style format specifiers (only two of them contain a %s)
+    GENERIC_ERROR("CQL Execution Error"),
+    INTERNAL_ERROR("CQL Internal Error: %s"),
+    IMPLEMENTATION_RESTRICTION("Implementation Restriction: %s"),
+    NO_DATA_FOUND("No data found")
+    ;
+    // The String.format() template of this constant.
+    private String mesg;
+    RuntimeErrorMsg(String mesg) 
+    {
+        this.mesg = mesg;
+    }
+    // A template containing %s needs a matching argument (String.format throws otherwise); surplus arguments are ignored.
+    // Returns the formatted error message.
+    public String getMsg(Object... args)
+    {
+        // note: mesg itself might contain other format specifiers...
+        return String.format(mesg, args);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+abstract class AbstractColumnFactory
+{
+    private static Map<String, AbstractColumnFactory> columnFactory_ = new HashMap<String, AbstractColumnFactory>();
+	
+	static
+	{
+		columnFactory_.put(ColumnFamily.getColumnType("Standard"),new ColumnFactory());
+		columnFactory_.put(ColumnFamily.getColumnType("Super"),new SuperColumnFactory());
+	}
+	
+	static AbstractColumnFactory getColumnFactory(String columnType)
+	{
+		/* Create based on the type required. */
+		if ( columnType == null || columnType.equals("Standard") )
+			return columnFactory_.get("Standard");
+		else
+			return columnFactory_.get("Super");
+	}
+    
+	public abstract IColumn createColumn(String name);
+	public abstract IColumn createColumn(String name, byte[] value);
+	public abstract IColumn createColumn(String name, byte[] value, long timestamp);
+    public abstract ICompactSerializer2<IColumn> createColumnSerializer();
+}
+
+/* Factory producing plain Column instances. */
+class ColumnFactory extends AbstractColumnFactory
+{
+    /* Builds a Column from its name alone. */
+    @Override
+    public IColumn createColumn(String name)
+    {
+        IColumn column = new Column(name);
+        return column;
+    }
+
+    /* Builds a Column from a name and a value. */
+    @Override
+    public IColumn createColumn(String name, byte[] value)
+    {
+        IColumn column = new Column(name, value);
+        return column;
+    }
+
+    /* Builds a Column from a name, a value and a timestamp. */
+    @Override
+    public IColumn createColumn(String name, byte[] value, long timestamp)
+    {
+        IColumn column = new Column(name, value, timestamp);
+        return column;
+    }
+
+    /* Hands out the serializer exposed by Column. */
+    @Override
+    public ICompactSerializer2<IColumn> createColumnSerializer()
+    {
+        ICompactSerializer2<IColumn> columnSerializer = Column.serializer();
+        return columnSerializer;
+    }
+}
+
+class SuperColumnFactory extends AbstractColumnFactory
+{
+    static String[] getSuperColumnAndColumn(String cName)
+    {
+        StringTokenizer st = new StringTokenizer(cName, ":");
+        String[] values = new String[st.countTokens()];
+        int i = 0;
+        while ( st.hasMoreElements() )
+        {
+            values[i++] = (String)st.nextElement();
+        }
+        return values;
+    }
+
+	public IColumn createColumn(String name)
+	{
+		String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
+        if ( values.length == 0 ||  values.length > 2 )
+            throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
+        IColumn superColumn = new SuperColumn(values[0]);
+        if(values.length == 2)
+        {
+	        IColumn subColumn = new Column(values[1]);
+	        superColumn.addColumn(values[1], subColumn);
+        }
+		return superColumn;
+	}
+	
+	public IColumn createColumn(String name, byte[] value)
+	{
+		String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
+        if ( values.length != 2 )
+            throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
+        IColumn superColumn = new SuperColumn(values[0]);
+        IColumn subColumn = new Column(values[1], value);
+        superColumn.addColumn(values[1], subColumn);
+		return superColumn;
+	}
+	
+	public IColumn createColumn(String name, byte[] value, long timestamp)
+	{
+		String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
+        if ( values.length != 2 )
+            throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
+        IColumn superColumn = new SuperColumn(values[0]);
+        IColumn subColumn = new Column(values[1], value, timestamp);
+        superColumn.addColumn(values[1], subColumn);
+		return superColumn;
+	}
+    
+    public ICompactSerializer2<IColumn> createColumnSerializer()
+    {
+        return SuperColumn.serializer();
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.log4j.Logger;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/*
+ * In-memory map of key -> serialized bytes for one column family. Once the
+ * size threshold is crossed the memtable is handed to BinaryMemtableManager
+ * and later written out through flush().
+ */
+public class BinaryMemtable implements MemtableMBean
+{
+    /* Fixed: the logger used to be registered under Memtable.class. */
+    private static Logger logger_ = Logger.getLogger( BinaryMemtable.class );
+    /* Size in bytes at which the memtable counts as full (512MB). */
+    private int threshold_ = 512*1024*1024;
+    /* Running total of key lengths plus value lengths added through resolve(). */
+    private AtomicInteger currentSize_ = new AtomicInteger(0);
+
+    /* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
+    private String table_;
+    private String cfName_;
+    /* Set once this memtable has been submitted for flushing; guarded by lock_. */
+    private boolean isFrozen_ = false;
+    private Map<String, byte[]> columnFamilies_ = new NonBlockingHashMap<String, byte[]>();
+    /* Lock and Condition for notifying new clients about Memtable switches */
+    Lock lock_ = new ReentrantLock();
+    Condition condition_;
+
+    BinaryMemtable(String table, String cfName) throws IOException
+    {
+        condition_ = lock_.newCondition();
+        table_ = table;
+        cfName_ = cfName;
+    }
+
+    /*
+     * NOTE(review): despite its name this returns the current size, not
+     * threshold_. Left unchanged because callers of the MBean may rely on
+     * the value -- confirm the intent before altering it.
+    */
+    public int getMemtableThreshold()
+    {
+        return currentSize_.get();
+    }
+
+    /* Adjusts the tracked size by the difference between newSize and oldSize. */
+    void resolveSize(int oldSize, int newSize)
+    {
+        currentSize_.addAndGet(newSize - oldSize);
+    }
+
+
+    /*
+     * True once the tracked size has reached threshold_ or the memtable
+     * holds more than 50000 keys.
+    */
+    boolean isThresholdViolated()
+    {
+        if (currentSize_.get() >= threshold_ || columnFamilies_.size() > 50000)
+        {
+            logger_.debug("CURRENT SIZE:" + currentSize_.get());
+            return true;
+        }
+        return false;
+    }
+
+    String getColumnFamily()
+    {
+        return cfName_;
+    }
+
+    /*
+     * This version is used by the external clients to put data into
+     * the memtable. This version will respect the threshold and flush
+     * the memtable to disk when the size exceeds the threshold.
+    */
+    void put(String key, byte[] buffer) throws IOException
+    {
+        if (isThresholdViolated() )
+        {
+            lock_.lock();
+            try
+            {
+                ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+                if (!isFrozen_)
+                {
+                    /* The first writer to see the violation freezes and submits this memtable. */
+                    isFrozen_ = true;
+                    BinaryMemtableManager.instance().submit(cfStore.getColumnFamilyName(), this);
+                    cfStore.switchBinaryMemtable(key, buffer);
+                }
+                else
+                {
+                    /* Already frozen: route the write back through the store. */
+                    cfStore.applyBinary(key, buffer);
+                }
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        else
+        {
+            resolve(key, buffer);
+        }
+    }
+
+    /* Stores the value under key and accounts for its size. */
+    private void resolve(String key, byte[] buffer)
+    {
+            columnFamilies_.put(key, buffer);
+            currentSize_.addAndGet(buffer.length + key.length());
+    }
+
+
+    /*
+     * Writes every non-empty entry to a new SSTable in sorted key order,
+     * registers the file with the ColumnFamilyStore and clears the map.
+     * Does nothing when the memtable is empty.
+    */
+    void flush() throws IOException
+    {
+        if ( columnFamilies_.size() == 0 )
+            return;
+        ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+        String directory = DatabaseDescriptor.getDataFileLocation();
+        String filename = cfStore.getNextFileName();
+
+        /*
+         * Use the SSTable to write the contents of the map to disk.
+         * The map is unordered, so the keys are sorted first.
+        */
+        SSTable ssTable = new SSTable(directory, filename);
+        List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
+        Collections.sort(keys);
+        /* Use this BloomFilter to decide if a key exists in a SSTable */
+        BloomFilter bf = new BloomFilter(keys.size(), 8);
+        for ( String key : keys )
+        {
+            byte[] bytes = columnFamilies_.get(key);
+            if ( bytes.length > 0 )
+            {
+                /* Now write the key and value to disk */
+                ssTable.append(key, bytes);
+                bf.fill(key);
+            }
+        }
+        ssTable.close(bf);
+        cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
+        columnFamilies_.clear();
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtableManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtableManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtableManager.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/*
+ * Singleton that flushes BinaryMemtables to disk on a dedicated
+ * single-threaded executor.
+ */
+public class BinaryMemtableManager
+{
+    /*
+     * volatile so that a thread seeing a non-null reference in the
+     * unlocked check of instance() also sees a fully constructed object.
+    */
+    private static volatile BinaryMemtableManager instance_;
+    private static Lock lock_ = new ReentrantLock();
+    private static Logger logger_ = Logger.getLogger(BinaryMemtableManager.class);
+
+    /* Returns the shared instance, creating it on first use. */
+    static BinaryMemtableManager instance()
+    {
+        if ( instance_ == null )
+        {
+            lock_.lock();
+            try
+            {
+                /* Re-check under the lock: another thread may have won the race. */
+                if ( instance_ == null )
+                    instance_ = new BinaryMemtableManager();
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        return instance_;
+    }
+
+    /* Task that flushes a single memtable. */
+    class BinaryMemtableFlusher implements Runnable
+    {
+        private BinaryMemtable memtable_;
+
+        BinaryMemtableFlusher(BinaryMemtable memtable)
+        {
+            memtable_ = memtable;
+        }
+
+        public void run()
+        {
+            try
+            {
+                memtable_.flush();
+            }
+            catch (IOException e)
+            {
+                /*
+                 * A failed flush means the memtable contents did not reach
+                 * disk; report it as an error instead of hiding it at debug.
+                */
+                logger_.error( LogUtil.throwableToString(e) );
+            }
+        }
+    }
+
+    /* One worker thread and an unbounded queue, so flushes run one at a time in submission order. */
+    private ExecutorService flusher_ = new DebuggableThreadPoolExecutor( 1,
+            1,
+            Integer.MAX_VALUE,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            new ThreadFactoryImpl("BINARY-MEMTABLE-FLUSHER-POOL")
+            );
+
+    /* Submit memtables to be flushed to disk; cfName is currently unused. */
+    void submit(String cfName, BinaryMemtable memtbl)
+    {
+        flusher_.submit( new BinaryMemtableFlusher(memtbl) );
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryVerbHandler.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.RowMutationVerbHandler.RowMutationContext;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/* Verb handler that deserializes a RowMutationMessage from the message body and loads it into a row. */
+public class BinaryVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(BinaryVerbHandler.class);
+    /*
+     * Holds one RowMutationContext per thread so it can be reused across messages.
+     * NOTE(review): an InheritableThreadLocal hands the parent's context object
+     * to child threads -- confirm that sharing is intended.
+    */
+    private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
+
+    /* Fetches the calling thread's context, creating and registering it on first use. */
+    private static RowMutationContext contextForCurrentThread()
+    {
+        RowMutationContext existing = tls_.get();
+        if ( existing != null )
+            return existing;
+        RowMutationContext created = new RowMutationContext();
+        tls_.set(created);
+        return created;
+    }
+
+    /**
+     * Handles one message whose first body element is the serialized mutation.
+     * Any exception raised while deserializing or loading is logged at debug
+     * level and not propagated.
+     * @param message the incoming message
+     */
+    public void doVerb(Message message)
+    {
+        byte[] payload = (byte[])message.getMessageBody()[0];
+        RowMutationContext ctx = contextForCurrentThread();
+        /* Point the reusable buffer at the new payload before decoding. */
+        ctx.buffer_.reset(payload, payload.length);
+
+        try
+        {
+            RowMutation mutation = RowMutationMessage.serializer().deserialize(ctx.buffer_).getRowMutation();
+            ctx.row_.key(mutation.key());
+            mutation.load(ctx.row_);
+        }
+        catch ( Exception e )
+        {
+            logger_.debug(LogUtil.throwableToString(e));
+        }
+    }
+
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployMessage.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployMessage.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployMessage.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+
+
+/* Carries a callout name together with its script so it can be shipped as a Message. */
+public class CalloutDeployMessage
+{
+    /* Shared serializer for this message type. */
+    private static ICompactSerializer<CalloutDeployMessage> serializer_ = new CalloutDeployMessageSerializer();
+
+    /* Name of the callout */
+    private String callout_;
+    /* The actual procedure */
+    private String script_;
+
+    /**
+     * @param callout name of the callout
+     * @param script body of the callout
+     */
+    public CalloutDeployMessage(String callout, String script)
+    {
+        callout_ = callout;
+        script_ = script;
+    }
+
+    /* Gives access to the shared serializer. */
+    public static ICompactSerializer<CalloutDeployMessage> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * Serializes cdMessage and wraps the bytes as the single body element
+     * of a Message addressed from the local storage end point.
+     * @param cdMessage the deploy message to wrap
+     * @return the Message carrying the serialized form
+     * @throws IOException if serialization fails
+     */
+    public static Message getCalloutDeployMessage(CalloutDeployMessage cdMessage) throws IOException
+    {
+        ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        serializer_.serialize(cdMessage, new DataOutputStream(sink));
+        return new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.calloutDeployVerbHandler_, new Object[]{sink.toByteArray()});
+    }
+
+    String getCallout()
+    {
+        return callout_;
+    }
+
+    String getScript()
+    {
+        return script_;
+    }
+}
+
+/* Writes and reads a CalloutDeployMessage as two UTF strings: callout name, then script. */
+class CalloutDeployMessageSerializer implements ICompactSerializer<CalloutDeployMessage>
+{
+    /* Emits the callout name followed by the script. */
+    public void serialize(CalloutDeployMessage cdMessage, DataOutputStream dos) throws IOException
+    {
+        String name = cdMessage.getCallout();
+        dos.writeUTF(name);
+        String body = cdMessage.getScript();
+        dos.writeUTF(body);
+    }
+
+    /* Reads the callout name and the script in the order serialize() wrote them. */
+    public CalloutDeployMessage deserialize(DataInputStream dis) throws IOException
+    {
+        String name = dis.readUTF();
+        String body = dis.readUTF();
+        CalloutDeployMessage decoded = new CalloutDeployMessage(name, body);
+        return decoded;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutDeployVerbHandler.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/* Verb handler that decodes a CalloutDeployMessage and registers it with the CalloutManager. */
+public class CalloutDeployVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(CalloutDeployVerbHandler.class);
+
+    /**
+     * Decodes the first body element of the message and adds the callout.
+     * An IOException raised along the way is logged as a warning and not propagated.
+     * @param message the incoming message
+     */
+    public void doVerb(Message message)
+    {
+        byte[] payload = (byte[])message.getMessageBody()[0];
+        DataInputBuffer input = new DataInputBuffer();
+        input.reset(payload, payload.length);
+        try
+        {
+            CalloutDeployMessage decoded = CalloutDeployMessage.serializer().deserialize(input);
+            /* save the callout to callout cache and to disk. */
+            String calloutName = decoded.getCallout();
+            String calloutScript = decoded.getScript();
+            CalloutManager.instance().addCallout( calloutName, calloutScript );
+        }
+        catch ( IOException ex )
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+        }
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CalloutManager.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.script.Bindings;
+import javax.script.Invocable;
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.Compilable;
+import javax.script.CompiledScript;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.procedures.GroovyScriptRunner;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+public class CalloutManager
+{
+    private final static Logger logger_ = Logger.getLogger(CalloutManager.class); 
+    private static final String extn_ = ".groovy";
+    /* Used to lock the factory for creation of CalloutManager instance */
+    private static Lock createLock_ = new ReentrantLock();
+    /* An instance of the CalloutManager  */
+    private static CalloutManager instance_;
+    
+    public static CalloutManager instance()
+    {
+        if ( instance_ == null )
+        {
+            CalloutManager.createLock_.lock();
+            try
+            {
+                if ( instance_ == null )
+                {
+                    instance_ = new CalloutManager();
+                }
+            }
+            finally
+            {
+                CalloutManager.createLock_.unlock();
+            }
+        }
+        return instance_;
+    }
+    
+    /* Map containing the name of callout as key and the callout script as value */
+    private Map<String, CompiledScript> calloutCache_ = new HashMap<String, CompiledScript>();    
+    /* The Groovy Script compiler instance */
+    private Compilable compiler_;
+    /* The Groovy script invokable instance */
+    private Invocable invokable_;
+    
+    private CalloutManager()
+    {
+        ScriptEngineManager scriptManager = new ScriptEngineManager();
+        ScriptEngine groovyEngine = scriptManager.getEngineByName("groovy");
+        compiler_ = (Compilable)groovyEngine;
+        invokable_ = (Invocable)groovyEngine;
+    }
+    
+    /**
+     * Compile the script and cache the compiled script.
+     * @param script to be compiled
+     * @throws ScriptException
+     */
+    private void compileAndCache(String scriptId, String script) throws ScriptException
+    {
+        if ( compiler_ != null )
+        {
+            CompiledScript compiledScript = compiler_.compile(script);
+            calloutCache_.put(scriptId, compiledScript);
+        }
+    }
+    
+    /**
+     * Invoked on start up to load all the stored callouts, compile
+     * and cache them.
+     * 
+     * @throws IOException
+     */
+    void onStart() throws IOException
+    {
+    	String location = DatabaseDescriptor.getCalloutLocation();
+    	if ( location == null )
+    		return;
+    	
+        File directory = new File(location);        
+        
+        if ( !directory.exists() )
+            directory.mkdir();
+        
+        File[] files = directory.listFiles();
+        
+        for ( File file : files )
+        {
+            String f = file.getName();
+            /* Get the callout name from the file */
+            String callout = f.split(extn_)[0];
+            FileInputStream fis = new FileInputStream(file);
+            byte[] bytes = new byte[fis.available()];
+            fis.read(bytes);
+            fis.close();
+            /* cache the callout after compiling it */
+            try
+            {
+                compileAndCache(callout, new String(bytes));                    
+            }
+            catch ( ScriptException ex )
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+            }
+        }
+    }
+    
+    /**
+     * Store the callout in cache and write it out
+     * to disk.
+     * @param callout the name of the callout
+     * @param script actual implementation of the callout
+    */
+    public void addCallout(String callout, String script) throws IOException
+    {
+        /* cache the script */
+        /* cache the callout after compiling it */
+        try
+        {
+            compileAndCache(callout, script);                    
+        }
+        catch ( ScriptException ex )
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+        }
+        /* save the script to disk */
+        String scriptFile = DatabaseDescriptor.getCalloutLocation() + System.getProperty("file.separator") + callout + extn_;
+        File file = new File(scriptFile);
+        if ( file.exists() )
+        {
+            logger_.debug("Deleting the old script file ...");
+            file.delete();
+        }
+        FileOutputStream fos = new FileOutputStream(scriptFile);
+        fos.write(script.getBytes());
+        fos.close();
+    }
+    
+    /**
+     * Remove the registered callout and delete the
+     * script on the disk.
+     * @param callout to be removed
+     */
+    public void removeCallout(String callout)
+    {
+        /* remove the script from cache */
+        calloutCache_.remove(callout);
+        String scriptFile = DatabaseDescriptor.getCalloutLocation() + System.getProperty("file.separator") + callout + ".grv";
+        File file = new File(scriptFile);
+        file.delete();
+    }
+    
+    /**
+     * Execute the specified callout.
+     * @param callout to be executed.
+     * @params args arguments to be passed to the callouts.
+     */
+    public Object executeCallout(String callout, Object ... args)
+    {
+        Object result = null;
+        CompiledScript script = calloutCache_.get(callout);
+        if ( script != null )
+        {
+            try
+            {
+                Bindings binding = new SimpleBindings();
+                binding.put("args", args);
+                result = script.eval(binding);
+            }
+            catch(ScriptException ex)
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+            }
+        }
+        return result;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class Column implements IColumn, Serializable
+{
+    /* Logger category is this class; it previously pointed at SuperColumn. */
+    private static Logger logger_ = Logger.getLogger(Column.class);
+    private static ICompactSerializer2<IColumn> serializer_;
+    /* Separator placed between the name and the timestamp in digest(). */
+    private final static String seperator_ = ":";
+    static
+    {
+        serializer_ = new ColumnSerializer();
+    }
+
+    /**
+     * @return the shared serializer instance for columns.
+     */
+    static ICompactSerializer2<IColumn> serializer()
+    {
+        return serializer_;
+    }
+
+    private String name_;
+    private byte[] value_ = new byte[0];
+    private long timestamp_ = 0;
+
+    /* Created lazily by delete(); null means "not marked for delete". */
+    private transient AtomicBoolean isMarkedForDelete_;
+
+    /* CTOR for JAXB */
+    Column()
+    {
+    }
+
+    Column(String name)
+    {
+        name_ = name;
+    }
+
+    Column(String name, byte[] value)
+    {
+        this(name, value, 0);
+    }
+
+    Column(String name, byte[] value, long timestamp)
+    {
+        this(name);
+        value_ = value;
+        timestamp_ = timestamp;
+    }
+
+    public String name()
+    {
+        return name_;
+    }
+
+    public byte[] value()
+    {
+        return value_;
+    }
+
+    public byte[] value(String key)
+    {
+        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+    }
+
+    public Collection<IColumn> getSubColumns()
+    {
+        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+    }
+
+    public IColumn getSubColumn( String columnName )
+    {
+        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+    }
+
+    public int getObjectCount()
+    {
+        return 1;
+    }
+
+    public long timestamp()
+    {
+        return timestamp_;
+    }
+
+    public long timestamp(String key)
+    {
+        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+    }
+
+    public boolean isMarkedForDelete()
+    {
+        return (isMarkedForDelete_ != null) ? isMarkedForDelete_.get() : false;
+    }
+
+    public int size()
+    {
+        /*
+         * Size of a column is =
+         *   size of a name (UtfPrefix + length of the string)
+         * + 1 byte to indicate if the column has been deleted
+         * + 8 bytes for timestamp
+         * + 4 bytes which basically indicates the size of the byte array
+         * + entire byte array.
+        */
+
+        /*
+         * We store the string as UTF-8 encoded, so when we calculate the length, it
+         * should be converted to UTF-8.
+         */
+        return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name_) + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + value_.length;
+    }
+
+    /*
+     * This returns the size of the column when serialized.
+     * @see IColumn#serializedSize()
+    */
+    public int serializedSize()
+    {
+        return size();
+    }
+
+    public void addColumn(String name, IColumn column)
+    {
+        throw new UnsupportedOperationException("This operation is not supported for simple columns.");
+    }
+
+    /*
+     * Marks the column as deleted and drops its value; the name and
+     * timestamp are left as they are.
+     */
+    public void delete()
+    {
+        if ( isMarkedForDelete_ == null )
+            isMarkedForDelete_ = new AtomicBoolean(true);
+        else
+            isMarkedForDelete_.set(true);
+        value_ = new byte[0];
+    }
+
+    /*
+     * Takes over the value and timestamp of the supplied column when
+     * that column carries a strictly newer timestamp.
+     */
+    public void repair(IColumn column)
+    {
+        if( timestamp() < column.timestamp() )
+        {
+            value_ = column.value();
+            timestamp_ = column.timestamp();
+        }
+    }
+
+    /*
+     * Returns a copy of the supplied column when it is strictly newer
+     * than this one, and null otherwise.
+     */
+    public IColumn diff(IColumn column)
+    {
+        IColumn  columnDiff = null;
+        if( timestamp() < column.timestamp() )
+        {
+            columnDiff = new Column(column.name(),column.value(),column.timestamp());
+        }
+        return columnDiff;
+    }
+
+    /*
+     * Resolve the column by comparing timestamps.
+     * Returns true when the supplied column is at least as new as this
+     * one and false otherwise. This method does not modify this column.
+     */
+    public boolean putColumn(IColumn column)
+    {
+        if ( !(column instanceof Column))
+            throw new UnsupportedOperationException("Only Column objects should be put here");
+        if( !name_.equals(column.name()))
+            throw new IllegalArgumentException("The name should match the name of the current column or super column");
+        if(timestamp_ <= column.timestamp())
+        {
+            return true;
+        }
+        return false;
+    }
+
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append(name_);
+        sb.append(":");
+        sb.append(isMarkedForDelete());
+        sb.append(":");
+        sb.append(timestamp());
+        sb.append(":");
+        sb.append(value().length);
+        sb.append(":");
+        /* NOTE(review): appending a byte[] prints the array's default Object string, not its contents. */
+        sb.append(value());
+        sb.append(":");
+        return sb.toString();
+    }
+
+    /*
+     * Digest is the bytes of "name:timestamp"; the value is not included.
+     * NOTE(review): getBytes() uses the platform default charset.
+     */
+    public byte[] digest()
+    {
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append(name_);
+        stringBuilder.append(seperator_);
+        stringBuilder.append(timestamp_);
+        return stringBuilder.toString().getBytes();
+    }
+
+    /**
+     * This method is basically implemented for Writable interface
+     * for M/R. It reads the fields in the order write() emits them:
+     * name, delete marker, timestamp, value length, value bytes.
+     */
+    public void readFields(DataInput in) throws IOException
+    {
+        name_ = in.readUTF();
+        boolean delete = in.readBoolean();
+        long ts = in.readLong();
+        int size = in.readInt();
+        byte[] value = new byte[size];
+        in.readFully(value);
+        /* keep what was read; these used to be read and then discarded */
+        timestamp_ = ts;
+        value_ = value;
+        if ( delete )
+        {
+            /* mirrors ColumnSerializer: a deleted column ends up with an empty value */
+            delete();
+        }
+        else if ( isMarkedForDelete_ != null )
+        {
+            /* clear a marker left over from an earlier use of this object */
+            isMarkedForDelete_.set(false);
+        }
+    }
+
+    /**
+     * This method is basically implemented for Writable interface
+     * for M/R. It writes name, delete marker, timestamp, value length
+     * and value bytes, in that order.
+     */
+    public void write(DataOutput out) throws IOException
+    {
+        out.writeUTF(name_);
+        out.writeBoolean(isMarkedForDelete());
+        out.writeLong(timestamp_);
+        out.writeInt(value().length);
+        out.write(value());
+    }
+
+}
+
+class ColumnSerializer implements ICompactSerializer2<IColumn>
+{
+    /*
+     * On-stream layout of a column: UTF name, delete marker (boolean),
+     * timestamp (long), value length (int), value bytes.
+     */
+    public void serialize(IColumn column, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(column.name());
+        dos.writeBoolean(column.isMarkedForDelete());
+        dos.writeLong(column.timestamp());
+        dos.writeInt(column.value().length);
+        dos.write(column.value());
+    }
+
+    /*
+     * Skips exactly count bytes unless the stream ends first.
+     * A single skip call may skip fewer bytes than requested, which would
+     * leave the stream positioned in the middle of a column, so keep going
+     * until everything has been skipped. Hitting end of stream stops the
+     * skip quietly.
+     */
+    private static void skipFully(DataInputStream dis, int count) throws IOException
+    {
+        int remaining = count;
+        while ( remaining > 0 )
+        {
+            int skipped = dis.skipBytes(remaining);
+            if ( skipped <= 0 )
+            {
+                /* no progress: a one byte read tells end of stream apart from a stalled skip */
+                if ( dis.read() < 0 )
+                    break;
+                skipped = 1;
+            }
+            remaining -= skipped;
+        }
+    }
+
+    /* Reads everything after the name and builds the column. */
+    private IColumn defreeze(DataInputStream dis, String name) throws IOException
+    {
+        IColumn column = null;
+        boolean delete = dis.readBoolean();
+        long ts = dis.readLong();
+        int size = dis.readInt();
+        byte[] value = new byte[size];
+        dis.readFully(value);
+        column = new Column(name, value, ts);
+        if ( delete )
+            column.delete();
+        return column;
+    }
+
+    public IColumn deserialize(DataInputStream dis) throws IOException
+    {
+        String name = dis.readUTF();
+        return defreeze(dis, name);
+    }
+
+    /**
+     * Here we need to get the column and apply the filter.
+     * Returns null when the stream has nothing available or when the
+     * filter rejects the column; in the latter case the rest of the
+     * column is skipped over.
+     */
+    public IColumn deserialize(DataInputStream dis, IFilter filter) throws IOException
+    {
+        if ( dis.available() == 0 )
+            return null;
+
+        String name = dis.readUTF();
+        IColumn column = new Column(name);
+        column = filter.filter(column, dis);
+        if ( column != null )
+        {
+            column = defreeze(dis, name);
+        }
+        else
+        {
+            /* Skip a boolean and the timestamp */
+            skipFully(dis, DBConstants.boolSize_ + DBConstants.tsSize_);
+            int size = dis.readInt();
+            skipFully(dis, size);
+        }
+        return column;
+    }
+
+    /**
+     * We know the name of the column here so just return it.
+     * Filter is pretty much useless in this call and is ignored,
+     * except that an IdentityFilter is told it is done once the
+     * column has been found. Returns null when the next column on
+     * the stream has a different name; that column is skipped over.
+     */
+    public IColumn deserialize(DataInputStream dis, String columnName, IFilter filter) throws IOException
+    {
+        if ( dis.available() == 0 )
+            return null;
+        IColumn column = null;
+        String name = dis.readUTF();
+        if ( name.equals(columnName) )
+        {
+            column = defreeze(dis, name);
+            if( filter instanceof IdentityFilter )
+            {
+                /*
+                 * If this is being called with identity filter
+                 * since a column name is passed in we know
+                 * that this is a final call
+                 * Hence if the column is found set the filter to done
+                 * so that we do not look for the column in further files
+                 */
+                IdentityFilter f = (IdentityFilter)filter;
+                f.setDone();
+            }
+        }
+        else
+        {
+            /* Skip a boolean and the timestamp */
+            skipFully(dis, DBConstants.boolSize_ + DBConstants.tsSize_);
+            int size = dis.readInt();
+            skipFully(dis, size);
+        }
+        return column;
+    }
+
+    /* Moves the stream past one whole serialized column. */
+    public void skip(DataInputStream dis) throws IOException
+    {
+        /* read the column name */
+        dis.readUTF();
+        /* boolean indicating if the column is deleted */
+        dis.readBoolean();
+        /* timestamp associated with the column */
+        dis.readLong();
+        /* size of the column */
+        int size = dis.readInt();
+        skipFully(dis, size);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ColumnComparatorFactory
+{
+	public static enum ComparatorType
+	{
+		NAME,
+		TIMESTAMP
+	}
+
+	private static Comparator<IColumn> nameComparator_ = new ColumnNameComparator();
+	private static Comparator<IColumn> timestampComparator_ = new ColumnTimestampComparator();
+
+	public static Comparator<IColumn> getComparator(ComparatorType comparatorType)
+	{
+		Comparator<IColumn> columnComparator = timestampComparator_;
+
+		switch(comparatorType)
+		{
+			case NAME:
+				columnComparator = nameComparator_;
+				break;
+
+			case TIMESTAMP:
+
+			default:
+				columnComparator = timestampComparator_;
+				break;
+		}
+
+		return columnComparator;
+	}
+
+	public static Comparator<IColumn> getComparator(int comparatorTypeInt)
+	{
+		ComparatorType comparatorType = ComparatorType.NAME;
+
+		if(comparatorTypeInt == ComparatorType.NAME.ordinal())
+		{
+			comparatorType = ComparatorType.NAME;
+		}
+		else if(comparatorTypeInt == ComparatorType.TIMESTAMP.ordinal())
+		{
+			comparatorType = ComparatorType.TIMESTAMP;
+		}
+		return getComparator(comparatorType);
+	}
+
+	public static void main(String[] args)
+	{
+		IColumn col1 = new Column("Column-9");
+		IColumn col2 = new Column("Column-10");
+		System.out.println("Result of compare: " + getComparator(ComparatorType.NAME).compare(col1, col2));
+	}
+}
+
+abstract class AbstractColumnComparator implements Comparator<IColumn>, Serializable
+{
+    /* The ordering that the concrete comparator reports itself as. */
+    protected ColumnComparatorFactory.ComparatorType comparatorType_;
+
+    public AbstractColumnComparator(ColumnComparatorFactory.ComparatorType comparatorType)
+    {
+        this.comparatorType_ = comparatorType;
+    }
+
+    /* Returns the type given at construction time. */
+    ColumnComparatorFactory.ComparatorType getComparatorType()
+    {
+        return this.comparatorType_;
+    }
+}
+
+class ColumnTimestampComparator extends AbstractColumnComparator
+{
+    ColumnTimestampComparator()
+    {
+        super(ColumnComparatorFactory.ComparatorType.TIMESTAMP);
+    }
+
+    /*
+     * Orders columns newest first; if the time-stamps are the same
+     * then sort by names. Always returns -1, 0 or 1.
+     */
+    public int compare(IColumn column1, IColumn column2)
+    {
+        long firstTs = column1.timestamp();
+        long secondTs = column2.timestamp();
+        /*
+         * inverse sort by time to get the latest first. The timestamps are
+         * compared directly rather than subtracted, because the difference
+         * of two longs can overflow and flip the sign of the result.
+         */
+        if ( secondTs > firstTs )
+            return 1;
+        if ( secondTs < firstTs )
+            return -1;
+
+        int byName = column1.name().compareTo(column2.name());
+        if ( byName > 0 )
+            return 1;
+        if ( byName < 0 )
+            return -1;
+        return 0;
+    }
+}
+
+class ColumnNameComparator extends AbstractColumnComparator
+{
+    ColumnNameComparator()
+    {
+        super(ColumnComparatorFactory.ComparatorType.NAME);
+    }
+
+    /*
+     * Orders columns by name; if the names are the same then sort by
+     * time-stamps, newest first. Always returns -1, 0 or 1.
+     */
+    public int compare(IColumn column1, IColumn column2)
+    {
+        int byName = column1.name().compareTo(column2.name());
+        if ( byName > 0 )
+            return 1;
+        if ( byName < 0 )
+            return -1;
+
+        long firstTs = column1.timestamp();
+        long secondTs = column2.timestamp();
+        /*
+         * inverse sort by time to get the latest first. The timestamps are
+         * compared directly rather than subtracted, because the difference
+         * of two longs can overflow and flip the sign of the result.
+         */
+        if ( secondTs > firstTs )
+            return 1;
+        if ( secondTs < firstTs )
+            return -1;
+        return 0;
+    }
+}