You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:12:49 UTC

svn commit: r749205 [10/16] - in /incubator/cassandra/src/org/apache/cassandra: analytics/ cli/ concurrent/ config/ continuations/ cql/ cql/common/ cql/compiler/ cql/compiler/common/ cql/compiler/parse/ cql/compiler/sem/ cql/driver/ cql/execution/ dht/...

Added: incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/Cql__.g
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/Cql__.g?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/Cql__.g (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/Cql__.g Mon Mar  2 06:12:46 2009
@@ -0,0 +1,100 @@
+lexer grammar Cql;
+@header {
+            package com.facebook.infrastructure.cql.compiler.parse;
+        }
+
+T47 : '=' ;
+T48 : '(' ;
+T49 : ')' ;
+T50 : '[' ;
+T51 : ']' ;
+T52 : '.' ;
+T53 : '?' ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 247
+K_BY:        'BY';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 248
+K_DELETE:    'DELETE';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 249
+K_EXPLAIN:   'EXPLAIN';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 250
+K_FROM:      'FROM';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 251
+K_GET:       'GET';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 252
+K_IN:        'IN';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 253
+K_LIMIT:     'LIMIT';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 254
+K_OFFSET:    'OFFSET';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 255
+K_ORDER:     'ORDER';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 256
+K_PLAN:      'PLAN';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 257
+K_SELECT:    'SELECT';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 258
+K_SET:       'SET';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 259
+K_WHERE:     'WHERE';
+
+// private syntactic rules
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 262
+fragment
+Letter
+    : 'a'..'z' 
+    | 'A'..'Z'
+    ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 268
+fragment
+Digit
+    : '0'..'9'
+    ;
+
+// syntactic Elements
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 274
+Identifier
+    : Letter ( Letter | Digit | '_')*
+    ;
+
+//
+// Literals 
+//
+
+// strings: escape single quote ' by repeating it '' (SQL style)
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 283
+StringLiteral
+    : '\'' (~'\'')* '\'' ( '\'' (~'\'')* '\'' )* 
+    ;
+
+// integer literals    
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 288
+IntegerLiteral
+    : Digit+
+    ;
+
+//
+// miscellaneous syntactic elements
+//
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 295
+WS
+    :  (' '|'\r'|'\t'|'\n') {skip();}  // whitepace
+    ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 299
+COMMENT 
+    : '--' (~('\n'|'\r'))*                     { $channel=HIDDEN; }
+    | '/*' (options {greedy=false;} : .)* '*/' { $channel=HIDDEN; }
+    ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 304
+ASSOC:        '=>';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 305
+COMMA:        ',';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 306
+LEFT_BRACE:   '{';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 307
+RIGHT_BRACE:  '}';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cql/compiler/parse/Cql.g" 308
+SEMICOLON:    ';';

Added: incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ParseError.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ParseError.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ParseError.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ParseError.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.compiler.parse;
+
+import org.antlr.runtime.*;
+
+public class ParseError {
+  private BaseRecognizer br;
+  private RecognitionException re;
+  private String[] tokenNames;
+  
+  public ParseError(BaseRecognizer br, RecognitionException re, String[] tokenNames) {
+    this.br = br;
+    this.re = re;
+    this.tokenNames = tokenNames;
+    }
+  
+  public BaseRecognizer getBaseRecognizer() {
+    return br;
+  }
+
+  public RecognitionException getRecognitionException() {
+    return re;
+  }
+  
+  public String[] getTokenNames() {
+    return tokenNames;
+  }
+
+  public String getMessage() {
+    return br.getErrorHeader(re) + " " + br.getErrorMessage(re, tokenNames);
+  }
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ParseException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ParseException.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ParseException.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ParseException.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.compiler.parse;
+
+/**
+ * Exception from the CQL Parser
+ */
+
+import java.util.ArrayList;
+
+public class ParseException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+    ArrayList<ParseError> errors = null;
+
+    public ParseException(ArrayList<ParseError> errors)
+    {
+      super();
+      this.errors = errors;
+    }
+
+    public ParseException(String message)
+    {
+        super(message);
+    }
+
+    public String getMessage() {
+
+      if (errors == null)
+          return super.getMessage();
+
+      StringBuilder sb = new StringBuilder();
+      for(ParseError err: errors) {
+        sb.append(err.getMessage());
+        sb.append("\n");
+      }
+
+      return sb.toString();
+    }
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.compiler.sem;
+
+
+/**
+ * Exception from the CQL SemanticAnalyzer
+ */
+
+public class SemanticException extends Exception
+{
+    private static final long serialVersionUID = 1L;
+
+    public SemanticException()
+    {
+        super();
+    }
+    
+    public SemanticException(String message)
+    {
+        super(message);
+    }
+    
+    public SemanticException(Throwable cause)
+    {
+        super(cause);
+    }
+    
+    public SemanticException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.compiler.sem;
+
+import java.util.Map;
+
+import org.antlr.runtime.tree.CommonTree;
+
+import org.apache.cassandra.cql.common.*;
+import org.apache.cassandra.cql.compiler.common.*;
+import org.apache.cassandra.cql.compiler.parse.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql.common.ColumnMapExpr;
+import org.apache.cassandra.cql.common.ColumnRangeQueryRSD;
+import org.apache.cassandra.cql.common.ConstantOperand;
+import org.apache.cassandra.cql.common.ExplainPlan;
+import org.apache.cassandra.cql.common.OperandDef;
+import org.apache.cassandra.cql.common.Pair;
+import org.apache.cassandra.cql.common.Plan;
+import org.apache.cassandra.cql.common.QueryPlan;
+import org.apache.cassandra.cql.common.RowSourceDef;
+import org.apache.cassandra.cql.common.SetColumnMap;
+import org.apache.cassandra.cql.common.SetSuperColumnMap;
+import org.apache.cassandra.cql.common.SetUniqueKey;
+import org.apache.cassandra.cql.common.SuperColumnMapExpr;
+import org.apache.cassandra.cql.common.SuperColumnRangeQueryRSD;
+import org.apache.cassandra.cql.common.UniqueKeyQueryRSD;
+import org.apache.cassandra.cql.common.Utils;
+import org.apache.cassandra.cql.compiler.common.CompilerErrorMsg;
+import org.apache.cassandra.cql.compiler.parse.CqlParser;
+import org.apache.log4j.Logger;
+
+//
+// Note: This class is CQL related work in progress.
+//
+// Currently, this phase combines both semantic analysis and code-gen.
+// I expect that as my ideas get refined/cleared up, I'll be drawing
+// a more clear distinction between semantic analysis phase and code-gen.
+//
+public class SemanticPhase
+{
+    private final static Logger logger_ = Logger.getLogger(SemanticPhase.class);    
+
+    // Current code-gen also happens in this phase!
+    public static Plan doSemanticAnalysis(CommonTree ast) throws SemanticException
+    {
+        Plan plan = null;
+
+        logger_.debug("AST: " + ast.toStringTree());
+
+        switch (ast.getType())
+        {
+        case CqlParser.A_GET:
+            plan = compileGet(ast);
+            break;
+        case CqlParser.A_SET:
+            plan = compileSet(ast);
+            break;
+        case CqlParser.A_DELETE:
+            compileDelete(ast);
+            break;
+        case CqlParser.A_SELECT:
+            compileSelect(ast);
+            break;
+        case CqlParser.A_EXPLAIN_PLAN:
+            // Case: EXPLAN PLAN <stmt>
+            // first, generate a plan for <stmt>
+            // and then, wrapper it with a special ExplainPlan plan
+            // whose execution will result in an explain plan rather
+            // than a normal execution of the statement.
+            plan = doSemanticAnalysis((CommonTree)(ast.getChild(0)));
+            plan = new ExplainPlan(plan);
+            break;
+        default:
+            // Unhandled AST node. Raise an internal error. 
+            throw new SemanticException(CompilerErrorMsg.INTERNAL_ERROR.getMsg(ast, "Unknown Node Type: " + ast.getType()));
+        }
+        return plan;
+    }
+
+    /** 
+     * Given a CommonTree AST node of type, A_COLUMN_ACCESS related functions, do semantic
+     * checking to ensure table name, column family name, and number of key dimensions
+     * specified are all valid. 
+     */
+    private static CFMetaData getColumnFamilyInfo(CommonTree ast) throws SemanticException
+    {
+        assert(ast.getType() == CqlParser.A_COLUMN_ACCESS);
+
+        CommonTree columnFamilyNode = (CommonTree)(ast.getChild(1)); 
+        CommonTree tableNode = (CommonTree)(ast.getChild(0));
+
+        String columnFamily = columnFamilyNode.getText();
+        String table = tableNode.getText();
+
+        Map<String, CFMetaData> columnFamilies = DatabaseDescriptor.getTableMetaData(table);
+        if (columnFamilies == null)
+        {
+            throw new SemanticException(CompilerErrorMsg.INVALID_TABLE.getMsg(ast, table));
+        }
+
+        CFMetaData cfMetaData = columnFamilies.get(columnFamily);
+        if (cfMetaData == null)
+        {
+            throw new SemanticException(CompilerErrorMsg.INVALID_COLUMN_FAMILY.getMsg(ast, columnFamily, table));
+        }
+
+        // Once you have drilled down to a row using a rowKey, a super column
+        // map can be indexed only 2 further levels deep; and a column map may
+        // be indexed up to 1 level deep.
+        int dimensions = numColumnDimensions(ast);
+        if (("Super".equals(cfMetaData.columnType) && (dimensions > 2)) ||
+            ("Standard".equals(cfMetaData.columnType) && dimensions > 1))
+        {
+            throw new SemanticException(CompilerErrorMsg.TOO_MANY_DIMENSIONS.getMsg(ast, cfMetaData.columnType));
+        }
+
+        return cfMetaData; 
+    }
+
+    private static String getRowKey(CommonTree ast)
+    {
+        assert(ast.getType() == CqlParser.A_COLUMN_ACCESS);
+        return Utils.unescapeSQLString(ast.getChild(2).getText());
+    }
+
+    private static int numColumnDimensions(CommonTree ast)
+    {
+        // Skip over table name, column family and rowKey
+        return ast.getChildCount() - 3;
+    }
+
+    // Returns the pos'th (0-based index) column specifier in the astNode
+    private static String getColumn(CommonTree ast, int pos)
+    {
+        // Skip over table name, column family and rowKey
+        return Utils.unescapeSQLString(ast.getChild(pos + 3).getText()); 
+    }
+
+    // Compile a GET statement
+    private static Plan compileGet(CommonTree ast) throws SemanticException
+    {
+        int childCount = ast.getChildCount();
+        assert(childCount == 1);
+
+        CommonTree columnFamilySpec = (CommonTree)ast.getChild(0);
+        assert(columnFamilySpec.getType() == CqlParser.A_COLUMN_ACCESS);
+
+        CFMetaData cfMetaData = getColumnFamilyInfo(columnFamilySpec);
+        ConstantOperand rowKey = new ConstantOperand(getRowKey(columnFamilySpec));
+        int dimensionCnt = numColumnDimensions(columnFamilySpec);
+
+        RowSourceDef rwsDef;
+        if ("Super".equals(cfMetaData.columnType))
+        {
+            if (dimensionCnt > 2)
+            {
+                // We don't expect this case to arise, since Cql.g grammar disallows this.
+                // therefore, raise this case as an "internal error".
+                throw new SemanticException(CompilerErrorMsg.INTERNAL_ERROR.getMsg(columnFamilySpec));
+            }
+
+            if (dimensionCnt == 2)
+            {
+                // Case: table.super_cf[<rowKey>][<superColumnKey>][<columnKey>]
+                ConstantOperand superColumnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));                
+                ConstantOperand columnKey = new ConstantOperand(getColumn(columnFamilySpec, 1));
+                rwsDef = new UniqueKeyQueryRSD(cfMetaData, rowKey, superColumnKey, columnKey);
+            }
+            else if (dimensionCnt == 1)
+            {
+                // Case: table.super_cf[<rowKey>][<superColumnKey>]
+                ConstantOperand superColumnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));                
+                rwsDef = new ColumnRangeQueryRSD(cfMetaData, rowKey, superColumnKey, -1, Integer.MAX_VALUE);
+            }
+            else
+            {
+                // Case: table.super_cf[<rowKey>]             
+                rwsDef = new SuperColumnRangeQueryRSD(cfMetaData, rowKey, -1, Integer.MAX_VALUE);
+            }
+        }
+        else  // Standard Column Family
+        {
+            if (dimensionCnt == 1)
+            {
+                // Case: table.standard_cf[<rowKey>][<columnKey>]
+                ConstantOperand columnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));
+                rwsDef = new UniqueKeyQueryRSD(cfMetaData, rowKey, columnKey);
+            }
+            else
+            {
+                // Case: table.standard_cf[<rowKey>]
+                logger_.assertLog((dimensionCnt == 0), "invalid dimensionCnt: " + dimensionCnt);
+                rwsDef = new ColumnRangeQueryRSD(cfMetaData, rowKey, -1, Integer.MAX_VALUE);
+            }
+        }
+        return new QueryPlan(rwsDef);
+    }
+    
+    private static OperandDef  getSimpleExpr(CommonTree ast) throws SemanticException
+    {
+        int type = ast.getType();
+
+        // for now, the only simple expressions support are of string type
+        if (type != CqlParser.StringLiteral)
+        {
+            throw new SemanticException(CompilerErrorMsg.INVALID_TYPE.getMsg(ast));
+        }
+        return new ConstantOperand(Utils.unescapeSQLString(ast.getText()));
+    }
+
+    private static ColumnMapExpr getColumnMapExpr(CommonTree ast) throws SemanticException
+    {
+        int type = ast.getType();
+        if (type != CqlParser.A_COLUMN_MAP_VALUE)
+        {
+            throw new SemanticException(CompilerErrorMsg.INVALID_TYPE.getMsg(ast));
+        }
+        
+        int size = ast.getChildCount();
+        ColumnMapExpr result = new ColumnMapExpr();
+        for (int idx = 0; idx < size; idx++)
+        {
+            CommonTree entryNode = (CommonTree)(ast.getChild(idx));
+            OperandDef columnKey   = getSimpleExpr((CommonTree)(entryNode.getChild(0)));
+            OperandDef columnValue = getSimpleExpr((CommonTree)(entryNode.getChild(1)));            
+
+            Pair<OperandDef, OperandDef> entry = new Pair<OperandDef, OperandDef>(columnKey, columnValue);
+            result.add(entry);
+        }
+        return result;
+    }
+
+    private static SuperColumnMapExpr getSuperColumnMapExpr(CommonTree ast) throws SemanticException
+    {
+        int type = ast.getType();        
+        if (type != CqlParser.A_SUPERCOLUMN_MAP_VALUE)
+        {
+            throw new SemanticException(CompilerErrorMsg.INVALID_TYPE.getMsg(ast));
+        }
+        int size = ast.getChildCount();
+        SuperColumnMapExpr result = new SuperColumnMapExpr();
+        for (int idx = 0; idx < size; idx++)
+        {
+            CommonTree entryNode = (CommonTree)(ast.getChild(idx));
+            OperandDef    superColumnKey = getSimpleExpr((CommonTree)(entryNode.getChild(0)));
+            ColumnMapExpr columnMapExpr  = getColumnMapExpr((CommonTree)(entryNode.getChild(1)));            
+
+            Pair<OperandDef, ColumnMapExpr> entry = new Pair<OperandDef, ColumnMapExpr>(superColumnKey, columnMapExpr);
+            result.add(entry);
+        }
+        return result;
+    }
+
+    // compile a SET statement
+    private static Plan compileSet(CommonTree ast) throws SemanticException
+    {
+        int childCount = ast.getChildCount();
+        assert(childCount == 2);
+
+        CommonTree columnFamilySpec = (CommonTree)ast.getChild(0);
+        assert(columnFamilySpec.getType() == CqlParser.A_COLUMN_ACCESS);
+
+        CFMetaData cfMetaData = getColumnFamilyInfo(columnFamilySpec);
+        ConstantOperand rowKey = new ConstantOperand(getRowKey(columnFamilySpec));
+        int dimensionCnt = numColumnDimensions(columnFamilySpec);
+
+        CommonTree  valueNode = (CommonTree)(ast.getChild(1));
+
+        Plan plan = null;
+        if ("Super".equals(cfMetaData.columnType))
+        {
+            if (dimensionCnt == 2)
+            {
+                // Case: set table.super_cf['key']['supercolumn']['column'] = 'value'
+                OperandDef value = getSimpleExpr(valueNode);
+                ConstantOperand superColumnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));
+                ConstantOperand columnKey = new ConstantOperand(getColumn(columnFamilySpec, 1));
+                plan = new SetUniqueKey(cfMetaData, rowKey, superColumnKey, columnKey, value);
+            }
+            else if (dimensionCnt == 1)
+            {
+                // Case: set table.super_cf['key']['supercolumn'] = <column_map>;
+                ColumnMapExpr columnMapExpr = getColumnMapExpr(valueNode);                
+                ConstantOperand superColumnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));
+                plan = new SetColumnMap(cfMetaData, rowKey, superColumnKey, columnMapExpr);
+            }
+            else
+            {
+                // Case: set table.super_cf['key'] = <super_column_map>;
+                logger_.assertLog(dimensionCnt == 0, "invalid dimensionCnt: " + dimensionCnt);
+                SuperColumnMapExpr superColumnMapExpr = getSuperColumnMapExpr(valueNode);                
+                plan = new SetSuperColumnMap(cfMetaData, rowKey, superColumnMapExpr);
+            }
+        }
+        else  // Standard column family
+        {
+            if (dimensionCnt == 1)
+            {
+                // Case: set table.standard_cf['key']['column'] = 'value'
+                OperandDef value = getSimpleExpr(valueNode);                
+                ConstantOperand columnKey = new ConstantOperand(getColumn(columnFamilySpec, 0));
+                plan = new SetUniqueKey(cfMetaData, rowKey, columnKey, value);
+            } 
+            else
+            {
+                // Case: set table.standard_cf['key'] = <column_map>;
+                logger_.assertLog(dimensionCnt == 0, "invalid dimensionCnt: " + dimensionCnt);
+                ColumnMapExpr columnMapExpr = getColumnMapExpr(valueNode);                
+                plan = new SetColumnMap(cfMetaData, rowKey, columnMapExpr);
+            }
+        }
+        return plan;
+    }
+
+    private static void compileSelect(CommonTree ast) throws SemanticException
+    {
+        // stub; tbd.
+    }
+    private static void compileDelete(CommonTree ast) throws SemanticException
+    {
+        // stub; tbd.
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/src/org/apache/cassandra/cql/driver/CqlDriver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/driver/CqlDriver.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/driver/CqlDriver.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/driver/CqlDriver.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql.driver;
+
+import org.apache.cassandra.cql.compiler.common.*;
+import org.apache.cassandra.cql.compiler.parse.*;
+import org.apache.cassandra.cql.compiler.sem.*;
+import org.apache.cassandra.cql.common.*;
+import com.facebook.thrift.*;
+
+import org.apache.cassandra.cql.common.CqlResult;
+import org.apache.cassandra.cql.common.Plan;
+import org.apache.cassandra.cql.compiler.common.CqlCompiler;
+import org.apache.cassandra.cql.compiler.parse.ParseException;
+import org.apache.cassandra.cql.compiler.sem.SemanticException;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+// Server side driver class for CQL
+public class CqlDriver 
+{
+    private final static Logger logger_ = Logger.getLogger(CqlDriver.class);
+
+    // Execute a CQL Statement 
+    public static CqlResult executeQuery(String query) throws TException 
+    {
+        CqlCompiler compiler = new CqlCompiler();
+
+        try
+        {
+            logger_.debug("Compiling CQL query ...");
+            Plan plan = compiler.compileQuery(query);
+            if (plan != null)
+            {
+                logger_.debug("Executing CQL query ...");            
+                return plan.execute();
+            }
+        }
+        catch (Exception e)
+        {
+            CqlResult result = new CqlResult(null);
+            result.errorTxt = e.getMessage();           
+
+            Class<? extends Exception> excpClass = e.getClass();
+            if ((excpClass != SemanticException.class)
+                && (excpClass != ParseException.class)
+                && (excpClass != RuntimeException.class))
+            {
+                result.errorTxt = "CQL Internal Error: " + result.errorTxt;
+                result.errorCode = 1; // failure
+                logger_.error(LogUtil.throwableToString(e));
+            }
+
+            return result;
+        }
+
+        return null;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql.execution;
+
+/**
+ * List of error messages thrown by CQL's Execution Layer
+ **/
+public enum RuntimeErrorMsg
+{
+    // Error messages with String.format() style format specifiers
+    GENERIC_ERROR("CQL Execution Error"),
+    INTERNAL_ERROR("CQL Internal Error: %s"),
+    IMPLEMENTATION_RESTRICTION("Implementation Restriction: %s"),
+    NO_DATA_FOUND("No data found")
+    ;
+
+    private String mesg;
+    RuntimeErrorMsg(String mesg) 
+    {
+        this.mesg = mesg;
+    }
+
+    // Returns the formatted error message. 
+    public String getMsg(Object... args)
+    {
+        // note: mesg itself might contain other format specifiers...
+        return String.format(mesg, args);
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/dht/BootStrapper.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/dht/BootStrapper.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/dht/BootStrapper.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,150 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class handles the boostrapping responsibilities for
+ * any new endpoint.
+*/
+public class BootStrapper implements Runnable
+{
+    private static Logger logger_ = Logger.getLogger(BootStrapper.class);
+    /* endpoints that need to be bootstrapped */
+    protected EndPoint[] targets_ = new EndPoint[0];
+    /* tokens of the nodes being bootstapped. */
+    protected BigInteger[] tokens_ = new BigInteger[0];
+    protected TokenMetadata tokenMetadata_ = null;
+    private List<EndPoint> filters_ = new ArrayList<EndPoint>();
+
+    public BootStrapper(EndPoint[] target, BigInteger[] token)
+    {
+        targets_ = target;
+        tokens_ = token;
+        tokenMetadata_ = StorageService.instance().getTokenMetadata();
+    }
+    
+    public BootStrapper(EndPoint[] target, BigInteger[] token, EndPoint[] filters)
+    {
+        this(target, token);
+        Collections.addAll(filters_, filters);
+    }
+
+    public void run()
+    {
+        try
+        {
+            logger_.debug("Beginning bootstrap process for " + targets_ + " ...");                                                               
+            /* copy the token to endpoint map */
+            Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+            /* remove the tokens associated with the endpoints being bootstrapped */                
+            for ( BigInteger token : tokens_ )
+            {
+                tokenToEndPointMap.remove(token);                    
+            }
+
+            Set<BigInteger> oldTokens = new HashSet<BigInteger>( tokenToEndPointMap.keySet() );
+            Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
+            logger_.debug("Total number of old ranges " + oldRanges.length);
+            /* 
+             * Find the ranges that are split. Maintain a mapping between
+             * the range being split and the list of subranges.
+            */                
+            Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRanges, tokens_);                                                      
+            /* Calculate the list of nodes that handle the old ranges */
+            Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges, tokenToEndPointMap);
+            /* Mapping of split ranges to the list of endpoints responsible for the range */                
+            Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range, List<EndPoint>>();                                
+            Set<Range> rangesSplit = splitRanges.keySet();                
+            for ( Range splitRange : rangesSplit )
+            {
+                replicasForSplitRanges.put( splitRange, oldRangeToEndPointMap.get(splitRange) );
+            }                
+            /* Remove the ranges that are split. */
+            for ( Range splitRange : rangesSplit )
+            {
+                oldRangeToEndPointMap.remove(splitRange);
+            }
+            
+            /* Add the subranges of the split range to the map with the same replica set. */
+            for ( Range splitRange : rangesSplit )
+            {
+                List<Range> subRanges = splitRanges.get(splitRange);
+                List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
+                for ( Range subRange : subRanges )
+                {
+                    /* Make sure we clone or else we are hammered. */
+                    oldRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+                }
+            }                
+            
+            /* Add the new token and re-calculate the range assignments */
+            Collections.addAll( oldTokens, tokens_ );
+            Range[] newRanges = StorageService.instance().getAllRanges(oldTokens);
+
+            logger_.debug("Total number of new ranges " + newRanges.length);
+            /* Calculate the list of nodes that handle the new ranges */
+            Map<Range, List<EndPoint>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges);
+            /* Calculate ranges that need to be sent and from whom to where */
+            Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap, newRangeToEndPointMap);
+            /* Send messages to respective folks to stream data over to the new nodes being bootstrapped */
+            LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget, filters_);                
+        }
+        catch ( Throwable th )
+        {
+            logger_.debug( LogUtil.throwableToString(th) );
+        }
+    }
+ 
+    private Range getMyOldRange()
+    {
+        Map<EndPoint, BigInteger> oldEndPointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
+        Map<BigInteger, EndPoint> oldTokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+
+        oldEndPointToTokenMap.remove(targets_);
+        oldTokenToEndPointMap.remove(tokens_);
+
+        BigInteger myToken = oldEndPointToTokenMap.get(StorageService.getLocalStorageEndPoint());
+        List<BigInteger> allTokens = new ArrayList<BigInteger>(oldTokenToEndPointMap.keySet());
+        Collections.sort(allTokens);
+        int index = Collections.binarySearch(allTokens, myToken);
+        /* Calculate the lhs for the range */
+        BigInteger lhs = (index == 0) ? allTokens.get(allTokens.size() - 1) : allTokens.get( index - 1);
+        return new Range( lhs, myToken );
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/dht/BootstrapInitiateMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/dht/BootstrapInitiateMessage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/dht/BootstrapInitiateMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/dht/BootstrapInitiateMessage.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.net.io.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class BootstrapInitiateMessage implements Serializable
+{
+    private static ICompactSerializer<BootstrapInitiateMessage> serializer_;
+    
+    static
+    {
+        serializer_ = new BootstrapInitiateMessageSerializer();
+    }
+    
+    public static ICompactSerializer<BootstrapInitiateMessage> serializer()
+    {
+        return serializer_;
+    }
+    
+    public static Message makeBootstrapInitiateMessage(BootstrapInitiateMessage biMessage) throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream( bos );
+        BootstrapInitiateMessage.serializer().serialize(biMessage, dos);
+        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateVerbHandler_, new Object[]{bos.toByteArray()} );
+    }
+    
+    protected StreamContextManager.StreamContext[] streamContexts_ = new StreamContextManager.StreamContext[0];
+   
+    public BootstrapInitiateMessage(StreamContextManager.StreamContext[] streamContexts)
+    {
+        streamContexts_ = streamContexts;
+    }
+    
+    public StreamContextManager.StreamContext[] getStreamContext()
+    {
+        return streamContexts_;
+    }
+}
+
+class BootstrapInitiateMessageSerializer implements ICompactSerializer<BootstrapInitiateMessage>
+{
+    public void serialize(BootstrapInitiateMessage bim, DataOutputStream dos) throws IOException
+    {
+        dos.writeInt(bim.streamContexts_.length);
+        for ( StreamContextManager.StreamContext streamContext : bim.streamContexts_ )
+        {
+            StreamContextManager.StreamContext.serializer().serialize(streamContext, dos);
+        }
+    }
+    
+    public BootstrapInitiateMessage deserialize(DataInputStream dis) throws IOException
+    {
+        int size = dis.readInt();
+        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[0];
+        if ( size > 0 )
+        {
+            streamContexts = new StreamContextManager.StreamContext[size];
+            for ( int i = 0; i < size; ++i )
+            {
+                streamContexts[i] = StreamContextManager.StreamContext.serializer().deserialize(dis);
+            }
+        }
+        return new BootstrapInitiateMessage(streamContexts);
+    }
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadata.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadata.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadata.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,102 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+
+
+
+/**
+ * This encapsulates information of the list of 
+ * ranges that a target node requires in order to 
+ * be bootstrapped. This will be bundled in a 
+ * BootstrapMetadataMessage and sent to nodes that
+ * are going to handoff the data.
+*/
+class BootstrapMetadata
+{
+    private static ICompactSerializer<BootstrapMetadata> serializer_;
+    static
+    {
+        serializer_ = new BootstrapMetadataSerializer();
+    }
+    
+    protected static ICompactSerializer<BootstrapMetadata> serializer()
+    {
+        return serializer_;
+    }
+    
+    protected EndPoint target_;
+    protected List<Range> ranges_;
+    
+    BootstrapMetadata(EndPoint target, List<Range> ranges)
+    {
+        target_ = target;
+        ranges_ = ranges;
+    }
+    
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder("");
+        sb.append(target_);
+        sb.append("------->");
+        for ( Range range : ranges_ )
+        {
+            sb.append(range);
+            sb.append(" ");
+        }
+        return sb.toString();
+    }
+}
+
+class BootstrapMetadataSerializer implements ICompactSerializer<BootstrapMetadata>
+{
+    public void serialize(BootstrapMetadata bsMetadata, DataOutputStream dos) throws IOException
+    {
+        CompactEndPointSerializationHelper.serialize(bsMetadata.target_, dos);
+        int size = (bsMetadata.ranges_ == null) ? 0 : bsMetadata.ranges_.size();            
+        dos.writeInt(size);
+        
+        for ( Range range : bsMetadata.ranges_ )
+        {
+            Range.serializer().serialize(range, dos);
+        }            
+    }
+
+    public BootstrapMetadata deserialize(DataInputStream dis) throws IOException
+    {            
+        EndPoint target = CompactEndPointSerializationHelper.deserialize(dis);
+        int size = dis.readInt();
+        List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
+        for( int i = 0; i < size; ++i )
+        {
+            ranges.add(Range.serializer().deserialize(dis));
+        }            
+        return new BootstrapMetadata( target, ranges );
+    }
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadataMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadataMessage.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,90 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+
+
+
+/**
+ * This class encapsulates the message that needs to be sent
+ * to nodes that handoff data. The message contains information
+ * about the node to be bootstrapped and the ranges with which
+ * it needs to be bootstrapped.
+*/
+class BootstrapMetadataMessage
+{
+    private static ICompactSerializer<BootstrapMetadataMessage> serializer_;
+    static
+    {
+        serializer_ = new BootstrapMetadataMessageSerializer();
+    }
+    
+    protected static ICompactSerializer<BootstrapMetadataMessage> serializer()
+    {
+        return serializer_;
+    }
+    
+    protected static Message makeBootstrapMetadataMessage(BootstrapMetadataMessage bsMetadataMessage) throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream( bos );
+        BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
+        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bsMetadataVerbHandler_, new Object[]{bos.toByteArray()} );            
+    }        
+    
+    protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];
+    
+    BootstrapMetadataMessage(BootstrapMetadata[] bsMetadata)
+    {
+        bsMetadata_ = bsMetadata;
+    }
+}
+
+class BootstrapMetadataMessageSerializer implements ICompactSerializer<BootstrapMetadataMessage>
+{
+    public void serialize(BootstrapMetadataMessage bsMetadataMessage, DataOutputStream dos) throws IOException
+    {
+        BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
+        int size = (bsMetadata == null) ? 0 : bsMetadata.length;
+        dos.writeInt(size);
+        for ( BootstrapMetadata bsmd : bsMetadata )
+        {
+            BootstrapMetadata.serializer().serialize(bsmd, dos);
+        }
+    }
+
+    public BootstrapMetadataMessage deserialize(DataInputStream dis) throws IOException
+    {            
+        int size = dis.readInt();
+        BootstrapMetadata[] bsMetadata = new BootstrapMetadata[size];
+        for ( int i = 0; i < size; ++i )
+        {
+            bsMetadata[i] = BootstrapMetadata.serializer().deserialize(dis);
+        }
+        return new BootstrapMetadataMessage(bsMetadata);
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,163 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.service.StreamManager;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * This verb handler handles the BootstrapMetadataMessage that is sent
+ * by the leader to the nodes that are responsible for handing off data. 
+*/
+public class BootstrapMetadataVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(BootstrapMetadataVerbHandler.class);
+    
+    public void doVerb(Message message)
+    {
+        logger_.debug("Received a BootstrapMetadataMessage from " + message.getFrom());
+        byte[] body = (byte[])message.getMessageBody()[0];
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(body, body.length);
+        try
+        {
+            BootstrapMetadataMessage bsMetadataMessage = BootstrapMetadataMessage.serializer().deserialize(bufIn);
+            BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
+            
+            /*
+             * This is for debugging purposes. Remove later.
+            */
+            for ( BootstrapMetadata bsmd : bsMetadata )
+            {
+                logger_.debug(bsmd.toString());                                      
+            }
+            
+            for ( BootstrapMetadata bsmd : bsMetadata )
+            {
+                long startTime = System.currentTimeMillis();
+                doTransfer(bsmd.target_, bsmd.ranges_);     
+                logger_.debug("Time taken to boostrap " + 
+                        bsmd.target_ + 
+                        " is " + 
+                        (System.currentTimeMillis() - startTime) +
+                        " msecs.");
+            }
+        }
+        catch ( IOException ex )
+        {
+            logger_.info(LogUtil.throwableToString(ex));
+        }
+    }
+    
+    /*
+     * This method needs to figure out the files on disk
+     * locally for each range and then stream them using
+     * the Bootstrap protocol to the target endpoint.
+    */
+    private void doTransfer(EndPoint target, List<Range> ranges) throws IOException
+    {
+        if ( ranges.size() == 0 )
+        {
+            logger_.debug("No ranges to give scram ...");
+            return;
+        }
+        
+        /* Just for debugging process - remove later */            
+        for ( Range range : ranges )
+        {
+            StringBuilder sb = new StringBuilder("");                
+            sb.append(range.toString());
+            sb.append(" ");            
+            logger_.debug("Beginning transfer process to " + target + " for ranges " + sb.toString());                
+        }
+      
+        /*
+         * (1) First we dump all the memtables to disk.
+         * (2) Run a version of compaction which will basically
+         *     put the keys in the range specified into a directory
+         *     named as per the endpoint it is destined for inside the
+         *     bootstrap directory.
+         * (3) Handoff the data.
+        */
+        List<String> tables = DatabaseDescriptor.getTables();
+        for ( String tName : tables )
+        {
+            Table table = Table.open(tName);
+            logger_.debug("Flushing memtables ...");
+            table.flush(false);
+            logger_.debug("Forcing compaction ...");
+            /* Get the counting bloom filter for each endpoint and the list of files that need to be streamed */
+            List<String> fileList = new ArrayList<String>();
+            boolean bVal = table.forceCompaction(ranges, target, fileList);                
+            doHandoff(target, fileList);
+        }
+    }
+
+    /**
+     * Stream the files in the bootstrap directory over to the
+     * node being bootstrapped.
+    */
+    private void doHandoff(EndPoint target, List<String> fileList) throws IOException
+    {
+        List<File> filesList = new ArrayList<File>();
+        for(String file : fileList)
+        {
+            filesList.add(new File(file));
+        }
+        File[] files = filesList.toArray(new File[0]);
+        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[files.length];
+        int i = 0;
+        for ( File file : files )
+        {
+            streamContexts[i] = new StreamContextManager.StreamContext(file.getAbsolutePath(), file.length());
+            logger_.debug("Stream context metadata " + streamContexts[i]);
+            ++i;
+        }
+        
+        if ( files.length > 0 )
+        {
+            /* Set up the stream manager with the files that need to streamed */
+            StreamManager.instance(target).addFilesToStream(streamContexts);
+            /* Send the bootstrap initiate message */
+            BootstrapInitiateMessage biMessage = new BootstrapInitiateMessage(streamContexts);
+            Message message = BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
+            logger_.debug("Sending a bootstrap initiate message to " + target + " ...");
+            MessagingService.getMessagingInstance().sendOneWay(message, target);                
+            logger_.debug("Waiting for transfer to " + target + " to complete");
+            StreamManager.instance(target).waitForStreamCompletion();
+            logger_.debug("Done with transfer to " + target);  
+        }
+    }
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/dht/BootstrapSourceTarget.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/dht/BootstrapSourceTarget.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/dht/BootstrapSourceTarget.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/dht/BootstrapSourceTarget.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,49 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * This class encapsulates who is the source and the
+ * target of a bootstrap for a particular range.
+ */
+class BootstrapSourceTarget
+{
+    protected EndPoint source_;
+    protected EndPoint target_;
+    
+    BootstrapSourceTarget(EndPoint source, EndPoint target)
+    {
+        source_ = source;
+        target_ = target;
+    }
+    
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder("");
+        sb.append("SOURCE: ");
+        sb.append(source_);
+        sb.append(" ----> ");
+        sb.append("TARGET: ");
+        sb.append(target_);
+        sb.append(" ");
+        return sb.toString();
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,225 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.log4j.Logger;
+
+
+class LeaveJoinProtocolHelper
+{
+    private static Logger logger_ = Logger.getLogger(LeaveJoinProtocolHelper.class);
+    
+    /**
+     * Give a range a-------------b which is being split as
+     * a-----x-----y-----b then we want a mapping from 
+     * (a, b] --> (a, x], (x, y], (y, b] 
+    */
+    protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, BigInteger[] allTokens)
+    {
+        Map<Range, List<Range>> splitRanges = new HashMap<Range, List<Range>>();
+        BigInteger[] tokens = new BigInteger[allTokens.length];
+        System.arraycopy(allTokens, 0, tokens, 0, tokens.length);
+        Arrays.sort(tokens);
+        
+        Range prevRange = null;
+        BigInteger prevToken = null;
+        boolean bVal = false;
+        
+        for ( Range oldRange : oldRanges )
+        {
+            if ( bVal && prevRange != null )
+            {
+                bVal = false; 
+                List<Range> subRanges = splitRanges.get(prevRange);
+                if ( subRanges != null )
+                    subRanges.add( new Range(prevToken, prevRange.right()) );     
+            }
+            
+            prevRange = oldRange;
+            prevToken = oldRange.left();                
+            for ( BigInteger token : tokens )
+            {     
+                List<Range> subRanges = splitRanges.get(oldRange);
+                if ( oldRange.contains(token) )
+                {                        
+                    if ( subRanges == null )
+                    {
+                        subRanges = new ArrayList<Range>();
+                        splitRanges.put(oldRange, subRanges);
+                    }                            
+                    subRanges.add( new Range(prevToken, token) );
+                    prevToken = token;
+                    bVal = true;
+                }
+                else
+                {
+                    if ( bVal )
+                    {
+                        bVal = false;                                                                                
+                        subRanges.add( new Range(prevToken, oldRange.right()) );                            
+                    }
+                }
+            }
+        }
+        /* This is to handle the last range being processed. */
+        if ( bVal )
+        {
+            bVal = false; 
+            List<Range> subRanges = splitRanges.get(prevRange);
+            subRanges.add( new Range(prevToken, prevRange.right()) );                            
+        }
+        return splitRanges;
+    }
+    
+    protected static Map<Range, List<BootstrapSourceTarget>> getRangeSourceTargetInfo(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Map<Range, List<EndPoint>> newRangeToEndPointMap)
+    {
+        Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = new HashMap<Range, List<BootstrapSourceTarget>>();
+        /*
+         * Basically calculate for each range the endpoints handling the
+         * range in the old token set and in the new token set. Whoever
+         * gets bumped out of the top N will have to hand off that range
+         * to the new dude.
+        */
+        Set<Range> oldRangeSet = oldRangeToEndPointMap.keySet();
+        for(Range range : oldRangeSet)
+        {
+            logger_.debug("Attempting to figure out the dudes who are bumped out for " + range + " ...");
+            List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
+            List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
+            if ( newEndPoints != null )
+            {                        
+                List<EndPoint> newEndPoints2 = new ArrayList<EndPoint>(newEndPoints);
+                for ( EndPoint newEndPoint : newEndPoints2 )
+                {
+                    if ( oldEndPoints.contains(newEndPoint) )
+                    {
+                        oldEndPoints.remove(newEndPoint);
+                        newEndPoints.remove(newEndPoint);
+                    }
+                }                        
+            }
+            else
+            {
+                logger_.warn("Trespassing - scram");
+            }
+            logger_.debug("Done figuring out the dudes who are bumped out for range " + range + " ...");
+        }
+        for ( Range range : oldRangeSet )
+        {                    
+            List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
+            List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
+            List<BootstrapSourceTarget> srcTarget = rangesWithSourceTarget.get(range);
+            if ( srcTarget == null )
+            {
+                srcTarget = new ArrayList<BootstrapSourceTarget>();
+                rangesWithSourceTarget.put(range, srcTarget);
+            }
+            int i = 0;
+            for ( EndPoint oldEndPoint : oldEndPoints )
+            {                        
+                srcTarget.add( new BootstrapSourceTarget(oldEndPoint, newEndPoints.get(i++)) );
+            }
+        }
+        return rangesWithSourceTarget;
+    }
+    
+    /**
+     * This method sends messages out to nodes instructing them 
+     * to stream the specified ranges to specified target nodes. 
+    */
+    protected static void assignWork(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget) throws IOException
+    {
+        assignWork(rangesWithSourceTarget, null);
+    }
+    
+    /**
+     * This method sends messages out to nodes instructing them 
+     * to stream the specified ranges to specified target nodes. 
+    */
+    protected static void assignWork(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget, List<EndPoint> filters) throws IOException
+    {
+        /*
+         * Map whose key is the source node and the value is a map whose key is the
+         * target and value is the list of ranges to be sent to it. 
+        */
+        Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo = new HashMap<EndPoint, Map<EndPoint, List<Range>>>();
+        Set<Range> ranges = rangesWithSourceTarget.keySet();
+        
+        for ( Range range : ranges )
+        {
+            List<BootstrapSourceTarget> rangeSourceTargets = rangesWithSourceTarget.get(range);
+            for ( BootstrapSourceTarget rangeSourceTarget : rangeSourceTargets )
+            {
+                Map<EndPoint, List<Range>> targetRangeMap = rangeInfo.get(rangeSourceTarget.source_);
+                if ( targetRangeMap == null )
+                {
+                    targetRangeMap = new HashMap<EndPoint, List<Range>>();
+                    rangeInfo.put(rangeSourceTarget.source_, targetRangeMap);
+                }
+                List<Range> rangesToGive = targetRangeMap.get(rangeSourceTarget.target_);
+                if ( rangesToGive == null )
+                {
+                    rangesToGive = new ArrayList<Range>();
+                    targetRangeMap.put(rangeSourceTarget.target_, rangesToGive);
+                }
+                rangesToGive.add(range);
+            }
+        }
+        
+        Set<EndPoint> sources = rangeInfo.keySet();
+        for ( EndPoint source : sources )
+        {
+            /* only send the message to the nodes that are in the filter. */
+            if ( filters != null && filters.size() > 0 && !filters.contains(source) )
+            {
+                logger_.debug("Filtering endpoint " + source + " as source ...");
+                continue;
+            }
+            
+            Map<EndPoint, List<Range>> targetRangesMap = rangeInfo.get(source);
+            Set<EndPoint> targets = targetRangesMap.keySet();
+            List<BootstrapMetadata> bsmdList = new ArrayList<BootstrapMetadata>();
+            
+            for ( EndPoint target : targets )
+            {
+                List<Range> rangeForTarget = targetRangesMap.get(target);
+                BootstrapMetadata bsMetadata = new BootstrapMetadata(target, rangeForTarget);
+                bsmdList.add(bsMetadata);
+            }
+            
+            BootstrapMetadataMessage bsMetadataMessage = new BootstrapMetadataMessage(bsmdList.toArray( new BootstrapMetadata[0] ) );
+            /* Send this message to the source to do his shit. */
+            Message message = BootstrapMetadataMessage.makeBootstrapMetadataMessage(bsMetadataMessage); 
+            logger_.debug("Sending the BootstrapMetadataMessage to " + source);
+            MessagingService.getMessagingInstance().sendOneWay(message, source);
+        }
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,291 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class performs the exact opposite of the
+ * operations of the Bootstrapper class. Given 
+ * a bunch of nodes that need to move it determines 
+ * who they need to hand off data in terms of ranges.
+*/
+public class LeaveJoinProtocolImpl implements Runnable
+{
+    private static Logger logger_ = Logger.getLogger(LeaveJoinProtocolImpl.class);    
+    
+    /* endpoints that are to be moved. */
+    protected EndPoint[] targets_ = new EndPoint[0];
+    /* position where they need to be moved */
+    protected BigInteger[] tokens_ = new BigInteger[0];
+    /* token metadata information */
+    protected TokenMetadata tokenMetadata_ = null;
+
+    public LeaveJoinProtocolImpl(EndPoint[] targets, BigInteger[] tokens)
+    {
+        targets_ = targets;
+        tokens_ = tokens;
+        tokenMetadata_ = StorageService.instance().getTokenMetadata();
+    }
+
+    public void run()
+    {  
+        try
+        {
+            logger_.debug("Beginning leave/join process for ...");                                                               
+            /* copy the token to endpoint map */
+            Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+            /* copy the endpoint to token map */
+            Map<EndPoint, BigInteger> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
+            
+            Set<BigInteger> oldTokens = new HashSet<BigInteger>( tokenToEndPointMap.keySet() );
+            Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
+            logger_.debug("Total number of old ranges " + oldRanges.length);
+            /* Calculate the list of nodes that handle the old ranges */
+            Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
+            
+            /* Remove the tokens of the nodes leaving the ring */
+            Set<BigInteger> tokens = getTokensForLeavingNodes();
+            oldTokens.removeAll(tokens);
+            Range[] rangesAfterNodesLeave = StorageService.instance().getAllRanges(oldTokens);
+            /* Get expanded range to initial range mapping */
+            Map<Range, List<Range>> expandedRangeToOldRangeMap = getExpandedRangeToOldRangeMapping(oldRanges, rangesAfterNodesLeave);
+            /* add the new token positions to the old tokens set */
+            for ( BigInteger token : tokens_ )
+                oldTokens.add(token);
+            Range[] rangesAfterNodesJoin = StorageService.instance().getAllRanges(oldTokens);
+            /* replace the ranges that were split with the split ranges in the old configuration */
+            addSplitRangesToOldConfiguration(oldRangeToEndPointMap, rangesAfterNodesJoin);
+            
+            /* Re-calculate the new ranges after the new token positions are added */
+            Range[] newRanges = StorageService.instance().getAllRanges(oldTokens);
+            /* Remove the old locations from tokenToEndPointMap and add the new locations they are moving to */
+            for ( int i = 0; i < targets_.length; ++i )
+            {
+                tokenToEndPointMap.remove( endpointToTokenMap.get(targets_[i]) );
+                tokenToEndPointMap.put(tokens_[i], targets_[i]);
+            }            
+            /* Calculate the list of nodes that handle the new ranges */            
+            Map<Range, List<EndPoint>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges, tokenToEndPointMap);
+            /* Remove any expanded ranges and replace them with ranges whose aggregate is the expanded range in the new configuration. */
+            removeExpandedRangesFromNewConfiguration(newRangeToEndPointMap, expandedRangeToOldRangeMap);
+            /* Calculate ranges that need to be sent and from whom to where */
+            Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap, newRangeToEndPointMap);
+            /* For debug purposes only */
+            Set<Range> ranges = rangesWithSourceTarget.keySet();
+            for ( Range range : ranges )
+            {
+                System.out.print("RANGE: " + range + ":: ");
+                List<BootstrapSourceTarget> infos = rangesWithSourceTarget.get(range);
+                for ( BootstrapSourceTarget info : infos )
+                {
+                    System.out.print(info);
+                    System.out.print(" ");
+                }
+                System.out.println(System.getProperty("line.separator"));
+            }
+            /* Send messages to respective folks to stream data over to the new nodes being bootstrapped */
+            LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget);
+        }
+        catch ( Throwable th )
+        {
+            logger_.warn(LogUtil.throwableToString(th));
+        }
+    }
+    
+    /**
+     * This method figures out the ranges that have been split and
+     * replaces them with the split range.
+     * @param oldRangeToEndPointMap old range mapped to their replicas.
+     * @param rangesAfterNodesJoin ranges after the nodes have joined at
+     *        their respective position.
+     */
+    private void addSplitRangesToOldConfiguration(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Range[] rangesAfterNodesJoin)
+    {
+        /* 
+         * Find the ranges that are split. Maintain a mapping between
+         * the range being split and the list of subranges.
+        */                
+        Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRangeToEndPointMap.keySet().toArray( new Range[0] ), tokens_);
+        /* Mapping of split ranges to the list of endpoints responsible for the range */                
+        Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range, List<EndPoint>>();                                
+        Set<Range> rangesSplit = splitRanges.keySet();                
+        for ( Range splitRange : rangesSplit )
+        {
+            replicasForSplitRanges.put( splitRange, oldRangeToEndPointMap.get(splitRange) );
+        }
+        /* Remove the ranges that are split. */
+        for ( Range splitRange : rangesSplit )
+        {
+            oldRangeToEndPointMap.remove(splitRange);
+        }
+        
+        /* Add the subranges of the split range to the map with the same replica set. */
+        for ( Range splitRange : rangesSplit )
+        {
+            List<Range> subRanges = splitRanges.get(splitRange);
+            List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
+            for ( Range subRange : subRanges )
+            {
+                /* Make sure we clone or else we are hammered. */
+                oldRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+            }
+        }
+    }
+    
+    /**
+     * Reset the newRangeToEndPointMap and replace the expanded range
+     * with the ranges whose aggregate is the expanded range. This happens
+     * only when nodes leave the ring to migrate to a different position.
+     * 
+     * @param newRangeToEndPointMap all new ranges mapped to the replicas 
+     *        responsible for those ranges.
+     * @param expandedRangeToOldRangeMap mapping between the expanded ranges
+     *        and the ranges whose aggregate is the expanded range.
+     */
+    private void removeExpandedRangesFromNewConfiguration(Map<Range, List<EndPoint>> newRangeToEndPointMap, Map<Range, List<Range>> expandedRangeToOldRangeMap)
+    {
+        /* Get the replicas for the expanded ranges */
+        Map<Range, List<EndPoint>> replicasForExpandedRanges = new HashMap<Range, List<EndPoint>>();
+        Set<Range> expandedRanges = expandedRangeToOldRangeMap.keySet();
+        for ( Range expandedRange : expandedRanges )
+        {            
+            replicasForExpandedRanges.put( expandedRange, newRangeToEndPointMap.get(expandedRange) );
+            newRangeToEndPointMap.remove(expandedRange);            
+        }
+        /* replace the expanded ranges in the newRangeToEndPointMap with the subRanges */
+        for ( Range expandedRange : expandedRanges )
+        {
+            List<Range> subRanges = expandedRangeToOldRangeMap.get(expandedRange);
+            List<EndPoint> replicas = replicasForExpandedRanges.get(expandedRange);          
+            for ( Range subRange : subRanges )
+            {
+                newRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+            }
+        }        
+    }
+    
+    private Set<BigInteger> getTokensForLeavingNodes()
+    {
+        Set<BigInteger> tokens = new HashSet<BigInteger>();
+        for ( EndPoint target : targets_ )
+        {
+            tokens.add( tokenMetadata_.getToken(target) );
+        }        
+        return tokens;
+    }
+    
+    /**
+     * Here we are removing the nodes that need to leave the
+     * ring and trying to calculate what the ranges would look
+     * like w/o them. For eg if we remove two nodes A and D from
+     * the ring and the order of nodes on the ring is A, B, C
+     * and D. When B is removed the range of C is the old range 
+     * of C and the old range of B. We want a mapping from old
+     * range of B to new range of B. We have 
+     * A----B----C----D----E----F----G and we remove b and e
+     * then we want a mapping from (a, c] --> (a,b], (b, c] and 
+     * (d, f] --> (d, e], (d,f].
+     * @param oldRanges ranges with the previous configuration
+     * @param newRanges ranges with the target endpoints removed.
+     * @return map of expanded range to the list whose aggregate is
+     *             the expanded range.
+     */
+    protected static Map<Range, List<Range>> getExpandedRangeToOldRangeMapping(Range[] oldRanges, Range[] newRanges)
+    {
+        Map<Range, List<Range>> map = new HashMap<Range, List<Range>>();   
+        List<Range> oRanges = new ArrayList<Range>();
+        Collections.addAll(oRanges, oldRanges);
+        List<Range> nRanges = new ArrayList<Range>();
+        Collections.addAll(nRanges, newRanges);
+        
+        /*
+         * Remove the ranges that are the same. 
+         * Now we will be left with the expanded 
+         * ranges in the nRanges list and the 
+         * smaller ranges in the oRanges list. 
+        */
+        for( Range oRange : oldRanges )
+        {            
+            boolean bVal = nRanges.remove(oRange);
+            if ( bVal )
+                oRanges.remove(oRange);
+        }
+        
+        int nSize = nRanges.size();
+        int oSize = oRanges.size();
+        /*
+         * Establish the mapping between expanded ranges
+         * to the smaller ranges whose aggregate is the
+         * expanded range. 
+        */
+        for ( int i = 0; i < nSize; ++i )
+        {
+            Range nRange = nRanges.get(i);
+            for ( int j = 0; j < oSize; ++j )
+            {
+                Range oRange = oRanges.get(j);
+                if ( nRange.contains(oRange.right()) )
+                {
+                    List<Range> smallerRanges = map.get(nRange);
+                    if ( smallerRanges == null )
+                    {
+                        smallerRanges = new ArrayList<Range>();
+                        map.put(nRange, smallerRanges);
+                    }
+                    smallerRanges.add(oRange);
+                    continue;
+                }
+            }
+        }
+        
+        return map;
+    }
+
+    public static void main(String[] args) throws Throwable
+    {
+        StorageService ss = StorageService.instance();
+        ss.updateTokenMetadata(BigInteger.valueOf(3), new EndPoint("A", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(6), new EndPoint("B", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(9), new EndPoint("C", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(12), new EndPoint("D", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(15), new EndPoint("E", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(18), new EndPoint("F", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(21), new EndPoint("G", 7000));
+        ss.updateTokenMetadata(BigInteger.valueOf(24), new EndPoint("H", 7000)); 
+        
+        Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new BigInteger[]{BigInteger.valueOf(22), BigInteger.valueOf(23)} );
+        runnable.run();
+    }
+}