Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/02 07:13:17 UTC

svn commit: r749207 [7/12] - in /incubator/cassandra/src/org/apache/cassandra: loader/ locator/ net/ net/http/ net/io/ net/sink/ procedures/ service/ test/ tools/

Added: incubator/cassandra/src/org/apache/cassandra/service/CassandraException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/CassandraException.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/CassandraException.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/CassandraException.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,119 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+import com.facebook.thrift.protocol.*;
+import com.facebook.thrift.transport.*;
+
+public class CassandraException extends Exception implements TBase, java.io.Serializable {
+  public String error;
+
+  public final Isset __isset = new Isset();
+  public static final class Isset implements java.io.Serializable {
+    public boolean error = false;
+  }
+
+  public CassandraException() {
+  }
+
+  public CassandraException(
+    String error)
+  {
+    this();
+    this.error = error;
+    this.__isset.error = true;
+  }
+
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof CassandraException)
+      return this.equals((CassandraException)that);
+    return false;
+  }
+
+  public boolean equals(CassandraException that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_error = true && (this.error != null);
+    boolean that_present_error = true && (that.error != null);
+    if (this_present_error || that_present_error) {
+      if (!(this_present_error && that_present_error))
+        return false;
+      if (!this.error.equals(that.error))
+        return false;
+    }
+
+    return true;
+  }
+
+  public int hashCode() {
+    return 0;
+  }
+
+  public void read(TProtocol iprot) throws TException {
+    TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == TType.STOP) { 
+        break;
+      }
+      switch (field.id)
+      {
+        case 1:
+          if (field.type == TType.STRING) {
+            this.error = iprot.readString();
+            this.__isset.error = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          TProtocolUtil.skip(iprot, field.type);
+          break;
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+  }
+
+  public void write(TProtocol oprot) throws TException {
+    TStruct struct = new TStruct("CassandraException");
+    oprot.writeStructBegin(struct);
+    TField field = new TField();
+    if (this.error != null) {
+      field.name = "error";
+      field.type = TType.STRING;
+      field.id = 1;
+      oprot.writeFieldBegin(field);
+      oprot.writeString(this.error);
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder("CassandraException(");
+    sb.append("error:");
+    sb.append(this.error);
+    sb.append(")");
+    return sb.toString();
+  }
+
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/CassandraServer.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/CassandraServer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/CassandraServer.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,1072 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import com.facebook.thrift.*;
+import com.facebook.thrift.server.*;
+import com.facebook.thrift.server.TThreadPoolServer.Options;
+import com.facebook.thrift.transport.*;
+import com.facebook.thrift.protocol.*;
+import com.facebook.fb303.FacebookBase;
+import com.facebook.fb303.fb_status;
+import java.io.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql.common.CqlResult;
+import org.apache.cassandra.cql.driver.CqlDriver;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class CassandraServer extends FacebookBase implements
+		Cassandra.Iface
+{
+
+	private static Logger logger_ = Logger.getLogger(CassandraServer.class);
+	/*
+	 * Handle to the storage service to interact with the other machines in the
+	 * cluster.
+	 */
+	protected StorageService storageService;
+
+	protected CassandraServer(String name)
+	{
+		super(name);
+		// Create the instance of the storage service
+		storageService = StorageService.instance();
+	}
+
+	public CassandraServer() throws Throwable
+	{
+		super("CassandraServer");
+		// Create the instance of the storage service
+		storageService = StorageService.instance();
+	}
+
+	/*
+	 * The start function initializes the server and starts listening on the
+	 * specified port.
+	 */
+	public void start() throws Throwable
+	{
+		LogUtil.init();
+		//LogUtil.setLogLevel("com.facebook", "DEBUG");
+		// Start the storage service
+		storageService.start();
+	}
+	
+	private void validateTable(String table) throws CassandraException
+	{
+		if ( !DatabaseDescriptor.getTables().contains(table) )
+		{
+			throw new CassandraException("Table " + table + " does not exist in this schema.");
+		}
+	}
+    
+	protected ColumnFamily get_cf(String tablename, String key, String columnFamily, List<String> columnNames) throws CassandraException, TException
+	{
+    	ColumnFamily cfamily = null;
+		try
+		{
+			validateTable(tablename);
+	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily);
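+	        // values[0] is the column family name; a second element, if present,
+	        // would be the (super) column name (assuming getColumnAndColumnFamily
+	        // splits the usual "columnFamily:column" form).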
+	        // check for  values 
+	        if( values.length < 1 )
+	        {
+	        	throw new CassandraException("Column Family " + columnFamily + " is invalid.");	        	
+	        }
+	        Row row = StorageProxy.readProtocol(tablename, key, columnFamily, columnNames, StorageService.ConsistencyLevel.WEAK);
+	        if (row == null)
+			{
+				throw new CassandraException("No row exists for key " + key);			
+			}
+			Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+			if (cfMap == null || cfMap.size() == 0)
+			{				
+				logger_	.info("ERROR ColumnFamily " + columnFamily + " map is missing.....: " + "   key:" + key );
+				throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
+			}
+			cfamily = cfMap.get(values[0]);
+			if (cfamily == null)
+			{
+				logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: " + "   key:" + key + "  ColumnFamily:" + values[0]);
+				throw new CassandraException("Either the key " + key + " is not present or the column family " + values[0] +  " is not present.");
+			}
+		}
+		catch (Throwable ex)
+		{
+			String exception = LogUtil.throwableToString(ex);
+			logger_.info( exception );
+			throw new CassandraException(exception);
+		}
+		return cfamily;
+	}
+
+    public  ArrayList<column_t> get_columns_since(String tablename, String key, String columnFamily_column, long timeStamp) throws CassandraException,TException
+	{
+		ArrayList<column_t> retlist = new ArrayList<column_t>();
+        long startTime = System.currentTimeMillis();
+		try
+		{
+			validateTable(tablename);
+	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+	        // check for  values 
+	        if( values.length < 1 )
+	        {
+	        	throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");	        	
+	        }
+	        Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, timeStamp, StorageService.ConsistencyLevel.WEAK);
+			if (row == null)
+			{
+				logger_.info("ERROR No row for this key .....: " + key);
+	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
+			}
+
+			Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+			if (cfMap == null || cfMap.size() == 0)
+			{
+				logger_	.info("ERROR ColumnFamily " + columnFamily_column + " map is missing.....: " + "   key:" + key);
+				throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
+			}
+			ColumnFamily cfamily = cfMap.get(values[0]);
+			if (cfamily == null)
+			{
+				logger_.info("ERROR ColumnFamily " + columnFamily_column + " is missing.....: "+"   key:" + key	+ "  ColumnFamily:" + values[0]);
+				throw new CassandraException("Either the key " + key + " is not present or the columns requested " + columnFamily_column + " are not present.");
+			}
+			Collection<IColumn> columns = null;
+			if( values.length > 1 )
+			{
+				// this is the super column case 
+				IColumn column = cfamily.getColumn(values[1]);
+				if(column != null)
+					columns = column.getSubColumns();
+			}
+			else
+			{
+				columns = cfamily.getAllColumns();
+			}
+			if (columns == null || columns.size() == 0)
+			{
+				logger_	.info("ERROR Columns are missing.....: " + "   key:" + key + "  ColumnFamily:" + values[0]);
+				throw new CassandraException("ERROR Columns are missing.....: " + "   key:" + key + "  ColumnFamily:" + values[0]);
+			}
+			for(IColumn column : columns)
+			{
+				column_t thrift_column = new column_t();
+				thrift_column.columnName = column.name();
+				thrift_column.value = new String(column.value()); // This needs to be Utf8ed
+				thrift_column.timestamp = column.timestamp();
+				retlist.add(thrift_column);
+			}
+		}
+		catch (Exception ex)
+		{
+			String exception = LogUtil.throwableToString(ex);
+			logger_.info( exception );
+			throw new CassandraException(exception);
+		}
+        logger_.debug("get_columns_since: " + (System.currentTimeMillis() - startTime) + " ms.");
+		return retlist;
+	}
+	
+
+    public List<column_t> get_slice_by_names(String tablename, String key, String columnFamily, List<String> columnNames) throws CassandraException, TException
+    {
+		ArrayList<column_t> retlist = new ArrayList<column_t>();
+        long startTime = System.currentTimeMillis();
+		try
+		{
+			validateTable(tablename);
+			ColumnFamily cfamily = get_cf(tablename, key, columnFamily, columnNames);
+			if (cfamily == null)
+			{
+				logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: "
+							+"   key:" + key
+							+ "  ColumnFamily:" + columnFamily);
+				throw new CassandraException("Either the key " + key + " is not present or the columnFamily requested " + columnFamily + " is not present.");
+			}
+			Collection<IColumn> columns = null;
+			columns = cfamily.getAllColumns();
+			if (columns == null || columns.size() == 0)
+			{
+				logger_	.info("ERROR Columns are missing.....: "
+							   + "   key:" + key
+								+ "  ColumnFamily:" + columnFamily);
+				throw new CassandraException("ERROR Columns are missing.....: " + "   key:" + key + "  ColumnFamily:" + columnFamily);
+			}
+			
+			for(IColumn column : columns)
+			{
+				column_t thrift_column = new column_t();
+				thrift_column.columnName = column.name();
+				thrift_column.value = new String(column.value()); // This needs to be Utf8ed
+				thrift_column.timestamp = column.timestamp();
+				retlist.add(thrift_column);
+			}
+		}
+		catch (Exception ex)
+		{
+			String exception = LogUtil.throwableToString(ex);
+			logger_.info( exception );
+			throw new CassandraException(exception);
+		}
+		
+        logger_.debug("get_slice_by_names: " + (System.currentTimeMillis() - startTime)
+                + " ms.");
+		return retlist;
+    }
+    
+    public ArrayList<column_t> get_slice(String tablename, String key, String columnFamily_column, int start, int count) throws CassandraException,TException
+	{
+		ArrayList<column_t> retlist = new ArrayList<column_t>();
+        long startTime = System.currentTimeMillis();
+		try
+		{
+			validateTable(tablename);
+	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+	        // check for  values 
+	        if( values.length < 1 )
+	        {
+	        	throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");	        	
+	        }
+	        Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, start, count, StorageService.ConsistencyLevel.WEAK);
+			if (row == null)
+			{
+				logger_.info("ERROR No row for this key .....: " + key);
+	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
+			}
+
+			Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+			if (cfMap == null || cfMap.size() == 0)
+			{
+				logger_	.info("ERROR ColumnFamily " + columnFamily_column + " map is missing.....: " + "   key:" + key);
+				throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
+			}
+			ColumnFamily cfamily = cfMap.get(values[0]);
+			if (cfamily == null)
+			{
+				logger_.info("ERROR ColumnFamily " + columnFamily_column + " is missing.....: "	+ "   key:" + key + "  ColumnFamily:" + values[0]);
+				throw new CassandraException("Either the key " + key + " is not present or the columns requested " + columnFamily_column + " are not present.");
+			}
+			Collection<IColumn> columns = null;
+			if( values.length > 1 )
+			{
+				// this is the super column case 
+				IColumn column = cfamily.getColumn(values[1]);
+				if(column != null)
+					columns = column.getSubColumns();
+			}
+			else
+			{
+				columns = cfamily.getAllColumns();
+			}
+			if (columns == null || columns.size() == 0)
+			{
+				logger_	.info("ERROR Columns are missing.....: " + "   key:" + key + "  ColumnFamily:" + values[0]);
+				throw new CassandraException("ERROR Columns are missing.....: " + "   key:" + key + "  ColumnFamily:" + values[0]);
+			}
+			for(IColumn column : columns)
+			{
+				column_t thrift_column = new column_t();
+				thrift_column.columnName = column.name();
+				thrift_column.value = new String(column.value()); // This needs to be Utf8ed
+				thrift_column.timestamp = column.timestamp();
+				retlist.add(thrift_column);
+			}
+		}
+		catch (Exception ex)
+		{
+			String exception = LogUtil.throwableToString(ex);
+			logger_.info( exception );
+			throw new CassandraException(exception);
+		}
+        logger_.debug("get_slice: " + (System.currentTimeMillis() - startTime)
+                + " ms.");
+		return retlist;
+	}
+    
+    public column_t get_column(String tablename, String key, String columnFamily_column) throws CassandraException,TException
+    {
+		column_t ret = null;
+		try
+		{
+			validateTable(tablename);
+	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+	        // check for  values 
+	        if( values.length < 2 )
+	        {
+	        	throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");	        	
+	        }
+	        Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE, StorageService.ConsistencyLevel.WEAK);
+			if (row == null)
+			{
+				logger_.info("ERROR No row for this key .....: " + key);
+	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
+			}
+			
+			Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+			if (cfMap == null || cfMap.size() == 0)
+			{
+				logger_	.info("ERROR ColumnFamily map is missing.....: "
+							   + "   key:" + key
+								);
+				throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
+			}
+			ColumnFamily cfamily = cfMap.get(values[0]);
+			if (cfamily == null)
+			{
+				logger_.info("ERROR ColumnFamily  is missing.....: "
+							+"   key:" + key
+							+ "  ColumnFamily:" + values[0]);
+				throw new CassandraException("Either the key " + key + " is not present or the columns requested " + columnFamily_column + " are not present.");
+			}
+			Collection<IColumn> columns = null;
+			if( values.length > 2 )
+			{
+				// this is the super column case 
+				IColumn column = cfamily.getColumn(values[1]);
+				if(column != null)
+					columns = column.getSubColumns();
+			}
+			else
+			{
+				columns = cfamily.getAllColumns();
+			}
+			if (columns == null || columns.size() == 0)
+			{
+				logger_	.info("ERROR Columns are missing.....: "
+							   + "   key:" + key
+								+ "  ColumnFamily:" + values[0]);
+				throw new CassandraException("ERROR Columns are missing.....: " + "   key:" + key + "  ColumnFamily:" + values[0]);
+			}
+			ret = new column_t();
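+			// Note: if more than one column came back, each pass through the loop
+			// below overwrites ret, so only the last column in the collection is returned.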
+			for(IColumn column : columns)
+			{
+				ret.columnName = column.name();
+				ret.value = new String(column.value());
+				ret.timestamp = column.timestamp();
+			}
+		}
+		catch (Exception ex)
+		{
+			String exception = LogUtil.throwableToString(ex);
+			logger_.info( exception );
+			throw new CassandraException(exception);
+		}
+		return ret;
+    }
+    
+
+    public int get_column_count(String tablename, String key, String columnFamily_column) throws CassandraException
+	{
+    	int count = -1;
+		try
+		{
+			validateTable(tablename);
+	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+	        // check for  values 
+	        if( values.length < 1 )
+	        {
+	        	throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");	        	
+	        }
+	        Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE, StorageService.ConsistencyLevel.WEAK);
+			if (row == null)
+			{
+				logger_.info("ERROR No row for this key .....: " + key);
+	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
+			}
+
+			Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+			if (cfMap == null || cfMap.size() == 0)
+			{
+				logger_	.info("ERROR ColumnFamily map is missing.....: "
+							   + "   key:" + key
+								);
+				throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
+			}
+			ColumnFamily cfamily = cfMap.get(values[0]);
+			if (cfamily == null)
+			{
+				logger_.info("ERROR ColumnFamily  is missing.....: "
+							+"   key:" + key
+							+ "  ColumnFamily:" + values[0]);
+				throw new CassandraException("Either the key " + key + " is not present or the columns requested " + columnFamily_column + " are not present.");
+			}
+			Collection<IColumn> columns = null;
+			if( values.length > 1 )
+			{
+				// this is the super column case 
+				IColumn column = cfamily.getColumn(values[1]);
+				if(column != null)
+					columns = column.getSubColumns();
+			}
+			else
+			{
+				columns = cfamily.getAllColumns();
+			}
+			if (columns == null || columns.size() == 0)
+			{
+				logger_	.info("ERROR Columns are missing.....: "
+							   + "   key:" + key
+								+ "  ColumnFamily:" + values[0]);
+				throw new CassandraException("ERROR Columns are missing.....: " + "   key:" + key + "  ColumnFamily:" + values[0]);
+			}
+			count = columns.size();
+		}
+		catch (Exception ex)
+		{
+			String exception = LogUtil.throwableToString(ex);
+			logger_.info( exception );
+			throw new CassandraException(exception);
+		}
+		return count;
+	}
+
+    public void insert(String tablename, String key, String columnFamily_column, String cellData, long timestamp)
+	{
+		try
+		{
+			validateTable(tablename);
+			RowMutation rm = new RowMutation(tablename, key.trim());
+			rm.add(columnFamily_column, cellData.getBytes(), timestamp);
+			StorageProxy.insert(rm);
+		}
+		catch (Exception e)
+		{
+			logger_.debug( LogUtil.throwableToString(e) );
+		}
+		return;
+	}
+    
+    public boolean batch_insert_blocking(batch_mutation_t batchMutation)
+    {
+		// 1. Get the N nodes from storage service where the data needs to be
+		// replicated
+		// 2. Construct a message for read\write
+		// 3. SendRR ( to all the nodes above )
+		// 4. Wait for a response from at least X nodes where X <= N
+		// 5. return success
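+		// For example, with a replication factor of N = 3 the QuorumResponseHandler
+		// below is built with N, and quorumResponseHandler.get() presumably returns
+		// once a majority (2 of the 3 contacted endpoints) have acknowledged the
+		// write, rather than waiting on every replica.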
+    	boolean result = false;
+		try
+		{
+			logger_.warn(" batch_insert_blocking");
+			validateTable(batchMutation.table);
+			IResponseResolver<Boolean> writeResponseResolver = new WriteResponseResolver();
+			QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(
+					DatabaseDescriptor.getReplicationFactor(),
+					writeResponseResolver);
+			EndPoint[] endpoints = storageService.getNStorageEndPoint(batchMutation.key);
+			// TODO: throw a thrift exception if we do not have N nodes
+
+			logger_.debug(" Creating the row mutation");
+			RowMutation rm = new RowMutation(batchMutation.table,
+					batchMutation.key.trim());
+			Set keys = batchMutation.cfmap.keySet();
+			Iterator keyIter = keys.iterator();
+			while (keyIter.hasNext())
+			{
+				Object key = keyIter.next(); // Get the next key.
+				List<column_t> list = batchMutation.cfmap.get(key);
+				for (column_t columnData : list)
+				{
+					rm.add(key.toString() + ":" + columnData.columnName,
+							columnData.value.getBytes(), columnData.timestamp);
+
+				}
+			}            
+            
+			RowMutationMessage rmMsg = new RowMutationMessage(rm);           
+			Message message = new Message(StorageService.getLocalStorageEndPoint(), 
+                    StorageService.mutationStage_,
+					StorageService.mutationVerbHandler_, 
+                    new Object[]{ rmMsg }
+            );
+			MessagingService.getMessagingInstance().sendRR(message, endpoints,
+					quorumResponseHandler);
+			logger_.debug(" Calling quorum response handler's get");
+			result = quorumResponseHandler.get(); 
+                       
+			// TODO: if the result is false that means the writes to all the
+			// servers failed hence we need to throw an exception or return an
+			// error back to the client so that it can take appropriate action.
+		}
+		catch (Exception e)
+		{
+			logger_.info( LogUtil.throwableToString(e) );
+		}
+		return result;
+    	
+    }
+	public void batch_insert(batch_mutation_t batchMutation)
+	{
+		// 1. Get the N nodes from storage service where the data needs to be
+		// replicated
+		// 2. Construct a message for read\write
+		// 3. SendRR ( to all the nodes above )
+		// 4. Wait for a response from at least X nodes where X <= N
+		// 5. return success
+
+		try
+		{
+			logger_.debug(" batch_insert");
+			logger_.debug(" Creating the row mutation");
+			validateTable(batchMutation.table);
+			RowMutation rm = new RowMutation(batchMutation.table,
+					batchMutation.key.trim());
+			if(batchMutation.cfmap != null)
+			{
+				Set keys = batchMutation.cfmap.keySet();
+				Iterator keyIter = keys.iterator();
+				while (keyIter.hasNext())
+				{
+					Object key = keyIter.next(); // Get the next key.
+					List<column_t> list = batchMutation.cfmap.get(key);
+					for (column_t columnData : list)
+					{
+						rm.add(key.toString() + ":" + columnData.columnName,
+								columnData.value.getBytes(), columnData.timestamp);
+	
+					}
+				}
+			}
+			if(batchMutation.cfmapdel != null)
+			{
+				Set keys = batchMutation.cfmapdel.keySet();
+				Iterator keyIter = keys.iterator();
+				while (keyIter.hasNext())
+				{
+					Object key = keyIter.next(); // Get the next key.
+					List<column_t> list = batchMutation.cfmapdel.get(key);
+					for (column_t columnData : list)
+					{
+						rm.delete(key.toString() + ":" + columnData.columnName);
+					}
+				}            
+			}
+			StorageProxy.insert(rm);
+		}
+		catch (Exception e)
+		{
+			logger_.info( LogUtil.throwableToString(e) );
+		}
+		return;
+	}
+
+    public void remove(String tablename, String key, String columnFamily_column)
+	{
+		try
+		{
+			validateTable(tablename);
+			RowMutation rm = new RowMutation(tablename, key.trim());
+			rm.delete(columnFamily_column);
+            StorageProxy.insert(rm);
+		}
+		catch (Exception e)
+		{
+			logger_.debug( LogUtil.throwableToString(e) );
+		}
+		return;
+	}
+
+    public List<superColumn_t> get_slice_super_by_names(String tablename, String key, String columnFamily, List<String> superColumnNames) throws CassandraException, TException
+    {
+		ArrayList<superColumn_t> retlist = new ArrayList<superColumn_t>();
+        long startTime = System.currentTimeMillis();
+		
+		try
+		{
+			validateTable(tablename);
+			ColumnFamily cfamily = get_cf(tablename, key, columnFamily, superColumnNames);
+			if (cfamily == null)
+			{
+				logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: "+"   key:" + key
+							+ "  ColumnFamily:" + columnFamily);
+				throw new CassandraException("Either the key " + key + " is not present or the column family requested " + columnFamily + " is not present.");
+			}
+			Collection<IColumn> columns = null;
+			columns = cfamily.getAllColumns();
+			if (columns == null || columns.size() == 0)
+			{
+				logger_	.info("ERROR Columns are missing.....: " + "   key:" + key + "  ColumnFamily:" + columnFamily);
+				throw new CassandraException("ERROR Columns are missing.....: " + "   key:" + key + "  ColumnFamily:" + columnFamily);
+			}
+			
+			for(IColumn column : columns)
+			{
+				superColumn_t thrift_superColumn = new superColumn_t();
+				thrift_superColumn.name = column.name();
+				Collection<IColumn> subColumns = column.getSubColumns();
+				if(subColumns.size() != 0 )
+				{
+					thrift_superColumn.columns = new ArrayList<column_t>();
+					for( IColumn subColumn : subColumns )
+					{
+						column_t thrift_column = new column_t();
+						thrift_column.columnName = subColumn.name();
+						thrift_column.value = new String(subColumn.value());
+						thrift_column.timestamp = subColumn.timestamp();
+						thrift_superColumn.columns.add(thrift_column);
+					}
+				}
+				retlist.add(thrift_superColumn);
+			}
+		}
+		catch (Exception ex)
+		{
+			String exception = LogUtil.throwableToString(ex);
+			logger_.info( exception );
+			throw new CassandraException(exception);
+		}
+        logger_.debug("get_slice_super_by_names: " + (System.currentTimeMillis() - startTime)
+                + " ms.");
+		return retlist;
+    }
+
+    
+    public ArrayList<superColumn_t> get_slice_super(String tablename, String key, String columnFamily_superColumnName, int start, int count) throws CassandraException
+    {
+		ArrayList<superColumn_t> retlist = new ArrayList<superColumn_t>();
+		try
+		{
+			validateTable(tablename);
+	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_superColumnName);
+	        // check for  values 
+	        if( values.length < 1 )
+	        {
+	        	throw new CassandraException("Column Family " + columnFamily_superColumnName + " is invalid.");	        	
+	        }
+	        Row row = StorageProxy.readProtocol(tablename, key, columnFamily_superColumnName, start, count, StorageService.ConsistencyLevel.WEAK);
+			if (row == null)
+			{
+				logger_.info("ERROR No row for this key .....: " + key);
+	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
+			}
+
+			Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+			if (cfMap == null || cfMap.size() == 0)
+			{
+				logger_	.info("ERROR ColumnFamily map is missing.....: "
+							   + "   key:" + key
+								);
+				throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
+			}
+			ColumnFamily cfamily = cfMap.get(values[0]);
+			if (cfamily == null)
+			{
+				logger_.info("ERROR ColumnFamily  is missing.....: "
+							+"   key:" + key
+							+ "  ColumnFamily:" + values[0]);
+				throw new CassandraException("Either the key " + key + " is not present or the columns requested " + columnFamily_superColumnName + " are not present.");
+			}
+			Collection<IColumn> columns = cfamily.getAllColumns();
+			if (columns == null || columns.size() == 0)
+			{
+				logger_	.info("ERROR Columns are missing.....: "
+							   + "   key:" + key
+								+ "  ColumnFamily:" + values[0]);
+				throw new CassandraException("ERROR Columns are missing.....: " + "   key:" + key + "  ColumnFamily:" + values[0]);
+			}
+			
+			for(IColumn column : columns)
+			{
+				superColumn_t thrift_superColumn = new superColumn_t();
+				thrift_superColumn.name = column.name();
+				Collection<IColumn> subColumns = column.getSubColumns();
+				if(subColumns.size() != 0 )
+				{
+					thrift_superColumn.columns = new ArrayList<column_t>();
+					for( IColumn subColumn : subColumns )
+					{
+						column_t thrift_column = new column_t();
+						thrift_column.columnName = subColumn.name();
+						thrift_column.value = new String(subColumn.value());
+						thrift_column.timestamp = subColumn.timestamp();
+						thrift_superColumn.columns.add(thrift_column);
+					}
+				}
+				retlist.add(thrift_superColumn);
+			}
+		}
+		catch (Exception ex)
+		{
+			String exception = LogUtil.throwableToString(ex);
+			logger_.info( exception );
+			throw new CassandraException(exception);
+		}
+		return retlist;
+    	
+    }
+    
+    public superColumn_t get_superColumn(String tablename, String key, String columnFamily_column) throws CassandraException
+    {
+    	superColumn_t ret = null;
+		try
+		{
+			validateTable(tablename);
+	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+	        // check for  values 
+	        if( values.length < 2 )
+	        {
+	        	throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");	        	
+	        }
+
+	        Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE, StorageService.ConsistencyLevel.WEAK);
+			if (row == null)
+			{
+				logger_.info("ERROR No row for this key .....: " + key);
+	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
+			}
+
+			Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+			if (cfMap == null || cfMap.size() == 0)
+			{
+				logger_	.info("ERROR ColumnFamily map is missing.....: "
+							   + "   key:" + key
+								);
+				throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
+			}
+			ColumnFamily cfamily = cfMap.get(values[0]);
+			if (cfamily == null)
+			{
+				logger_.info("ERROR ColumnFamily  is missing.....: "
+							+"   key:" + key
+							+ "  ColumnFamily:" + values[0]);
+				throw new CassandraException("Either the key " + key + " is not present or the columns requested " + columnFamily_column + " are not present.");
+			}
+			Collection<IColumn> columns = cfamily.getAllColumns();
+			if (columns == null || columns.size() == 0)
+			{
+				logger_	.info("ERROR Columns are missing.....: "
+							   + "   key:" + key
+								+ "  ColumnFamily:" + values[0]);
+				throw new CassandraException("ERROR Columns are missing.....: " + "   key:" + key + "  ColumnFamily:" + values[0]);
+			}
+			
+			for(IColumn column : columns)
+			{
+				ret = new superColumn_t();
+				ret.name = column.name();
+				Collection<IColumn> subColumns = column.getSubColumns();
+				if(subColumns.size() != 0 )
+				{
+					ret.columns = new ArrayList<column_t>();
+					for(IColumn subColumn : subColumns)
+					{
+						column_t thrift_column = new column_t();
+						thrift_column.columnName = subColumn.name();
+						thrift_column.value = new String(subColumn.value());
+						thrift_column.timestamp = subColumn.timestamp();
+						ret.columns.add(thrift_column);
+					}
+				}
+			}
+		}
+		catch (Exception ex)
+		{
+			String exception = LogUtil.throwableToString(ex);
+			logger_.info( exception );
+			throw new CassandraException(exception);
+		}
+		return ret;
+    	
+    }
+    
+    public boolean batch_insert_superColumn_blocking(batch_mutation_super_t batchMutationSuper)
+    {
+    	boolean result = false;
+		try
+		{
+			logger_.warn(" batch_insert_superColumn_blocking");
+			logger_.debug(" Creating the row mutation");
+			validateTable(batchMutationSuper.table);
+			RowMutation rm = new RowMutation(batchMutationSuper.table,
+					batchMutationSuper.key.trim());
+			Set keys = batchMutationSuper.cfmap.keySet();
+			Iterator keyIter = keys.iterator();
+			while (keyIter.hasNext())
+			{
+				Object key = keyIter.next(); // Get the next key.
+				List<superColumn_t> list = batchMutationSuper.cfmap.get(key);
+				for (superColumn_t superColumnData : list)
+				{
+					if(superColumnData.columns.size() != 0 )
+					{
+						for (column_t columnData : superColumnData.columns)
+						{
+							rm.add(key.toString() + ":" + superColumnData.name  +":" + columnData.columnName,
+									columnData.value.getBytes(), columnData.timestamp);
+						}
+					}
+					else
+					{
+						rm.add(key.toString() + ":" + superColumnData.name, new byte[0], 0);
+					}
+				}
+			}            
+            StorageProxy.insert(rm);
+		}
+		catch (Exception e)
+		{
+			logger_.info( LogUtil.throwableToString(e) );
+		}
+		return result;
+    	
+    }
+    public void batch_insert_superColumn(batch_mutation_super_t batchMutationSuper)
+    {
+		try
+		{
+			logger_.debug(" batch_insert_superColumn");
+			logger_.debug(" Creating the row mutation");
+			validateTable(batchMutationSuper.table);
+			RowMutation rm = new RowMutation(batchMutationSuper.table,
+					batchMutationSuper.key.trim());
+			if(batchMutationSuper.cfmap != null)
+			{
+				Set keys = batchMutationSuper.cfmap.keySet();
+				Iterator keyIter = keys.iterator();
+				while (keyIter.hasNext())
+				{
+					Object key = keyIter.next(); // Get the next key.
+					List<superColumn_t> list = batchMutationSuper.cfmap.get(key);
+					for (superColumn_t superColumnData : list)
+					{
+						if(superColumnData.columns.size() != 0 )
+						{
+							for (column_t columnData : superColumnData.columns)
+							{
+								rm.add(key.toString() + ":" + superColumnData.name  +":" + columnData.columnName,
+										columnData.value.getBytes(), columnData.timestamp);
+							}
+						}
+						else
+						{
+							rm.add(key.toString() + ":" + superColumnData.name, new byte[0], 0);
+						}
+					}
+				} 
+			}
+			if(batchMutationSuper.cfmapdel != null)
+			{
+				Set keys = batchMutationSuper.cfmapdel.keySet();
+				Iterator keyIter = keys.iterator();
+				while (keyIter.hasNext())
+				{
+					Object key = keyIter.next(); // Get the next key.
+					List<superColumn_t> list = batchMutationSuper.cfmapdel.get(key);
+					for (superColumn_t superColumnData : list)
+					{
+						if(superColumnData.columns.size() != 0 )
+						{
+							for (column_t columnData : superColumnData.columns)
+							{
+								rm.delete(key.toString() + ":" + superColumnData.name  +":" + columnData.columnName);
+							}
+						}
+						else
+						{
+							rm.delete(key.toString() + ":" + superColumnData.name);
+						}
+					}
+				} 
+			}
+            StorageProxy.insert(rm);
+		}
+		catch (Exception e)
+		{
+			logger_.info( LogUtil.throwableToString(e) );
+		}
+		return;
+    }
+
+    public String getStringProperty(String propertyName) throws TException
+    {
+        if (propertyName.equals("cluster name"))
+        {
+            return DatabaseDescriptor.getClusterName();
+        }
+        else if (propertyName.equals("config file"))
+        {
+            String filename = DatabaseDescriptor.getConfigFileName();
+            try
+            {
+                StringBuffer fileData = new StringBuffer(8192);
+                BufferedInputStream stream = new BufferedInputStream(new FileInputStream(filename));
+                byte[] buf = new byte[1024];
+                int numRead;
+                while( (numRead = stream.read(buf)) != -1)
+                {
+                    String str = new String(buf, 0, numRead);
+                    fileData.append(str);
+                }
+                stream.close();
+                return fileData.toString();
+            }
+            catch (IOException e)
+            {
+                return "file not found!";
+            }
+        }
+        else if (propertyName.equals("version"))
+        {
+            return getVersion();
+        }
+        else
+        {
+            return "?";
+        }
+    }
+
+    public List<String> getStringListProperty(String propertyName) throws TException
+    {
+        if (propertyName.equals("tables"))
+        {
+            return DatabaseDescriptor.getTables();        
+        }
+        else
+        {
+            return new ArrayList<String>();
+        }
+    }
+
+    public String describeTable(String tableName) throws TException
+    {
+        String desc = "";
+        Map<String, CFMetaData> tableMetaData = DatabaseDescriptor.getTableMetaData(tableName);
+
+        if (tableMetaData == null)
+        {
+            return "Table " + tableName +  " not found.";
+        }
+
+        Iterator iter = tableMetaData.entrySet().iterator();
+        while (iter.hasNext())
+        {
+            Map.Entry<String, CFMetaData> pairs = (Map.Entry<String, CFMetaData>)iter.next();
+            desc = desc + pairs.getValue().pretty() + "-----\n";
+        }
+        return desc;
+    }
+
+    public CqlResult_t executeQuery(String query) throws TException
+    {
+        CqlResult_t result = new CqlResult_t();
+
+        CqlResult cqlResult = CqlDriver.executeQuery(query);
+        
+        // convert CQL result type to Thrift specific return type
+        if (cqlResult != null)
+        {
+            result.errorTxt = cqlResult.errorTxt;
+            result.resultSet = cqlResult.resultSet;
+            result.errorCode = cqlResult.errorCode;
+        }
+        return result;
+    }
+    
+    /*
+     * This method is used to ensure that all keys
+     * prior to the specified key, as determined by
+     * the SSTable index bucket it falls in, are in
+     * buffer cache.  
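+     * The fData flag is handed straight to StorageProxy.touchProtocol; presumably
+     * it controls whether the data file itself is warmed as well, not just the index.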
+    */
+    public void touch (String key , boolean fData) 
+    {
+    	try
+    	{
+    		StorageProxy.touchProtocol(DatabaseDescriptor.getTables().get(0), key, fData, StorageService.ConsistencyLevel.WEAK);
+    	}
+    	catch ( Exception e)
+    	{
+			logger_.info( LogUtil.throwableToString(e) );
+    	}
+	}
+    
+    
+	public String getVersion()
+	{
+		return "1";
+	}
+
+	public int getStatus()
+	{
+		return fb_status.ALIVE;
+	}
+
+	public String getStatusDetails()
+	{
+		return null;
+	}
+
+	public static void main(String[] args) throws Throwable
+	{
+		int port = 9160;		
+		try
+		{
+			CassandraServer peerStorageServer = new CassandraServer();
+			peerStorageServer.start();
+			Cassandra.Processor processor = new Cassandra.Processor(
+					peerStorageServer);
+			// Transport
+			TServerSocket tServerSocket =  new TServerSocket(port);
+			 // Protocol factory
+			TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory();
+			 // ThreadPool Server
+			Options options = new Options();
+			options.minWorkerThreads = 64;
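+			// Note: options (minWorkerThreads = 64) is built here but never passed to
+			// the TThreadPoolServer constructor below, so it has no effect as written.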
+			TThreadPoolServer serverEngine = new TThreadPoolServer(processor, tServerSocket, tProtocolFactory);
+			serverEngine.serve();
+
+		}
+		catch (Exception x)
+		{
+			System.err.println("UNCAUGHT EXCEPTION IN main()");
+			x.printStackTrace();
+			System.exit(1);
+		}
+
+	}
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/ConsistencyManager.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/ConsistencyManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/ConsistencyManager.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadResponseMessage;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Cachetable;
+import org.apache.cassandra.utils.ICacheExpungeHook;
+import org.apache.cassandra.utils.ICachetable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
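+/**
+ * Runs a consistency check for a single read: a digest-only ReadMessage for
+ * columnFamily_ is sent to every endpoint in replicas_, and if any returned digest
+ * differs from row_'s digest, DigestResponseHandler falls back to a full read
+ * (adding the local endpoint) whose responses DataRepairHandler hands to the
+ * ReadResponseResolver for read repair once a majority have arrived.
+ */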
+class ConsistencyManager implements Runnable
+{
+	private static Logger logger_ = Logger.getLogger(ConsistencyManager.class);
+	
+	class DigestResponseHandler implements IAsyncCallback
+	{
+		List<Message> responses_ = new ArrayList<Message>();
+		
+		public void response(Message msg)
+		{
+			logger_.debug("Received response : " + msg.toString());
+			responses_.add(msg);
+			if ( responses_.size() == ConsistencyManager.this.replicas_.size() )
+				handleDigestResponses();
+		}
+		
+		private void handleDigestResponses()
+		{
+			DataInputBuffer bufIn = new DataInputBuffer();
+			logger_.debug("Handle Digest responses");
+			for( Message response : responses_ )
+			{
+				byte[] body = (byte[])response.getMessageBody()[0];            
+	            bufIn.reset(body, body.length);
+	            try
+	            {	               
+	                ReadResponseMessage result = ReadResponseMessage.serializer().deserialize(bufIn);  
+	                byte[] digest = result.digest();
+	                if( !Arrays.equals(row_.digest(), digest) )
+					{
+	                	doReadRepair();
+	                	break;
+					}
+	            }
+	            catch( IOException ex )
+	            {
+	            	logger_.info(LogUtil.throwableToString(ex));
+	            }
+			}
+		}
+		
+		private void doReadRepair() throws IOException
+		{
+			IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
+            /* Add the local storage endpoint to the replicas_ list */
+            replicas_.add(StorageService.getLocalStorageEndPoint());
+			IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(), readResponseResolver);	
+			String table = DatabaseDescriptor.getTables().get(0);
+			ReadMessage readMessage = new ReadMessage(table, row_.key(), columnFamily_);
+            Message message = ReadMessage.makeReadMessage(readMessage);
+			MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray( new EndPoint[0] ), responseHandler);			
+		}
+	}
+	
+	class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
+	{
+		private List<Message> responses_ = new ArrayList<Message>();
+		private IResponseResolver<Row> readResponseResolver_;
+		private int majority_;
+		
+		DataRepairHandler(int responseCount, IResponseResolver<Row> readResponseResolver)
+		{
+			readResponseResolver_ = readResponseResolver;
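+			// floor(responseCount / 2) + 1, e.g. 3 responses -> 2, 5 responses -> 3.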
+			majority_ = (responseCount >> 1) + 1;  
+		}
+		
+		public void response(Message message)
+		{
+			logger_.debug("Received responses in DataRepairHandler : " + message.toString());
+			responses_.add(message);
+			if ( responses_.size() == majority_ )
+			{
+				String messageId = message.getMessageId();
+				readRepairTable_.put(messageId, messageId, this);
+				// handleResponses();
+			}
+		}
+		
+		public void callMe(String key, String value)
+		{
+			handleResponses();
+		}
+		
+		private void handleResponses()
+		{
+			try
+			{
+				readResponseResolver_.resolve(new ArrayList<Message>(responses_));
+			}
+			catch ( DigestMismatchException ex )
+			{
+				logger_.info("We should not be coming here under any circumstances ...");
+				logger_.info(LogUtil.throwableToString(ex));
+			}
+		}
+	}
+	private static long scheduledTimeMillis_ = 600;
+	private static ICachetable<String, String> readRepairTable_ = new Cachetable<String, String>(scheduledTimeMillis_);
+	private Row row_;
+	protected List<EndPoint> replicas_;
+	private String columnFamily_;
+	private int start_;
+	private int count_;
+	private long sinceTimestamp_;
+	private List<String> columnNames_ = new ArrayList<String>();	
+	
+	ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, List<String> columns)
+	{
+		row_ = row;
+		replicas_ = replicas;
+		columnFamily_ = columnFamily;
+		columnNames_ = columns;
+	}
+	
+	ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, int start, int count)
+	{
+		row_ = row;
+		replicas_ = replicas;
+		columnFamily_ = columnFamily;
+		start_ = start;
+		count_ = count;
+	}
+	
+	ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, long sinceTimestamp)
+	{
+		row_ = row;
+		replicas_ = replicas;
+		columnFamily_ = columnFamily;
+		sinceTimestamp_ = sinceTimestamp;
+	}
+
+	public void run()
+	{
+		logger_.debug(" Run the consistency checks for " + columnFamily_);
+		String table = DatabaseDescriptor.getTables().get(0);
+		ReadMessage readMessageDigestOnly = null;
+		if(columnNames_.size() == 0)
+		{
+			if( start_ >= 0 && count_ < Integer.MAX_VALUE)
+			{
+				readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_, start_, count_);
+			}
+			else if(sinceTimestamp_ > 0)
+			{
+				readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_, sinceTimestamp_);
+			}
+			else
+			{
+				readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_);
+			}
+		}
+		else
+		{
+			readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_, columnNames_);
+			
+		}
+		readMessageDigestOnly.setIsDigestQuery(true);
+		try
+		{
+			Message messageDigestOnly = ReadMessage.makeReadMessage(readMessageDigestOnly);
+			IAsyncCallback digestResponseHandler = new DigestResponseHandler();
+			MessagingService.getMessagingInstance().sendRR(messageDigestOnly, replicas_.toArray(new EndPoint[0]), digestResponseHandler);
+		}
+		catch ( IOException ex )
+		{
+			logger_.info(LogUtil.throwableToString(ex));
+		}
+	}
+}

Added: incubator/cassandra/src/org/apache/cassandra/service/Constants.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/Constants.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/Constants.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/Constants.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,18 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+public class Constants {
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/service/CqlResult_t.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/CqlResult_t.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/CqlResult_t.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/CqlResult_t.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,216 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+import com.facebook.thrift.protocol.*;
+import com.facebook.thrift.transport.*;
+
+public class CqlResult_t implements TBase, java.io.Serializable {
+  public int errorCode;
+  public String errorTxt;
+  public List<Map<String,String>> resultSet;
+
+  public final Isset __isset = new Isset();
+  public static final class Isset implements java.io.Serializable {
+    public boolean errorCode = false;
+    public boolean errorTxt = false;
+    public boolean resultSet = false;
+  }
+
+  public CqlResult_t() {
+  }
+
+  public CqlResult_t(
+    int errorCode,
+    String errorTxt,
+    List<Map<String,String>> resultSet)
+  {
+    this();
+    this.errorCode = errorCode;
+    this.__isset.errorCode = true;
+    this.errorTxt = errorTxt;
+    this.__isset.errorTxt = true;
+    this.resultSet = resultSet;
+    this.__isset.resultSet = true;
+  }
+
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof CqlResult_t)
+      return this.equals((CqlResult_t)that);
+    return false;
+  }
+
+  public boolean equals(CqlResult_t that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_errorCode = true;
+    boolean that_present_errorCode = true;
+    if (this_present_errorCode || that_present_errorCode) {
+      if (!(this_present_errorCode && that_present_errorCode))
+        return false;
+      if (this.errorCode != that.errorCode)
+        return false;
+    }
+
+    boolean this_present_errorTxt = true && (this.errorTxt != null);
+    boolean that_present_errorTxt = true && (that.errorTxt != null);
+    if (this_present_errorTxt || that_present_errorTxt) {
+      if (!(this_present_errorTxt && that_present_errorTxt))
+        return false;
+      if (!this.errorTxt.equals(that.errorTxt))
+        return false;
+    }
+
+    boolean this_present_resultSet = true && (this.resultSet != null);
+    boolean that_present_resultSet = true && (that.resultSet != null);
+    if (this_present_resultSet || that_present_resultSet) {
+      if (!(this_present_resultSet && that_present_resultSet))
+        return false;
+      if (!this.resultSet.equals(that.resultSet))
+        return false;
+    }
+
+    return true;
+  }
+
+  public int hashCode() {
+    return 0;
+  }
+
+  public void read(TProtocol iprot) throws TException {
+    TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == TType.STOP) { 
+        break;
+      }
+      switch (field.id)
+      {
+        case 1:
+          if (field.type == TType.I32) {
+            this.errorCode = iprot.readI32();
+            this.__isset.errorCode = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 2:
+          if (field.type == TType.STRING) {
+            this.errorTxt = iprot.readString();
+            this.__isset.errorTxt = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 3:
+          if (field.type == TType.LIST) {
+            {
+              TList _list40 = iprot.readListBegin();
+              this.resultSet = new ArrayList<Map<String,String>>(_list40.size);
+              for (int _i41 = 0; _i41 < _list40.size; ++_i41)
+              {
+                Map<String,String> _elem42 = new HashMap<String,String>();
+                {
+                  TMap _map43 = iprot.readMapBegin();
+                  _elem42 = new HashMap<String,String>(2*_map43.size);
+                  for (int _i44 = 0; _i44 < _map43.size; ++_i44)
+                  {
+                    String _key45;
+                    String _val46;
+                    _key45 = iprot.readString();
+                    _val46 = iprot.readString();
+                    _elem42.put(_key45, _val46);
+                  }
+                  iprot.readMapEnd();
+                }
+                this.resultSet.add(_elem42);
+              }
+              iprot.readListEnd();
+            }
+            this.__isset.resultSet = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          TProtocolUtil.skip(iprot, field.type);
+          break;
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+  }
+
+  public void write(TProtocol oprot) throws TException {
+    TStruct struct = new TStruct("CqlResult_t");
+    oprot.writeStructBegin(struct);
+    TField field = new TField();
+    field.name = "errorCode";
+    field.type = TType.I32;
+    field.id = 1;
+    oprot.writeFieldBegin(field);
+    oprot.writeI32(this.errorCode);
+    oprot.writeFieldEnd();
+    if (this.errorTxt != null) {
+      field.name = "errorTxt";
+      field.type = TType.STRING;
+      field.id = 2;
+      oprot.writeFieldBegin(field);
+      oprot.writeString(this.errorTxt);
+      oprot.writeFieldEnd();
+    }
+    if (this.resultSet != null) {
+      field.name = "resultSet";
+      field.type = TType.LIST;
+      field.id = 3;
+      oprot.writeFieldBegin(field);
+      {
+        oprot.writeListBegin(new TList(TType.MAP, this.resultSet.size()));
+        for (Map<String,String> _iter47 : this.resultSet)        {
+          {
+            oprot.writeMapBegin(new TMap(TType.STRING, TType.STRING, _iter47.size()));
+            for (String _iter48 : _iter47.keySet())            {
+              oprot.writeString(_iter48);
+              oprot.writeString(_iter47.get(_iter48));
+            }
+            oprot.writeMapEnd();
+          }
+        }
+        oprot.writeListEnd();
+      }
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder("CqlResult_t(");
+    sb.append("errorCode:");
+    sb.append(this.errorCode);
+    sb.append(",errorTxt:");
+    sb.append(this.errorTxt);
+    sb.append(",resultSet:");
+    sb.append(this.resultSet);
+    sb.append(")");
+    return sb.toString();
+  }
+
+}
+
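
A rough sketch of how a caller might walk the CqlResult_t struct above once it has been returned over Thrift. The assumption that an errorCode of 0 means success, and the helper class name, are illustrative only and not part of this commit:

    import java.util.Map;
    import org.apache.cassandra.service.CqlResult_t;

    public class CqlResultPrinter
    {
        // Print one line per column of each row in the result set.
        public static void print(CqlResult_t result)
        {
            if (result.errorCode != 0)              // assumed convention: 0 == success
            {
                System.err.println("CQL error " + result.errorCode + ": " + result.errorTxt);
                return;
            }
            if (result.resultSet == null)
                return;
            int rowNo = 0;
            for (Map<String, String> row : result.resultSet)
            {
                System.out.println("row " + (++rowNo));
                for (Map.Entry<String, String> column : row.entrySet())
                    System.out.println("  " + column.getKey() + " = " + column.getValue());
            }
        }
    }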

Added: incubator/cassandra/src/org/apache/cassandra/service/DigestMismatchException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/DigestMismatchException.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/DigestMismatchException.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/DigestMismatchException.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class DigestMismatchException extends Exception
+{
+	public DigestMismatchException(String message)
+	{
+		super(message);
+	}
+}
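
DigestMismatchException is a plain checked exception. A minimal sketch of the kind of digest comparison that might raise it; the MD5 choice and the helper class are illustrative assumptions, not code from this commit:

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;
    import org.apache.cassandra.service.DigestMismatchException;

    public class DigestCheck
    {
        // Compare the digest of the locally read row against the digest a replica
        // returned for the same key, and fail loudly if they disagree.
        public static void verify(byte[] localRowBytes, byte[] replicaDigest, String key)
                throws DigestMismatchException, NoSuchAlgorithmException
        {
            byte[] localDigest = MessageDigest.getInstance("MD5").digest(localRowBytes);
            if (!Arrays.equals(localDigest, replicaDigest))
                throw new DigestMismatchException("digest mismatch for key " + key);
        }
    }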

Added: incubator/cassandra/src/org/apache/cassandra/service/HttpRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/HttpRequestVerbHandler.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/HttpRequestVerbHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/HttpRequestVerbHandler.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,729 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.lang.management.RuntimeMXBean;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.CalloutDeployMessage;
+import org.apache.cassandra.db.CalloutManager;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.http.ColumnFamilyFormatter;
+import org.apache.cassandra.net.http.HTMLFormatter;
+import org.apache.cassandra.net.http.HttpConnection;
+import org.apache.cassandra.net.http.HttpRequest;
+import org.apache.cassandra.net.http.HttpWriteResponse;
+import org.apache.cassandra.procedures.GroovyScriptRunner;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.net.*;
+
+/*
+ * This class handles the incoming HTTP request after
+ * it has been parsed.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class HttpRequestVerbHandler implements IVerbHandler
+{
+    private static final Logger logger_ = Logger.getLogger(HttpRequestVerbHandler.class);
+    /* These are the list of actions supported */
+    private static final String DETAILS = "details";
+    private static final String LOADME = "loadme";
+    private static final String KILLME = "killme";
+    private static final String COMPACTME = "compactme";
+    private static final String LB_HEALTH_CHECK = "lb_health_check";
+    private static final String LB_HEALTH_CHECK_RESPONSE = "I-AM-ALIVE";
+    private static final String QUERY = "query";
+    private static final String INSERT = "insert";
+    private static final String SCRIPT = "script";
+    private static final String QUERYRESULTSDIV = "queryResultsDiv";
+    private static final String INSERTRESULTSDIV = "insertResultsDiv";
+    private static final String SCRIPTRESULTSDIV = "scriptResultsDiv";
+    private static final String JS_UPDATE_QUERY_FUNCTION = "updateQueryResults";
+    private static final String JS_UPDATE_INSERT_FUNCTION = "updateInsertResults";
+
+    private StorageService storageService_;
+
+    public HttpRequestVerbHandler(StorageService storageService)
+    {
+        storageService_ = storageService;
+    }
+
+    public void doVerb(Message message)
+    {
+        HttpConnection.HttpRequestMessage httpRequestMessage = (HttpConnection.HttpRequestMessage)message.getMessageBody()[0];
+        try
+        {
+            HttpRequest httpRequest = httpRequestMessage.getHttpRequest();
+            HttpWriteResponse httpServerResponse = new HttpWriteResponse(httpRequest);
+            if(httpRequest.getMethod().toUpperCase().equals("GET"))
+            {
+                // handle the get request type
+                doGet(httpRequest, httpServerResponse);
+            }
+            else if(httpRequest.getMethod().toUpperCase().equals("POST"))
+            {
+                // handle the POST request type
+                doPost(httpRequest, httpServerResponse);
+            }
+
+            // write the response we have constructed into the socket
+            ByteBuffer buffer = httpServerResponse.flush();
+            httpRequestMessage.getHttpConnection().write(buffer);
+        }
+        catch(Exception e)
+        {
+            logger_.warn(LogUtil.throwableToString(e));
+        }
+    }
+
+    private void doGet(HttpRequest httpRequest, HttpWriteResponse httpResponse)
+    {
+        boolean fServeSummary = true;
+        HTMLFormatter formatter = new HTMLFormatter();
+        String query = httpRequest.getQuery();
+        /*
+         * we do not care about the path for most requests except those
+         * from the load balancer
+         */
+        String path = httpRequest.getPath();
+        /* for the health checks, just return the string only */
+        if(path.indexOf(LB_HEALTH_CHECK) != -1)
+        {
+        	httpResponse.println(handleLBHealthCheck());
+            return;
+        }
+
+        formatter.startBody(true, getJSFunctions(), true, true);
+        formatter.appendLine("<h1><font color=\"white\"> Cluster map </font></h1>");
+
+        StringBuilder sbResult = new StringBuilder();
+        do
+        {
+            if(query.indexOf(DETAILS) != -1)
+            {
+                fServeSummary = false;
+                sbResult.append(handleNodeDetails());
+                break;
+            }
+            else if(query.indexOf(LOADME) != -1)
+            {
+                sbResult.append(handleLoadMe());
+                break;
+            }
+            else if(query.indexOf(KILLME) != -1)
+            {
+                sbResult.append(handleKillMe());
+                break;
+            }
+            else if(query.indexOf(COMPACTME) != -1)
+            {
+                sbResult.append(handleCompactMe());
+                break;
+            }
+        }
+        while(false);
+
+        //formatter.appendLine("<br>-------END DEBUG INFO-------<br><br>");
+
+        if(fServeSummary)
+        {
+            formatter.appendLine(handlePageDisplay(null, null, null));
+        }
+
+        formatter.appendLine("<br>");
+
+        if(sbResult.toString() != null)
+        {
+            formatter.appendLine(sbResult.toString());
+        }
+
+        formatter.endBody();
+        httpResponse.println(formatter.toString());
+    }
+
+    /*
+     * As a result of the POST query, we currently only send back some
+     * JavaScript that updates the relevant section of the page in place.
+    */
+    private void doPost(HttpRequest httpRequest, HttpWriteResponse httpResponse)
+    {
+        String query = httpRequest.getQuery();
+
+        HTMLFormatter formatter = new HTMLFormatter();
+        formatter.startBody(true, getJSFunctions(), true, true);
+        formatter.appendLine("<h1><font color=\"white\"> Cluster map </font></h1>");
+
+        // write a shell for adding some javascript to do in-place updates
+        StringBuilder sbResult = new StringBuilder();
+        do
+        {
+            if(query.indexOf(QUERY) != -1)
+            {
+                sbResult.append(handleQuery(httpRequest));
+                break;
+            }
+            else if(query.indexOf(INSERT) != -1)
+            {
+                sbResult.append(handleInsert(httpRequest));
+                break;
+            }
+            else if(query.indexOf(SCRIPT) != -1)
+            {
+                sbResult.append(handleScript(httpRequest));
+                break;
+            }
+        }
+        while(false);
+
+        if(sbResult.toString() != null)
+        {
+            formatter.appendLine(sbResult.toString());
+        }
+
+        formatter.endBody();
+
+    	httpResponse.println(formatter.toString());
+    }
+
+    private String handleNodeDetails()
+    {
+        HTMLFormatter formatter = new HTMLFormatter();
+
+        formatter.appendLine("Token: " + storageService_.getToken());
+        RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean();
+        formatter.appendLine("Up time (in seconds): " + (runtimeMxBean.getUptime()/1000));
+
+        MemoryMXBean memoryMxBean = ManagementFactory.getMemoryMXBean();
+        MemoryUsage memUsage = memoryMxBean.getHeapMemoryUsage();
+        java.text.DecimalFormat df = new java.text.DecimalFormat("#0.00");
+        String smemUsed = df.format((double)memUsage.getUsed()/(1024 * 1024));
+        String smemMax = df.format((double)memUsage.getMax()/(1024 * 1024));
+        formatter.appendLine("Heap memory usage (in MB): " + smemUsed + "/" + smemMax);
+
+        formatter.appendLine("<br>");
+        formatter.appendLine("<br>");
+
+        /*
+         * Display DB statistics if we have something to show.
+        */
+        displayDBStatistics(formatter, df);
+
+        formatter.appendLine("<button onClick=\"window.location='" + StorageService.getHostUrl() + "?" + LOADME + "=T'\">Load Me</button>");
+        formatter.appendLine("<button onClick=\"window.location='" + StorageService.getHostUrl() + "?" + COMPACTME + "=T'\">Compact Me</button>");
+        formatter.appendLine("<button onClick=\"window.location='" + StorageService.getHostUrl() + "?" + KILLME + "=T'\">Kill Me</button>");
+
+        formatter.appendLine("<br>");
+        formatter.appendLine("<br><a href='" + StorageService.getHostUrl() + "'>Back to live nodes list" + "</a>");
+
+        return formatter.toString();
+    }
+
+    private void displayDBStatistics(HTMLFormatter formatter, java.text.DecimalFormat df)
+    {
+        String tableStats = Table.open( DatabaseDescriptor.getTables().get(0) ).tableStats("\n<br>\n", df);
+
+        if ( tableStats.length() == 0 )
+            return;
+
+        formatter.appendLine("DB statistics:");
+        formatter.appendLine("<br>");
+        formatter.appendLine("<br>");
+
+        formatter.appendLine(tableStats);
+        formatter.appendLine("<br>");
+        formatter.appendLine("<br>");
+    }
+
+    private String handlePageDisplay(String queryFormData, String insertFormData, String scriptFormData)
+    {
+    	StringBuilder sb = new StringBuilder();
+		sb.append("\n<div id=\"header\"> \n");
+		sb.append("<ul>\n");
+		sb.append("	<li name=\"one\" onclick=\"javascript:selectTab('one')\"><a href=\"#\">Cluster</a></li>\n");
+		sb.append("	<li name=\"two\" onclick=\"javascript:selectTab('two')\"><a href=\"#\">SQL</a></li>\n");
+		sb.append("	<li name=\"three\" onclick=\"javascript:selectTab('three')\"><a href=\"#\">Ring</a></li>\n");
+		sb.append("</ul>\n");
+		sb.append("</div>\n\n");
+
+		sb.append("<div name=\"one\" id=\"content\"> <!-- start tab one -->\n\n");
+        sb.append(serveSummary());
+        sb.append("</div> <!-- finish tab one -->\n\n");
+
+        sb.append("<div name=\"two\" id=\"content\"> <!-- start tab two -->\n\n");
+        sb.append(serveInsertForm(insertFormData));
+        sb.append(serveQueryForm(queryFormData));
+        sb.append(serveGroovyForm(scriptFormData));
+        sb.append("</div> <!-- finish tab two -->\n\n");
+
+        sb.append("<div name=\"three\" id=\"content\"> <!-- start tab three -->\n\n");
+        sb.append(serveRingView());
+        sb.append("</div> <!-- finish tab three -->\n\n");
+
+        sb.append("\n<script type=\"text/javascript\">\n");
+        if(queryFormData != null || insertFormData != null || scriptFormData != null)
+        	sb.append("selectTab(\"two\");\n");
+        else
+        	sb.append("selectTab(\"one\");\n");
+
+        sb.append("</script>\n");
+
+        return (sb.toString() == null)?"":sb.toString();
+    }
+
+    /*
+     * Serve a summary of the cluster: one row per live node.
+     */
+    private String serveSummary()
+    {
+        HTMLFormatter formatter = new HTMLFormatter();
+
+        Set<EndPoint> liveNodeList = Gossiper.instance().getAllMembers();
+        // we want this set of live nodes sorted based on the hostname
+        EndPoint[] liveNodes = liveNodeList.toArray(new EndPoint[0]);
+        Arrays.sort(liveNodes);
+
+        String[] sHeaders = {"Node No.", "Host:Port", "Status", "Leader", "Load Info", "Token", "Generation No."};
+        formatter.startTable();
+        formatter.addHeaders(sHeaders);
+        int iNodeNumber = 0;
+        for( EndPoint curNode : liveNodes )
+        {
+            formatter.startRow();
+            ++iNodeNumber;
+
+            // Node No.
+            formatter.addCol("" + iNodeNumber);
+            // Host:Port
+            formatter.addCol("<a href='http://" + curNode.getHost() + ":" + DatabaseDescriptor.getHttpPort() + "/home?" + DETAILS + "=T'>" + curNode.getHost() + ":" + curNode.getPort() + "</a>");
+            //Status
+            String status = ( FailureDetector.instance().isAlive(curNode) ) ? "Up" : "Down";
+            formatter.addCol(status);
+            //Leader
+            boolean isLeader = StorageService.instance().isLeader(curNode);
+            formatter.addCol(Boolean.toString(isLeader));
+            //Load Info
+            String loadInfo = getLoadInfo(curNode);
+            formatter.addCol(loadInfo);
+            // Token
+            if(curNode == null)
+                formatter.addCol("NULL!");
+            else
+                formatter.addCol(storageService_.getToken(curNode));
+            // Generation Number
+            formatter.addCol(Integer.toString(Gossiper.instance().getCurrentGenerationNumber(curNode)));
+
+            formatter.endRow();
+        }
+
+        formatter.endTable();
+
+        return formatter.toString();
+    }
+
+    private String serveRingView()
+    {
+        HTMLFormatter formatter = new HTMLFormatter();
+        String[] sHeaders = {"Range No.", "Range", "N1", "N2", "N3"};
+        formatter.startTable();
+        formatter.addHeaders(sHeaders);
+
+        Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().getRangeToEndPointMap();
+        Set<Range> rangeSet = oldRangeToEndPointMap.keySet();
+
+        int iNodeNumber = 0;
+        for ( Range range : rangeSet )
+        {
+        	formatter.startRow();
+            ++iNodeNumber;
+
+            // Range No.
+            formatter.addCol("" + iNodeNumber);
+
+            // Range
+            formatter.addCol("(" + range.left() + ",<br>" + range.right() + "]");
+
+            List<EndPoint> replicas = oldRangeToEndPointMap.get(range);
+            for ( EndPoint replica : replicas )
+            {
+            	// N1 N2 N3
+            	formatter.addCol(replica.toString());
+            }
+
+            formatter.endRow();
+        }
+
+        formatter.endTable();
+
+        return formatter.toString();
+    }
+
+    private String getLoadInfo(EndPoint ep)
+    {
+        if ( StorageService.getLocalControlEndPoint().equals(ep) )
+        {
+            return StorageService.instance().getLoadInfo();
+        }
+        else
+        {
+            return StorageService.instance().getLoadInfo(ep);
+        }
+    }
+
+    /*
+     * Returns the HTML code for a form to query data from the db cluster.
+     */
+    private String serveQueryForm(String queryResult)
+    {
+        HTMLFormatter formatter = new HTMLFormatter();
+        formatter.appendLine("<BR><fieldset><legend>Query the cluster</legend>");
+        formatter.appendLine("<FORM action=\"" + StorageService.getHostUrl() + "/home?" + QUERY + "=T\" method=\"post\">");
+
+        // get the list of column families
+        Table table = Table.open("Mailbox");
+        Set<String> columnFamilyComboBoxSet = table.getColumnFamilies();
+
+        formatter.append("select from ");
+        formatter.addCombobox(columnFamilyComboBoxSet, "columnfamily", 0);
+        formatter.append(" : <INPUT name=columnName>");
+        formatter.appendLine(" where key = <INPUT name=key>");
+        formatter.appendLine("<BR>");
+        formatter.appendLine("<INPUT type=\"submit\" value=\"Send\"> <INPUT type=\"reset\">");
+
+        formatter.appendLine("</FORM>");
+        formatter.addDivElement(QUERYRESULTSDIV, queryResult);
+        formatter.appendLine("</fieldset><BR>");
+
+        return formatter.toString();
+    }
+
+    /*
+     * Returns the HTML code for a form to run custom code on the cluster.
+     */
+    private String serveGroovyForm(String scriptResult)
+    {
+        HTMLFormatter formatter = new HTMLFormatter();
+        formatter.appendLine("<BR><fieldset><legend>Run custom code on the cluster</legend>");
+        formatter.appendLine("<FORM action=\"" + StorageService.getHostUrl() + "/home?" + SCRIPT + "=T\" method=\"post\">");
+        formatter.append(" Callout name : <INPUT name=calloutName>");
+        formatter.appendLine("<BR>");
+        formatter.append("Groovy code to run on the server:<br>");
+        formatter.append("<textarea name=scriptTextArea rows=\"10\" cols=\"100\"></textarea>");
+        formatter.appendLine("<BR>");
+        formatter.appendLine("<INPUT name=deploy type=\"submit\" value=\"Deploy\"> <INPUT name=execute type=\"submit\" value=\"Execute\"> <INPUT name=reset type=\"reset\">");
+
+        formatter.appendLine("</FORM>");
+        formatter.addDivElement(SCRIPTRESULTSDIV, scriptResult);
+        formatter.appendLine("</fieldset><BR>");
+
+        return formatter.toString();
+    }
+
+    /*
+     * Returns the HTML code for a form to insert data into the db cluster.
+     */
+    private String serveInsertForm(String insertResult)
+    {
+        HTMLFormatter formatter = new HTMLFormatter();
+        formatter.appendLine("<BR><fieldset>\n<legend>Insert data into the cluster</legend>\n");
+        formatter.appendLine("<FORM action=\"" + StorageService.getHostUrl() + "/home?" + INSERT + "=T\" method=\"post\">");
+
+        // get the list of column families
+        Table table = Table.open("Mailbox");
+        Set<String> columnFamilyComboBoxSet = table.getColumnFamilies();
+
+        formatter.append("insert into ");
+        formatter.addCombobox(columnFamilyComboBoxSet, "columnfamily", 0);
+        formatter.append(" : <INPUT name=columnName>");
+        formatter.append(" data = <INPUT name=data>");
+        formatter.appendLine(" where key = <INPUT name=key>\n");
+        formatter.appendLine("<BR>\n");
+        formatter.appendLine("<INPUT type=\"submit\" value=\"Send\"> <INPUT type=\"reset\">\n");
+
+        formatter.appendLine("</FORM>\n");
+        formatter.addDivElement(INSERTRESULTSDIV, insertResult);
+        formatter.appendLine("</fieldset>\n<BR>\n");
+
+        return formatter.toString();
+    }
+
+    /*
+     * Handle the query of some data from the client.
+     */
+    private String handleQuery(HttpRequest httpRequest)
+    {
+    	boolean fQuerySuccess = false;
+    	String sRetVal = "";
+
+    	// get the various values for this HTTP request
+    	String sColumnFamily = httpRequest.getParameter("columnfamily");
+    	String sColumn = httpRequest.getParameter("columnName");
+    	String sKey = httpRequest.getParameter("key");
+
+    	// get the table name
+    	String sTableName = DatabaseDescriptor.getTables().get(0);
+
+    	StringBuilder sb = new StringBuilder();
+    	ColumnFamilyFormatter cformatter = new ColumnFamilyFormatter(sb);
+
+        try
+        {
+	    	Table table = Table.open(sTableName);
+	    	String queryFor = sColumnFamily;
+	    	if(sColumn != null && !"*".equals(sColumn))
+    		{
+	    		queryFor += ":" + sColumn;
+    		}
+	        ColumnFamily cf = table.get(sKey, queryFor);
+
+	        if (cf == null)
+	        {
+	            sRetVal = "Key [" + sKey + "], column family [" + sColumnFamily + "] not found.";
+	        }
+	        else
+	        {
+		        cformatter.printKeyColumnFamily(sb, sKey, cf);
+	        	fQuerySuccess = true;
+	        	sRetVal = sb.toString();
+	        }
+        }
+        catch (Exception e)
+        {
+        	// write failed - return the reason
+        	sRetVal = e.getMessage();
+        }
+
+        if(fQuerySuccess)
+        	sRetVal = "Success: " + sRetVal;
+        else
+        	sRetVal = "Error: " + sRetVal;
+
+        return handlePageDisplay(sRetVal, null, null);
+    }
+
+    /*
+     * Handle the insertion of some data from the client.
+     */
+    private String handleInsert(HttpRequest httpRequest)
+    {
+    	boolean fInsertSuccess = false;
+    	String sRetVal = "";
+
+    	// get the various values for this HTTP request
+    	String sColumnFamily = httpRequest.getParameter("columnfamily");
+    	String sColumn = httpRequest.getParameter("columnName");
+    	String sKey = httpRequest.getParameter("key");
+    	String sDataToInsert = httpRequest.getParameter("data");
+
+    	// get the table name
+    	String sTableName = DatabaseDescriptor.getTables().get(0);
+
+        try
+        {
+        	// do the insert first
+            RowMutation rm = new RowMutation(sTableName, sKey);
+            rm.add(sColumnFamily + ":" + sColumn, sDataToInsert.getBytes(), 0);
+            rm.apply();
+
+            fInsertSuccess = true;
+	        sRetVal = "columnfamily=" + httpRequest.getParameter("columnfamily") + " key=" + httpRequest.getParameter("key") + " data=" + httpRequest.getParameter("data");
+        }
+        catch (Exception e)
+        {
+        	// write failed - return the reason
+        	sRetVal = e.getMessage();
+        }
+        System.out.println("Write done ...");
+
+        if(fInsertSuccess)
+        	sRetVal = "The insert was successful : " + sRetVal;
+        else
+        	sRetVal = "The insert failed : " + sRetVal;
+
+        return handlePageDisplay(null, sRetVal, null);
+    }
+
+    /*
+     * Handle the script to be run on the server.
+     */
+    private String handleScript(HttpRequest httpRequest)
+    {
+    	boolean fQuerySuccess = false;
+    	String sRetVal = "";
+
+    	// get the various values for this HTTP request
+        String callout = httpRequest.getParameter("calloutName");        
+    	String script = httpRequest.getParameter("scriptTextArea");
+        String deploy = httpRequest.getParameter("deploy");
+        String execute = httpRequest.getParameter("execute");
+    	try
+    	{
+            if ( deploy != null )
+            {
+                if ( callout != null && script != null )
+                {
+                    doDeploy(callout, script);
+                    sRetVal = "Finished deployment of callouts ...";
+                }
+            }
+            if ( execute != null )
+            {
+                if ( script != null )
+                {
+            		sRetVal = GroovyScriptRunner.evaluateString(script);            		
+                }
+            }
+            fQuerySuccess = true;
+    	}
+    	catch(Throwable t)
+    	{
+    		sRetVal = t.getMessage();
+    		logger_.warn(LogUtil.throwableToString(t));
+    	}
+
+        if(fQuerySuccess)
+        	sRetVal = "Result: Success<br>\nReturn value: <br>\n" + sRetVal;
+        else
+        	sRetVal = "Result: Error<br>\nError: <br>\n" + sRetVal;
+
+        return handlePageDisplay(null, null, sRetVal);
+    }
+    
+    private void doDeploy(String callout, String script)
+    {
+        Set<EndPoint> allMbrs = Gossiper.instance().getAllMembers();                
+        /* Send the script to all mbrs to deploy it locally. */
+        CalloutDeployMessage cdMessage = new CalloutDeployMessage(callout, script);
+        try
+        {
+            Message message = CalloutDeployMessage.getCalloutDeployMessage(cdMessage);
+            for ( EndPoint mbr : allMbrs )
+            {
+                if ( mbr.equals( StorageService.getLocalControlEndPoint() ) )
+                {
+                    /* Deploy locally */
+                    CalloutManager.instance().addCallout(callout, script);
+                }
+                else
+                {
+                    EndPoint to = new EndPoint(mbr.getHost(), DatabaseDescriptor.getStoragePort());
+                    logger_.debug("Deploying the script to " + mbr);
+                    MessagingService.getMessagingInstance().sendOneWay(message, to);
+                }
+            }
+        }
+        catch ( IOException ex )
+        {
+            logger_.warn( LogUtil.throwableToString(ex) );
+        }
+    }
+
+    private String getJSFunctions()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("function " + JS_UPDATE_QUERY_FUNCTION + "(text)\n");
+        sb.append("{\n");
+        sb.append("    obj = document.getElementById(\"" + QUERYRESULTSDIV + "\");\n");
+        sb.append("    if(obj)\n");
+        sb.append("        obj.innerHTML = text;\n");
+        sb.append("}\n");
+        sb.append("\n");
+        sb.append("function " + JS_UPDATE_INSERT_FUNCTION + "(text)\n");
+        sb.append("{\n");
+        sb.append("    obj = document.getElementById(\"" + INSERTRESULTSDIV + "\");\n");
+        sb.append("    if(obj)\n");
+        sb.append("        obj.innerHTML = text;\n");
+        sb.append("}\n");
+        sb.append("\n");
+
+        return sb.toString();
+    }
+
+    /*
+     * Load the current node with data.
+     */
+    private String handleLoadMe()
+    {
+        return "Loading...";
+    }
+
+    private String handleCompactMe()
+    {
+        Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+        try
+        {
+            table.forceCompaction();
+        }
+        catch (IOException ex)
+        {
+            logger_.debug(LogUtil.throwableToString(ex));
+        }
+        return "Compacting ...";
+    }
+
+    private String handleLBHealthCheck()
+    {
+    	if(StorageService.instance().isShutdown())
+    		return "";
+    	return LB_HEALTH_CHECK_RESPONSE;
+    }
+
+    /*
+     * Kill the current node.
+     */
+    private String handleKillMe()
+    {
+    	if(StorageService.instance().isShutdown())
+    		return "Already scheduled for shutdown";
+    	/*
+    	 * The storage service will wait for a period of time to let the
+    	 * VIP know that we are shutting down, then will perform an actual
+    	 * shutdown on a separate thread.
+    	 */
+        String status = "Service has been killed";
+        try
+        {
+            StorageService.instance().killMe();
+        }
+        catch( Throwable th )
+        {
+            logger_.warn(LogUtil.throwableToString(th));
+            status = "Failed to kill service.";
+        }
+    	return status;
+    }
+
+}
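
Both doGet and doPost above dispatch on substrings of the query string inside a do { ... } while(false) block: the loop never repeats, it only exists so that the first matching branch can break out past the remaining checks. The pattern in isolation (standalone sketch, not code from this commit):

    public class QueryDispatchSketch
    {
        public static String dispatch(String query)
        {
            StringBuilder result = new StringBuilder();
            do
            {
                if (query.indexOf("details") != -1)
                {
                    result.append("serving node details");
                    break;                            // first match wins; skip the rest
                }
                if (query.indexOf("loadme") != -1)
                {
                    result.append("loading node");
                    break;
                }
                // no recognised action: fall through and serve the default page
            }
            while (false);                            // never loops; only enables break
            return result.toString();
        }

        public static void main(String[] args)
        {
            System.out.println(dispatch("loadme=T"));  // prints "loading node"
        }
    }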

Added: incubator/cassandra/src/org/apache/cassandra/service/IComponentShutdown.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/IComponentShutdown.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/IComponentShutdown.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/IComponentShutdown.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+public interface IComponentShutdown
+{
+	public void shutdown();
+}
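
IComponentShutdown gives long-running components a uniform shutdown callback. A minimal sketch of an implementer; wiring it to a JVM shutdown hook is only one possible way to invoke it, and the class name and message are made up for illustration:

    import org.apache.cassandra.service.IComponentShutdown;

    public class FlushOnShutdown implements IComponentShutdown
    {
        public void shutdown()
        {
            // Flush in-memory state, close sockets, stop worker threads, etc.
            System.out.println("FlushOnShutdown: component shut down");
        }

        public static void main(String[] args)
        {
            final IComponentShutdown component = new FlushOnShutdown();
            // In the server the component would presumably be registered with
            // StorageService; here it is simply tied to a JVM shutdown hook.
            Runtime.getRuntime().addShutdownHook(new Thread()
            {
                public void run()
                {
                    component.shutdown();
                }
            });
        }
    }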

Added: incubator/cassandra/src/org/apache/cassandra/service/IPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/IPartitioner.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/IPartitioner.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/IPartitioner.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.math.BigInteger;
+
+public interface IPartitioner
+{
+    public BigInteger hash(String key);
+}
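
IPartitioner reduces a row key to a BigInteger token that positions the key on the ring. A minimal sketch of one possible implementation, hashing the key with MD5 and reading the 128-bit digest as a non-negative integer; the choice of MD5 is an assumption for illustration, not something this commit specifies:

    import java.math.BigInteger;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import org.apache.cassandra.service.IPartitioner;

    public class Md5Partitioner implements IPartitioner
    {
        public BigInteger hash(String key)
        {
            try
            {
                byte[] digest = MessageDigest.getInstance("MD5").digest(key.getBytes());
                // Signum 1 forces a non-negative token from the raw digest bytes.
                return new BigInteger(1, digest);
            }
            catch (NoSuchAlgorithmException e)
            {
                throw new RuntimeException("MD5 not available", e);
            }
        }
    }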