Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/02 07:13:17 UTC

svn commit: r749207 [11/12] - in /incubator/cassandra/src/org/apache/cassandra: loader/ locator/ net/ net/http/ net/io/ net/sink/ procedures/ service/ test/ tools/

Added: incubator/cassandra/src/org/apache/cassandra/test/SSTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/test/SSTableTest.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/test/SSTableTest.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/test/SSTableTest.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.PrimaryKey;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
+
+
+public class SSTableTest
+{
+    private static void rawSSTableWrite() throws Throwable
+    {
+        SSTable ssTable = new SSTable("C:\\Engagements\\Cassandra", "Table-Test-1");
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        BloomFilter bf = new BloomFilter(1000, 8);
+        byte[] bytes = new byte[64*1024];
+        Random random = new Random();
+        for ( int i = 100; i < 1000; ++i )
+        {
+            String key = Integer.toString(i);
+            ColumnFamily cf = new ColumnFamily("Test", "Standard");
+            bufOut.reset();           
+            // random.nextBytes(bytes);
+            cf.createColumn("C", "Avinash Lakshman is a good man".getBytes(), i);
+            ColumnFamily.serializer2().serialize(cf, bufOut);
+            ssTable.append(key, bufOut);            
+            bf.fill(key);
+        }
+        ssTable.close(bf);
+    }
+    
+    private static void hashSSTableWrite() throws Throwable
+    {        
+        Map<String, ColumnFamily> columnFamilies = new HashMap<String, ColumnFamily>();                
+        byte[] bytes = new byte[64*1024];
+        Random random = new Random();
+        for ( int i = 100; i < 1000; ++i )
+        {
+            String key = Integer.toString(i);
+            ColumnFamily cf = new ColumnFamily("Test", "Standard");                      
+            // random.nextBytes(bytes);
+            cf.createColumn("C", "Avinash Lakshman is a good man".getBytes(), i);
+            columnFamilies.put(key, cf);
+        } 
+        flushForRandomPartitioner(columnFamilies);
+    }
+    
+    private static void flushForRandomPartitioner(Map<String, ColumnFamily> columnFamilies) throws Throwable
+    {
+        SSTable ssTable = new SSTable("C:\\Engagements\\Cassandra", "Table-Test-1", PartitionerType.RANDOM);
+        /* List of primary keys in sorted order */
+        List<PrimaryKey> pKeys = PrimaryKey.create( columnFamilies.keySet() );
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        /* Use this BloomFilter to decide if a key exists in a SSTable */
+        BloomFilter bf = new BloomFilter(pKeys.size(), 15);
+        for ( PrimaryKey pKey : pKeys )
+        {
+            buffer.reset();
+            ColumnFamily columnFamily = columnFamilies.get(pKey.key());
+            if ( columnFamily != null )
+            {
+                /* serialize the cf with column indexes */
+                ColumnFamily.serializer2().serialize( columnFamily, buffer );
+                /* Now write the key and value to disk */
+                ssTable.append(pKey.key(), pKey.hash(), buffer);
+                bf.fill(pKey.key());                
+            }
+        }
+        ssTable.close(bf);
+    }
+    
+    private static void readSSTable() throws Throwable
+    {
+        SSTable ssTable = new SSTable("C:\\Engagements\\Cassandra\\Table-Test-1-Data.db");  
+        for ( int i = 100; i < 1000; ++i )
+        {
+            String key = Integer.toString(i);            
+            DataInputBuffer bufIn = ssTable.next(key, "Test:C");
+            ColumnFamily cf = ColumnFamily.serializer().deserialize(bufIn);
+            if ( cf != null )
+            {            
+                System.out.println("KEY:" + key);
+                System.out.println(cf.name());
+                Collection<IColumn> columns = cf.getAllColumns();
+                for ( IColumn column : columns )
+                {
+                    System.out.println(column.name());
+                }
+            }
+            else
+            {
+                System.out.println("CF doesn't exist for key " + key);
+            }                             
+        }
+    }
+    
+    public static void main(String[] args) throws Throwable
+    {
+        BloomFilter bf = new BloomFilter(1024*1024, 15);
+        for ( int i = 0; i < 1024*1024; ++i )
+        {
+            bf.fill(Integer.toString(i));
+        }
+        
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        BloomFilter.serializer().serialize(bf, bufOut);
+        FileOutputStream fos = new FileOutputStream("C:\\Engagements\\bf.dat", true);
+        fos.write(bufOut.getData(), 0, bufOut.getLength());
+        fos.close();
+        
+        FileInputStream fis = new FileInputStream("C:\\Engagements\\bf.dat");
+        byte[] bytes = new byte[fis.available()];
+        fis.read(bytes);
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(bytes, bytes.length );
+        BloomFilter bf2 = BloomFilter.serializer().deserialize(bufIn);
+        
+        int count = 0;
+        for ( int i = 0; i < 1024*1024; ++i )
+        {
+            // Query the deserialized filter so the loop verifies the round trip.
+            if ( bf2.isPresent(Integer.toString(i)) )
+                ++count;
+        }
+        System.out.println(count);
+        
+        //DatabaseDescriptor.init();
+        //hashSSTableWrite();
+        //rawSSTableWrite();
+        //readSSTable();
+    } 
+}
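
A note on the read path implied by flushForRandomPartitioner() above: the BloomFilter handed to ssTable.close(bf) lets a reader skip SSTables that cannot contain a key. A minimal sketch of such a guarded read, using only the APIs this file already exercises (isPresent(), next(), deserialize()); the variable names are illustrative:

    // A negative answer from the filter is definitive: the key is absent.
    if ( bf.isPresent(key) )
    {
        // A positive answer may be a false positive, so the lookup itself
        // (and a null check on the result) is still required.
        DataInputBuffer bufIn = ssTable.next(key, "Test:C");
        ColumnFamily cf = ColumnFamily.serializer().deserialize(bufIn);
    }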

Added: incubator/cassandra/src/org/apache/cassandra/test/StressTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/test/StressTest.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/test/StressTest.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/test/StressTest.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,883 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.analytics.AnalyticsContext;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.Cassandra;
+import org.apache.cassandra.service.IResponseResolver;
+import org.apache.cassandra.service.QuorumResponseHandler;
+import org.apache.cassandra.service.ReadResponseResolver;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.batch_mutation_super_t;
+import org.apache.cassandra.service.batch_mutation_t;
+import org.apache.cassandra.service.column_t;
+import org.apache.cassandra.service.superColumn_t;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.transport.TSocket;
+import com.facebook.thrift.transport.TTransport;
+import com.martiansoftware.jsap.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class StressTest
+{
+	private static Logger logger_ = Logger.getLogger(StressTest.class);
+
+	private static final String tablename_ = "Test";
+
+	public static EndPoint from_ = new EndPoint("172.24.24.209", 10001);
+
+	public static EndPoint to_ = new EndPoint("hadoop071.sf2p.facebook.com", 7000);
+	private static String server_ = "hadoop071.sf2p.facebook.com";
+	private static final String columnFamilyColumn_ = "ColumnList";
+	private static final String columnFamilySuperColumn_ = "SuperColumnList";
+	private static final String keyFix_ = "";
+	private static final String columnFix_ = "Column-";
+	private static final String superColumnFix_ = "SuperColumn-";
+
+	private Cassandra.Client peerstorageClient_ = null;
+	TTransport transport_ = null;
+	private int requestsPerSecond_ = 1000;
+    private ExecutorService runner_ = null;
+
+    
+    class LoadManager implements Runnable
+    {
+    	private RowMutationMessage rmsg_ = null;
+    	private batch_mutation_t bt_ = null;
+    	private batch_mutation_super_t bts_ = null;
+   
+    	LoadManager(RowMutationMessage rmsg)
+        {
+    		rmsg_ = rmsg;
+        }
+    	LoadManager(batch_mutation_t bt)
+        {
+    		bt_ = bt;
+        }
+    	LoadManager(batch_mutation_super_t bts)
+        {
+    		bts_ = bts;
+        }
+        
+        public void run()
+        {
+        	if( rmsg_ != null )
+        	{
+				Message message = new Message(from_ , StorageService.mutationStage_,
+						StorageService.loadVerbHandler_, new Object[] { rmsg_ });
+				MessagingService.getMessagingInstance().sendOneWay(message, to_);
+        	}
+        	
+       	}
+    }
+
+	
+    /*
+     * Applies the given mutation. The rate is governed by the
+     * requestsPerSecond_ member, which can be set to the required amount:
+     * once that many requests have been issued, the thread sleeps. This
+     * assumes there is no waiting in any other part of the code, so the
+     * requests themselves are generated instantaneously.
+     */
+    public void applyLoad(RowMutation rm) throws IOException {
+        try
+        {
+            long t = System.currentTimeMillis();
+            RowMutationMessage rmMsg = new RowMutationMessage(rm);           
+            Message message = new Message(from_, 
+                    StorageService.mutationStage_,
+                    StorageService.mutationVerbHandler_, 
+                    new Object[]{ rmMsg }
+            );                                                            
+			MessagingService.getMessagingInstance().sendOneWay(message, to_);
+            if ( requestsPerSecond_ > 1000 )
+                Thread.sleep(0, 1000000000/requestsPerSecond_);
+            else
+                Thread.sleep(1000/requestsPerSecond_);
+            
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+        }
+        
+    }	
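+
+    /*
+     * Note on the throttle above: Thread.sleep(millis, nanos) requires
+     * nanos in the range [0, 999999], so the nanosecond form is only
+     * safe when requestsPerSecond_ exceeds 1000; at or below that rate
+     * the millisecond form is used, as in the other load methods here.
+     */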
+	
+    public void readLoad(ReadMessage readMessage)
+    {
+		IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
+		QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
+				1,
+				readResponseResolver);
+		Message message = new Message(from_, StorageService.readStage_,
+				StorageService.readVerbHandler_,
+				new Object[] { readMessage });
+		MessagingService.getMessagingInstance().sendOneWay(message, to_);
+		/*IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, to_);
+		try
+		{
+			long t = System.currentTimeMillis();
+			iar.get(2000, TimeUnit.MILLISECONDS );
+			logger_.debug("Time taken for read..."
+					+ (System.currentTimeMillis() - t));
+			
+		}
+		catch (Exception ex)
+		{
+            ex.printStackTrace();
+		}*/
+    }
+    
+    
+    
+    
+    
+	public void randomReadColumn  (int keys, int columns, int size, int tps)
+	{
+        Random random = new Random();
+		try
+		{
+			while(true)
+			{
+				int key = random.nextInt(keys) + 1;
+	            String stringKey = new Integer(key).toString();
+	            stringKey = stringKey + keyFix_ ;
+            	int j = random.nextInt(columns) + 1;
+	            ReadMessage rm = new ReadMessage(tablename_, stringKey, columnFamilyColumn_ + ":" + columnFix_ + j);
+	            readLoad(rm);
+				if ( requestsPerSecond_ > 1000)
+					Thread.sleep(0, 1000000000/requestsPerSecond_);
+				else
+					Thread.sleep(1000/requestsPerSecond_);
+			}
+		}
+		catch(Exception ex)
+		{
+			ex.printStackTrace();
+		}
+	}
+
+	public void randomWriteColumn(int keys, int columns, int size, int tps)
+	{
+        Random random = new Random();
+        byte[] bytes = new byte[size];
+		int ts = 1;
+		try
+		{
+			while(true)
+			{
+				int key = random.nextInt(keys) + 1;
+	            String stringKey = new Integer(key).toString();
+	            stringKey = stringKey + keyFix_ ;
+	            RowMutation rm = new RowMutation(tablename_, stringKey);
+            	int j = random.nextInt(columns) + 1;
+                random.nextBytes(bytes);
+                rm.add( columnFamilyColumn_ + ":" + columnFix_ + j, bytes, ts);
+                if ( ts == Integer.MAX_VALUE)
+                {
+                	ts = 0 ;
+                }
+                ts++;
+				for(int k = 0 ; k < requestsPerSecond_/1000 +1 ; k++ )
+				{
+					runner_.submit(new LoadManager(new RowMutationMessage(rm)));
+				}
+				try
+				{
+					if ( requestsPerSecond_ > 1000)
+						Thread.sleep(1);
+					else
+						Thread.sleep(1000/requestsPerSecond_);
+				}
+				catch ( Exception ex)
+				{
+					
+				}
+			}
+		}
+		catch(Exception ex)
+		{
+			ex.printStackTrace();
+		}
+	}
+	
+	public void randomReadSuperColumn(int keys, int superColumns, int columns, int size, int tps)
+	{
+        Random random = new Random();
+		try
+		{
+			while(true)
+			{
+				int key = random.nextInt(keys) + 1;
+	            String stringKey = new Integer(key).toString();
+	            stringKey = stringKey + keyFix_ ;
+            	int i = random.nextInt(superColumns) + 1;
+            	int j = random.nextInt(columns) + 1;
+	            ReadMessage rm = new ReadMessage(tablename_, stringKey, columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
+	            readLoad(rm);
+			}
+		}
+		catch(Exception ex)
+		{
+			ex.printStackTrace();
+		}
+	}
+
+	
+	public void randomWriteSuperColumn(int keys, int superColumns,int columns, int size, int tps)
+	{
+        Random random = new Random();
+        byte[] bytes = new byte[size];
+		int ts = 1;
+		try
+		{
+			while(true)
+			{
+				int key = random.nextInt(keys) + 1;
+	            String stringKey = new Integer(key).toString();
+	            stringKey = stringKey + keyFix_ ;
+	            RowMutation rm = new RowMutation(tablename_, stringKey);
+            	int i = random.nextInt(superColumns) + 1;
+            	int j = random.nextInt(columns) + 1;
+                random.nextBytes(bytes);
+                rm.add( columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j, bytes, ts);
+                if ( ts == Integer.MAX_VALUE )
+                {
+                	ts = 0 ;
+                }
+                ts++;
+				for(int k = 0 ; k < requestsPerSecond_/1000 +1 ; k++ )
+				{
+					runner_.submit(new LoadManager(new RowMutationMessage(rm)));
+				}
+				try
+				{
+					if ( requestsPerSecond_ > 1000)
+						Thread.sleep(1);
+					else
+						Thread.sleep(1000/requestsPerSecond_);
+				}
+				catch ( Exception ex)
+				{
+					
+				}
+			}
+		}
+		catch(Exception ex)
+		{
+			ex.printStackTrace();
+		}
+	}
+
+	public void bulkWriteColumn(int keys, int columns, int size, int tps)
+	{
+        Random random = new Random();
+        byte[] bytes = new byte[size];
+		int ts = 1;
+		long time = System.currentTimeMillis();
+		try
+		{
+			for(int key = 1; key <= keys ; key++)
+			{
+	            String stringKey = new Integer(key).toString();
+	            stringKey = stringKey + keyFix_ ;
+	            RowMutation rm = new RowMutation(tablename_, stringKey);
+	            for( int j = 1; j <= columns ; j++)
+	            {
+	                random.nextBytes(bytes);
+	                rm.add( columnFamilyColumn_ + ":" + columnFix_ + j, bytes, ts);
+	            }
+				RowMutationMessage rmMsg = new RowMutationMessage(rm);
+				
+				for(int k = 0 ; k < requestsPerSecond_/1000 +1 ; k++ )
+				{
+					runner_.submit(new LoadManager(rmMsg));
+				}
+				try
+				{
+					if ( requestsPerSecond_ > 1000)
+						Thread.sleep(1);
+					else
+						Thread.sleep(1000/requestsPerSecond_);
+				}
+				catch ( Exception ex)
+				{
+					
+				}
+				
+			}
+		}
+		catch(Exception ex)
+		{
+			ex.printStackTrace();
+		}
+		System.out.println(System.currentTimeMillis() - time);
+	}
+	
+	public void bulkWriteSuperColumn(int keys, int superColumns, int columns, int size, int tps)
+	{
+        Random random = new Random();
+        byte[] bytes = new byte[size];
+		int ts = 1;
+		try
+		{
+			for(int key = 1; key <= keys ; key++)
+			{
+	            String stringKey = new Integer(key).toString();
+	            stringKey = stringKey + keyFix_ ;
+	            RowMutation rm = new RowMutation(tablename_, stringKey);
+	            for( int i = 1; i <= superColumns ; i++)
+	            {
+		            for( int j = 1; j <= columns ; j++)
+		            {
+		                random.nextBytes(bytes);
+		                rm.add( columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j, bytes, ts);
+		            }
+	            }
+	            RowMutationMessage rmMsg = new RowMutationMessage(rm);
+				for(int k = 0 ; k < requestsPerSecond_/1000 +1 ; k++ )
+				{
+					runner_.submit(new LoadManager(rmMsg));
+				}
+				try
+				{
+					if ( requestsPerSecond_ > 1000)
+						Thread.sleep(1);
+					else
+						Thread.sleep(1000/requestsPerSecond_);
+				}
+				catch ( Exception ex)
+				{
+					
+				}
+			}
+		}
+		catch(Exception ex)
+		{
+			ex.printStackTrace();
+		}
+	}
+
+	//  Stress the server using the Thrift API.
+	
+	public Cassandra.Client connect() {
+		int port = 9160;
+		TSocket socket = new TSocket(server_, port); 
+		if(transport_ != null)
+			transport_.close();
+		transport_ = socket;
+
+		TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport_, false,
+				false);
+		Cassandra.Client peerstorageClient = new Cassandra.Client(
+				binaryProtocol);
+		try
+		{
+			transport_.open();
+		}
+		catch(Exception e)
+		{
+			e.printStackTrace();
+		}
+		return peerstorageClient;
+	}
+
+	public void applyThrift(String table, String key, String columnFamily, byte[] bytes, long ts ) {
+
+		try {
+			if ( requestsPerSecond_ > 1000)
+				Thread.sleep(0, 1000000000/requestsPerSecond_);
+			else
+				Thread.sleep(1000/requestsPerSecond_);
+			peerstorageClient_.insert(table, key, columnFamily, new String(bytes), ts);
+		} catch (Exception e) {
+			try {
+				peerstorageClient_ = connect();
+				peerstorageClient_.insert(table, key, columnFamily, new String(bytes), ts);
+			} catch (Exception e1) {
+				e1.printStackTrace();
+			}
+		}
+	}
+
+	
+	public void apply(batch_mutation_t batchMutation) {
+
+		try {
+			if ( requestsPerSecond_ > 1000)
+				Thread.sleep(0, 1000000000/requestsPerSecond_);
+			else
+				Thread.sleep(1000/requestsPerSecond_);
+			peerstorageClient_.batch_insert(batchMutation);
+		} catch (Exception e) {
+			try {
+				peerstorageClient_ = connect();
+				peerstorageClient_.batch_insert(batchMutation);
+			} catch (Exception e1) {
+				e1.printStackTrace();
+			}
+		}
+	}
+
+	public void apply(batch_mutation_super_t batchMutation) {
+
+		try {
+			if ( requestsPerSecond_ > 1000)
+				Thread.sleep(0, 1000000000/requestsPerSecond_);
+			else
+				Thread.sleep(1000/requestsPerSecond_);
+			long t = System.currentTimeMillis();
+			peerstorageClient_.batch_insert_superColumn(batchMutation);
+			logger_.debug("Time taken for thrift..."
+					+ (System.currentTimeMillis() - t));
+		} catch (Exception e) {
+			try {
+				peerstorageClient_ = connect();
+				peerstorageClient_.batch_insert_superColumn(batchMutation);
+			} catch (Exception e1) {
+				e1.printStackTrace();
+			}
+		}
+	}
+	
+	public void readLoadColumn(String tableName, String key, String cf)
+	{
+		try
+		{
+			column_t column = peerstorageClient_.get_column(tableName, key, cf);
+		}
+		catch(Exception ex)
+		{
+			peerstorageClient_ = connect();
+			ex.printStackTrace();
+		}
+	}
+	
+	public void randomReadColumnThrift(int keys, int columns, int size, int tps)
+	{
+        Random random = new Random();
+		try
+		{
+			while(true)
+			{
+				int key = random.nextInt(keys) + 1;
+	            String stringKey = new Integer(key).toString();
+	            stringKey = stringKey + keyFix_ ;
+            	int j = random.nextInt(columns) + 1;
+            	readLoadColumn(tablename_, stringKey, columnFamilyColumn_ + ":" + columnFix_ + j);
+				if ( requestsPerSecond_ > 1000)
+					Thread.sleep(0, 1000000000/requestsPerSecond_);
+				else
+					Thread.sleep(1000/requestsPerSecond_);
+			}
+		}
+		catch(Exception ex)
+		{
+			ex.printStackTrace();
+		}
+	}
+
+	public void randomWriteColumnThrift(int keys, int columns, int size, int tps)
+	{
+        Random random = new Random();
+        byte[] bytes = new byte[size];
+		int ts = 1;
+		try
+		{
+			while(true)
+			{
+				int key = random.nextInt(keys) + 1;
+	            String stringKey = new Integer(key).toString();
+	            stringKey = stringKey + keyFix_ ;
+            	int j = random.nextInt(columns) + 1;
+                random.nextBytes(bytes);
+                if ( ts == Integer.MAX_VALUE)
+                {
+                	ts = 0 ;
+                }
+                ts++;
+	            applyThrift(tablename_, stringKey, columnFamilyColumn_ + ":" + columnFix_ + j, bytes, ts);
+			}
+		}
+		catch(Exception ex)
+		{
+			ex.printStackTrace();
+		}
+	}
+	
+	public void randomReadSuperColumnThrift(int keys, int superColumns, int columns, int size, int tps)
+	{
+        Random random = new Random();
+		try
+		{
+			while(true)
+			{
+				int key = random.nextInt(keys) + 1;
+	            String stringKey = new Integer(key).toString();
+	            stringKey = stringKey + keyFix_ ;
+            	int i = random.nextInt(superColumns) + 1;
+            	int j = random.nextInt(columns) + 1;
+            	readLoadColumn(tablename_, stringKey, columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
+			}
+		}
+		catch(Exception ex)
+		{
+			ex.printStackTrace();
+		}
+	}
+
+	
+	public void randomWriteSuperColumnThrift(int keys, int superColumns,int columns, int size, int tps)
+	{
+        Random random = new Random();
+        byte[] bytes = new byte[size];
+		int ts = 1;
+		try
+		{
+			while(true)
+			{
+				int key = random.nextInt(keys) + 1;
+	            String stringKey = new Integer(key).toString();
+	            stringKey = stringKey + keyFix_ ;
+            	int i = random.nextInt(superColumns) + 1;
+            	int j = random.nextInt(columns) + 1;
+                random.nextBytes(bytes);
+                if ( ts == Integer.MAX_VALUE)
+                {
+                	ts = 0 ;
+                }
+                ts++;
+	            applyThrift(tablename_, stringKey, columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j, bytes, ts);
+			}
+		}
+		catch(Exception ex)
+		{
+			ex.printStackTrace();
+		}
+
+	
+	}
+
+	public void bulkWriteColumnThrift(int keys, int columns, int size, int tps)
+	{
+        Random random = new Random();
+        byte[] bytes = new byte[size];
+		int ts = 1;
+		long time = System.currentTimeMillis();
+		try
+		{
+			for(int key = 1; key <= keys ; key++)
+			{
+	            String stringKey = new Integer(key).toString();
+	            stringKey = stringKey + keyFix_ ;
+	            batch_mutation_t bt = new batch_mutation_t();
+	            bt.key = stringKey;
+	            bt.table = tablename_;
+	            bt.cfmap = new HashMap<String,List<column_t>>();
+	            ArrayList<column_t> column_arr = new ArrayList<column_t>();
+	            for( int j = 1; j <= columns ; j++)
+	            {
+	                random.nextBytes(bytes);
+	                column_arr.add(new column_t(columnFix_ + j, new String(bytes), ts)); // new String(bytes): byte[].toString() is just the array reference
+	            }
+	            bt.cfmap.put(columnFamilyColumn_, column_arr);
+	            apply(bt);
+			}
+		}
+		catch(Exception ex)
+		{
+			ex.printStackTrace();
+		}
+		System.out.println(System.currentTimeMillis() - time);
+	}
+	
+	public void bulkWriteSuperColumnThrift(int keys, int supercolumns, int columns, int size, int tps)
+	{
+        Random random = new Random();
+        byte[] bytes = new byte[size];
+		int ts = 1;
+		long time = System.currentTimeMillis();
+		try
+		{
+			for(int key = 1; key <= keys ; key++)
+			{
+	            String stringKey = new Integer(key).toString();
+	            stringKey = stringKey + keyFix_ ;
+	            batch_mutation_super_t bt = new batch_mutation_super_t();
+	            bt.key = stringKey;
+	            bt.table = tablename_;
+	            bt.cfmap = new HashMap<String,List<superColumn_t>>();
+	            ArrayList<superColumn_t> superColumn_arr = new ArrayList<superColumn_t>();
+	            
+	            for( int i = 1; i <= supercolumns; i++ )
+	            {
+		            ArrayList<column_t> column_arr = new ArrayList<column_t>();
+		            for( int j = 1; j <= columns ; j++)
+		            {
+		                random.nextBytes(bytes);
+		                column_arr.add(new column_t(columnFix_ + j, new String(bytes), ts)); // new String(bytes): byte[].toString() is just the array reference
+		            }
+	            	superColumn_arr.add(new superColumn_t(superColumnFix_ + i, column_arr));	
+	            }
+	            bt.cfmap.put(columnFamilySuperColumn_, superColumn_arr);
+	            apply(bt);
+			}
+		}
+		catch(Exception ex)
+		{
+			ex.printStackTrace();
+		}
+		System.out.println(System.currentTimeMillis() - time);
+	}
+	
+	public void testCommitLog() throws Throwable
+	{
+        Random random = new Random(System.currentTimeMillis());
+    	byte[] bytes = new byte[4096];
+    	random.nextBytes(bytes);
+    	byte[] bytes1 = new byte[64];
+    	random.nextBytes(bytes1);
+    	peerstorageClient_ = connect();
+    	int t = 0 ;
+    	while( true )
+    	{
+	    	int key = random.nextInt();
+	    	int threadId = random.nextInt();
+	    	int word = random.nextInt();
+			peerstorageClient_.insert("Mailbox", Integer.toString(key), "MailboxMailList0:" + Integer.toString(threadId),  new String(bytes1), t++);
+			peerstorageClient_.insert("Mailbox", Integer.toString(key), "MailboxThreadList0:" + Integer.toString(word) + ":" + Integer.toString(threadId),  new String(bytes), t++);
+			peerstorageClient_.insert("Mailbox", Integer.toString(key), "MailboxUserList0:"+ Integer.toString(word) + ":" + Integer.toString(threadId),  new String(bytes), t++);
+    	}
+	}
+
+	JSAPResult parseArguments(String[] args)
+	{
+        JSAPResult config = null;    
+		try
+		{
+			
+	        SimpleJSAP jsap = new SimpleJSAP( 
+	                "StressTest", 
+	                "Runs stress test for Cassandra",
+	                new Parameter[] {
+	                    new FlaggedOption( "keys", JSAP.INTEGER_PARSER, "10000", JSAP.REQUIRED, 'k', JSAP.NO_LONGFLAG, 
+	                        "The number of keys from 1 to this number" ),
+	                    new FlaggedOption( "columns", JSAP.INTEGER_PARSER, "1000", JSAP.REQUIRED, 'c', JSAP.NO_LONGFLAG, 
+	                        "The number of columns from 1 to this number" ),
+	                    new FlaggedOption( "supercolumns", JSAP.INTEGER_PARSER, "0", JSAP.NOT_REQUIRED, 'u', JSAP.NO_LONGFLAG, 
+	                        "The number of super columns from 1 to this number" ),
+	                    new FlaggedOption( "size", JSAP.INTEGER_PARSER, "1000", JSAP.REQUIRED, 's', JSAP.NO_LONGFLAG, 
+	                        "The Size in bytes of each column" ),
+	                    new FlaggedOption( "tps", JSAP.INTEGER_PARSER, "1000", JSAP.REQUIRED, 't', JSAP.NO_LONGFLAG, 
+	                        "Requests per second" ),
+	                    new FlaggedOption( "thrift", JSAP.INTEGER_PARSER, "0", JSAP.REQUIRED, 'h', JSAP.NO_LONGFLAG, 
+	                        "Use Thrift - 1 , use messaging - 0" ),
+	                    new FlaggedOption( "mailboxstress", JSAP.INTEGER_PARSER, "0", JSAP.REQUIRED, 'M', JSAP.NO_LONGFLAG, 
+	                        "Run mailbox stress  - 1 , hmm default - 0" ),
+	                    new FlaggedOption( "commitLogTest", JSAP.INTEGER_PARSER, "0", JSAP.REQUIRED, 'C', JSAP.NO_LONGFLAG, 
+	                        "Run mailbox stress  - 1 , hmm default - 0" ),
+	                    new QualifiedSwitch( "randomize", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.NOT_REQUIRED, 'z', "randomize", 
+	                        "Random reads or writes" ).setList( true ).setListSeparator( ',' ),
+	                    new QualifiedSwitch( "reads", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.NOT_REQUIRED, 'r', "reads", 
+	                        "Read data" ).setList( true ).setListSeparator( ',' ),
+	                    new QualifiedSwitch( "writes", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.NOT_REQUIRED, 'w', "writes", 
+	                        "Write Data" ).setList( false ).setListSeparator( ',' ),
+	                    new QualifiedSwitch( "bulkwrites", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.NOT_REQUIRED, 'b', "bulkwrites", 
+	                        "Bulk Write Data" ).setList( false ).setListSeparator( ',' ),
+                        new UnflaggedOption( "Server", JSAP.STRING_PARSER, JSAP.REQUIRED, "Name of the server the request needs to be sent to." ) }
+	            )	;
+	        
+	            
+	        	config = jsap.parse(args);    
+	    		if( !config.success())
+	    		{
+	                System.err.println();
+	                System.err.println("Usage: java "
+	                                    + StressTest.class.getName());
+	                System.err.println("                "
+	                                    + jsap.getUsage());
+	                System.err.println();
+	                // show full help as well
+	                System.err.println(jsap.getHelp());	                
+	                System.err.println("**********Errors*************");
+	    		}
+	            if ( jsap.messagePrinted() ) return null;		
+	    		String hostName = FBUtilities.getHostName();
+	    		from_ = new EndPoint(hostName,10001);
+	    		MessagingService.getMessagingInstance().listen(from_, false);
+		}
+		catch ( Exception ex) 
+		{
+			logger_.debug(LogUtil.throwableToString(ex));
+		}
+        return config;
+	}
+	
+	void run( JSAPResult config ) throws Throwable
+	{
+		requestsPerSecond_ = config.getInt("tps");
+		int numThreads = requestsPerSecond_/1000 + 1;
+		if(config.getString("Server") != null)
+		{
+			server_ = config.getString("Server");
+			to_ = new EndPoint(config.getString("Server"), 7000);
+		}
+		runner_ = new DebuggableThreadPoolExecutor( numThreads,
+				numThreads,
+	            Integer.MAX_VALUE,
+	            TimeUnit.SECONDS,
+	            new LinkedBlockingQueue<Runnable>(),
+	            new ThreadFactoryImpl("MEMTABLE-FLUSHER-POOL")
+	            );  
+		if(config.getInt("mailboxstress") == 1)
+		{
+//			stressMailboxWrites();
+			return;
+		}
+		if(config.getInt("commitLogTest") == 1)
+		{
+			testCommitLog();
+			return;
+		}
+		if(config.getInt("thrift") == 0)
+		{
+			if(config.getInt("supercolumns") == 0)
+			{
+				if(config.getBoolean("reads"))
+				{
+					randomReadColumn(config.getInt("keys"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+					return;
+				}
+				if(config.getBoolean("bulkwrites"))
+				{
+					bulkWriteColumn(config.getInt("keys"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+					return;
+				}
+				if(config.getBoolean("writes"))
+				{
+					randomWriteColumn(config.getInt("keys"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+					return;
+				}
+			}
+			else
+			{
+				if(config.getBoolean("reads"))
+				{
+					randomReadSuperColumn(config.getInt("keys"), config.getInt("supercolumns"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+					return;
+				}
+				if(config.getBoolean("bulkwrites"))
+				{
+					bulkWriteSuperColumn(config.getInt("keys"), config.getInt("supercolumns"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+					return;
+				}
+				if(config.getBoolean("writes"))
+				{
+					randomWriteSuperColumn(config.getInt("keys"), config.getInt("supercolumns"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+					return;
+				}
+				
+			}
+		}
+		else
+		{
+			peerstorageClient_ = connect();
+			if(config.getInt("supercolumns") == 0)
+			{
+				if(config.getBoolean("reads"))
+				{
+					randomReadColumnThrift(config.getInt("keys"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+					return;
+				}
+				if(config.getBoolean("bulkwrites"))
+				{
+					bulkWriteColumnThrift(config.getInt("keys"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+					return;
+				}
+				if(config.getBoolean("writes"))
+				{
+					randomWriteColumnThrift(config.getInt("keys"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+					return;
+				}
+			}
+			else
+			{
+				if(config.getBoolean("reads"))
+				{
+					randomReadSuperColumnThrift(config.getInt("keys"), config.getInt("supercolumns"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+					return;
+				}
+				if(config.getBoolean("bulkwrites"))
+				{
+					bulkWriteSuperColumnThrift(config.getInt("keys"), config.getInt("supercolumns"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+					return;
+				}
+				if(config.getBoolean("writes"))
+				{
+					randomWriteSuperColumnThrift(config.getInt("keys"), config.getInt("supercolumns"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+					return;
+				}
+				
+			}
+			
+		}
+		System.out.println(" StressTest : Done !!!!!!");
+	}
+	
+	/**
+	 * @param args
+	 */
+	public static void main(String[] args) throws Throwable
+	{
+		LogUtil.init();
+  		StressTest stressTest = new StressTest();
+		JSAPResult config = stressTest.parseArguments( args );
+		if( config == null ) System.exit(-1);
+		stressTest.run(config);
+	}
+
+}
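
For reference, StressTest is driven entirely by the JSAP options registered in parseArguments(). Based on those flags, a plausible invocation (the host name is the required unflagged argument; -h selects Thrift (1) or the messaging path (0)):

    java org.apache.cassandra.test.StressTest -k 10000 -c 1000 -s 1000 -t 500 -h 0 --writes hadoop071.sf2p.facebook.com

Substituting --reads or --bulkwrites selects the other load patterns, and a non-zero -u value switches to the super-column variants.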

Added: incubator/cassandra/src/org/apache/cassandra/test/TestChoice.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/test/TestChoice.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/test/TestChoice.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/test/TestChoice.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.EndPointSnitch;
+import org.apache.cassandra.locator.IEndPointSnitch;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+public class TestChoice
+{
+    private static final Logger logger_ = Logger.getLogger(TestChoice.class);
+    private Set<EndPoint> allNodes_;
+    private Map<EndPoint, List<EndPoint>> nodeToReplicaMap_ = new HashMap<EndPoint, List<EndPoint>>();
+    
+    public TestChoice(Set<EndPoint> allNodes)
+    {
+        allNodes_ = new HashSet<EndPoint>(allNodes);
+    }
+    
+    public void assignReplicas()
+    {
+        IEndPointSnitch snitch = new EndPointSnitch();
+        Set<EndPoint> allNodes = new HashSet<EndPoint>(allNodes_);
+        Map<EndPoint, Integer> nOccurences = new HashMap<EndPoint, Integer>();
+        
+        for ( EndPoint node : allNodes_ )
+        {
+            nOccurences.put(node, 1);
+        }
+        
+        for ( EndPoint node : allNodes_ )
+        {
+            allNodes.remove(node);
+            for ( EndPoint choice : allNodes )
+            {
+                List<EndPoint> replicasChosen = nodeToReplicaMap_.get(node);
+                if ( replicasChosen == null || replicasChosen.size() < DatabaseDescriptor.getReplicationFactor() - 1 )
+                {
+                    try
+                    {
+                        if ( !snitch.isInSameDataCenter(node, choice) )
+                        {
+                            if ( replicasChosen == null )
+                            {
+                                replicasChosen = new ArrayList<EndPoint>();
+                                nodeToReplicaMap_.put(node, replicasChosen);
+                            }
+                            int nOccurence = nOccurences.get(choice);
+                            if ( nOccurence < DatabaseDescriptor.getReplicationFactor() )
+                            {
+                                nOccurences.put(choice, ++nOccurence);
+                                replicasChosen.add(choice);
+                            }                            
+                        }
+                    }
+                    catch ( UnknownHostException ex )
+                    {
+                        ex.printStackTrace();
+                    }
+                }
+                else
+                {
+                    allNodes.add(node);
+                    break;
+                }
+            }
+        }
+        
+        
+        Set<EndPoint> nodes = nodeToReplicaMap_.keySet();
+        for ( EndPoint node : nodes )
+        {
+            List<EndPoint> replicas = nodeToReplicaMap_.get(node);
+            StringBuilder sb = new StringBuilder("");
+            for ( EndPoint replica : replicas )
+            {
+                sb.append(replica);
+                sb.append(", ");
+            }
+            System.out.println(node + " ---> " + sb.toString() );
+        }
+    }
+}
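
TestChoice needs only the full node set up front. A minimal driver, assuming DatabaseDescriptor has been initialized so that getReplicationFactor() returns a sensible value (the end points below are illustrative):

    Set<EndPoint> nodes = new HashSet<EndPoint>();
    nodes.add(new EndPoint("host1.dc1.example.com", 7000));
    nodes.add(new EndPoint("host2.dc2.example.com", 7000));
    nodes.add(new EndPoint("host3.dc2.example.com", 7000));
    new TestChoice(nodes).assignReplicas();   // prints each node ---> its chosen replicas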

Added: incubator/cassandra/src/org/apache/cassandra/test/TestRunner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/test/TestRunner.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/test/TestRunner.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/test/TestRunner.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKind;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.ContinuationContext;
+import org.apache.cassandra.concurrent.ContinuationStage;
+import org.apache.cassandra.concurrent.ContinuationsExecutor;
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.IStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.mapreduce.SequentialScanner;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageDeliveryTask;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.commons.javaflow.Continuation;
+import org.apache.log4j.Logger;
+
+
+public class TestRunner
+{
+    private static EndPoint to_ = new EndPoint("tdsearch001.sf2p.facebook.com", 7000);
+    
+    private static void doWrite() throws Throwable
+    {
+
+        Table table = Table.open("Mailbox");  
+        Random random = new Random();
+        byte[] bytes = new byte[1024];
+        for (int i = 1001; i <= 1130; ++i)
+        {
+            String key = Integer.toString(i);
+            RowMutation rm = new RowMutation("Mailbox", key);
+            random.nextBytes(bytes);
+            for ( int j = 0; j < 1; ++j )
+            {
+                for ( int k = 0; k < 1; ++k )
+                {                    
+                    rm.add("MailboxMailData0:SuperColumn-" + j + ":Column-" + k, bytes, k);                    
+                }
+            }
+            rm.apply();
+        }
+        System.out.println("Write done");
+    }
+    
+    private static void doRead() throws Throwable
+    {
+        Table table = Table.open("Mailbox");
+        String key = "511055962";
+                
+        /*
+        List<String> list = new ArrayList<String>();
+        list.add("SuperColumn-0");
+        Row row = table.getRow(key, "MailboxMailList0", list);
+        System.out.println(row);
+        */
+        
+        ColumnFamily cf = table.get(key, "MailboxMailData0");
+        try
+        {
+            Collection<IColumn> columns = cf.getAllColumns();            
+            for ( IColumn column : columns )
+            {                
+                System.out.println(column.name());                
+                Collection<IColumn> subColumns = column.getSubColumns();
+                for ( IColumn subColumn : subColumns )
+                {
+                    System.out.println(subColumn);
+                }                            
+            }
+        }
+        catch ( Throwable th )
+        {
+            th.printStackTrace();
+        }
+    }
+    
+    private static void doDeletes()
+    {
+        
+    }
+    
+            
+    public static void main(String[] args) throws Throwable
+    {  
+        /*
+        String name = "/var/cassandra/test.dat";
+        FileInputStream f = new FileInputStream(name);
+        File file = new File("/var/cassandra");
+        Path path = file.toPath();
+        WatchService watcher = FileSystems.getDefault().newWatchService(); 
+        Thread thread = new Thread( new WatchKeyMonitor(watcher) );
+        thread.start();
+        
+        WatchKey wKey = path.register( watcher, StandardWatchEventKind.ENTRY_DELETE );          
+        file = new File(name);
+        file.delete();
+        
+        Thread.sleep(3000);
+        System.out.println("Closing the stream ...");
+        f.close();
+        */
+        
+        /*
+        LogUtil.init();
+        StorageService s = StorageService.instance();
+        s.start();    
+        doRead();
+        */
+        /*
+        FileOutputStream fos = new FileOutputStream("C:\\Engagements\\Test.dat", true);
+        SequentialScanner scanner = new SequentialScanner("Mailbox");            
+        int count = 0;
+        while ( scanner.hasNext() )
+        {
+            Row row = scanner.next();  
+            String value = row.key() + System.getProperty("line.separator");
+            fos.write( value.getBytes() );
+           
+            Map<String, ColumnFamily> cfs = row.getColumnFamilies();
+            Set<String> keys = cfs.keySet();
+            
+            for ( String key : keys )
+            {
+                System.out.println(row.getColumnFamily(key));
+            }           
+        }          
+        fos.close();
+        System.out.println("Done ...");
+        */
+        /*
+        ExecutorService es = new DebuggableThreadPoolExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("TEST"));
+        es.execute(new TestImpl());
+        */
+        /*
+        LogUtil.init();
+        StorageService s = StorageService.instance();
+        s.start();
+        */
+        /*
+        ReadMessage readMessage = new ReadMessage("Mailbox", args[1], "Test");
+        Message message = ReadMessage.makeReadMessage(readMessage);
+        Runnable task = new MessageDeliveryTask(message);
+               
+        ExecutorService es = new ContinuationsExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("TEST"));
+        int end = Integer.parseInt(args[0]);
+        for ( int i = 0; i < end; ++i )
+        {
+            es.execute(task);
+        }
+        */
+        
+        /*
+        if ( args[0].equals("S") )
+        {  
+            ExecutorService es = new ContinuationsExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );               
+            es.execute( new Scanner() );
+        }
+        */
+        /*
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        String value = "Avinash Lakshman";
+        for ( int i = 0; i < 100; ++i )
+        {
+            bufOut.writeUTF(Integer.toString(i));
+            bufOut.writeInt(value.length());
+            bufOut.write(value.getBytes());                        
+        }
+        
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(bufOut.getData(), bufOut.getLength());
+        DataOutputBuffer buffer = new DataOutputBuffer();        
+        IFileWriter writer = SequenceFile.aioWriter("C:\\Engagements\\test.dat", 64*1024);
+        SortedMap<String, Integer> offsets = getOffsets(bufIn);
+        Set<String> keys = offsets.keySet();                                                      
+        for ( String key : keys )
+        {
+            bufIn.setPosition(offsets.get(key));
+            buffer.reset();
+            buffer.write(bufIn, bufIn.readInt());
+            writer.append(key, buffer);            
+        }
+        writer.close();
+        */
+    }
+}
+
+@Suspendable
+class Scanner implements Runnable
+{   
+    private static final Logger logger_ = Logger.getLogger(Scanner.class);
+    
+    public void run()
+    {        
+        try
+        {            
+            SequentialScanner scanner = new SequentialScanner("Mailbox");            
+            
+            while ( scanner.hasNext() )
+            {
+                Row row = scanner.next();    
+                logger_.debug(row.key());
+            }            
+        }
+        catch ( IOException ex )
+        {
+            ex.printStackTrace();
+        }        
+    }
+}
+
+class Test 
+{
+    public static void goo()
+    {
+        System.out.println("I am goo()");
+    }
+}
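
The commented-out fragment near the end of TestRunner.main() shows how the @Suspendable Scanner above is meant to be driven; restored, it reads:

    ExecutorService es = new ContinuationsExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());
    es.execute(new Scanner());   // logs every row key in "Mailbox" in sequence

Running it on a ContinuationsExecutor rather than a plain thread pool presumably lets the scan suspend at I/O boundaries instead of blocking a thread, which is what the @Suspendable annotation marks.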

Added: incubator/cassandra/src/org/apache/cassandra/test/UtilsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/test/UtilsTest.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/test/UtilsTest.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/test/UtilsTest.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,49 @@
+package org.apache.cassandra.test;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FastHashMap;
+import org.apache.cassandra.utils.GuidGenerator;
+
+public class UtilsTest 
+{
+	private static void doHashPerf() throws Throwable
+	{
+		List<BigInteger> list = new ArrayList<BigInteger>();
+		for ( int i = 0; i < 100; ++i )
+		{
+			String guid = GuidGenerator.guid();
+			list.add( FBUtilities.hash(guid) );
+		}
+		Collections.sort(list);
+		
+		int startValue = 1000000;
+		
+		while ( true )
+		{
+			long start = System.currentTimeMillis();
+			for ( int i = 0; i < 1024; ++i )
+			{
+				String key = Integer.toString(startValue + i);
+				BigInteger hash = FBUtilities.hash(key);
+				Collections.binarySearch(list, hash);
+			}
+			System.out.println("TIME TAKEN: " + (System.currentTimeMillis() - start));
+			Thread.sleep(100);
+		}
+	}
+	
+	public static void main(String[] args) throws Throwable
+	{
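+		// Left empty in this commit; doHashPerf() above is presumably meant to be invoked here.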
+	}
+}
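
doHashPerf() times exactly the lookup a consistent-hashing ring performs: hash the key, then binary-search the sorted token list. On a miss, Collections.binarySearch returns (-(insertion point) - 1), so a full successor lookup (not part of this file; sketched here for context) needs only a sign fix-up and a wrap-around:

    int idx = Collections.binarySearch(list, FBUtilities.hash(key));
    if ( idx < 0 )
        idx = -idx - 1;        // insertion point: first token >= hash(key)
    if ( idx == list.size() )
        idx = 0;               // wrap: the hash is past the largest token
    BigInteger owner = list.get(idx);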

Added: incubator/cassandra/src/org/apache/cassandra/tools/AdminTool.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/tools/AdminTool.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/tools/AdminTool.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/tools/AdminTool.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.LogUtil;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class AdminTool
+{
+
+	String server_ = null;
+	String tableName_ = "Mailbox";
+	String key_ = "Random";
+	String cf1_ = "MailboxThreadList0";
+	String cf2_ = "MailboxUserList0";
+	String cf3_ = "MailboxMailList0";
+	String cf4_ = "MailboxMailData0";
+//	String cf5_ = "MailboxUserList";
+	public static EndPoint from_ = new EndPoint("hadoop071.sf2p.facebook.com", 10001);
+	private static final String[] servers_ =
+	{
+		"insearch001.sf2p.facebook.com",
+		"insearch002.sf2p.facebook.com",
+		"insearch003.sf2p.facebook.com",
+		"insearch004.sf2p.facebook.com",
+		"insearch005.sf2p.facebook.com",
+		"insearch016.sf2p.facebook.com",
+		"insearch007.sf2p.facebook.com",
+		"insearch008.sf2p.facebook.com",
+		"insearch009.sf2p.facebook.com",
+		"insearch010.sf2p.facebook.com",
+		"insearch011.sf2p.facebook.com",
+		"insearch012.sf2p.facebook.com",
+		"insearch013.sf2p.facebook.com",
+		"insearch014.sf2p.facebook.com",
+		"insearch015.sf2p.facebook.com",
+		"insearch016.sf2p.facebook.com",
+		"insearch017.sf2p.facebook.com",
+		"insearch018.sf2p.facebook.com",
+		"insearch019.sf2p.facebook.com",
+		"insearch020.sf2p.facebook.com",
+		"insearch021.sf2p.facebook.com",
+		"insearch022.sf2p.facebook.com",
+		"insearch023.sf2p.facebook.com",
+		"insearch024.sf2p.facebook.com",
+		"insearch025.sf2p.facebook.com",
+		"insearch026.sf2p.facebook.com",
+		"insearch027.sf2p.facebook.com",
+		"insearch028.sf2p.facebook.com",
+		"insearch029.sf2p.facebook.com",
+		"insearch030.sf2p.facebook.com",
+		"insearch031.sf2p.facebook.com",
+		"insearch032.sf2p.facebook.com",
+		"insearch033.sf2p.facebook.com",
+		"insearch034.sf2p.facebook.com",
+		"insearch035.sf2p.facebook.com",
+		"insearch036.sf2p.facebook.com",
+		"insearch037.sf2p.facebook.com",
+		"insearch038.sf2p.facebook.com",
+		"insearch039.sf2p.facebook.com",
+		"insearch040.sf2p.facebook.com",
+
+		"insearch001.ash1.facebook.com",
+		"insearch002.ash1.facebook.com",
+		"insearch003.ash1.facebook.com",
+		"insearch004.ash1.facebook.com",
+		"insearch005.ash1.facebook.com",
+		"insearch006.ash1.facebook.com",
+		"insearch007.ash1.facebook.com",
+		"insearch008.ash1.facebook.com",
+		"insearch009.ash1.facebook.com",
+		"insearch010.ash1.facebook.com",
+		"insearch011.ash1.facebook.com",
+		"insearch012.ash1.facebook.com",
+		"insearch013.ash1.facebook.com",
+		"insearch014.ash1.facebook.com",
+		"insearch015.ash1.facebook.com",
+		"insearch016.ash1.facebook.com",
+		"insearch017.ash1.facebook.com",
+		"insearch018.ash1.facebook.com",
+		"insearch019.ash1.facebook.com",
+		"insearch020.ash1.facebook.com",
+		"insearch021.ash1.facebook.com",
+		"insearch022.ash1.facebook.com",
+		"insearch023.ash1.facebook.com",
+		"insearch024.ash1.facebook.com",
+		"insearch025.ash1.facebook.com",
+		"insearch026.ash1.facebook.com",
+		"insearch027.ash1.facebook.com",
+		"insearch028.ash1.facebook.com",
+		"insearch029.ash1.facebook.com",
+		"insearch030.ash1.facebook.com",
+		"insearch031.ash1.facebook.com",
+		"insearch032.ash1.facebook.com",
+		"insearch033.ash1.facebook.com",
+		"insearch034.ash1.facebook.com",
+		"insearch035.ash1.facebook.com",
+		"insearch036.ash1.facebook.com",
+		"insearch037.ash1.facebook.com",
+		"insearch038.ash1.facebook.com",
+		"insearch039.ash1.facebook.com",
+		"insearch040.ash1.facebook.com",
+	};
+
+	AdminTool()
+	{
+		server_ = null;
+	}
+
+	AdminTool(String server)
+	{
+		server_ = server;
+	}
+
+	public void run(int operation, String columnFamilyName, long skip) throws Throwable
+	{
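+        /*
+         * Package the operation code and skip value into a RowMutation against the
+         * recycle-bin column families and send it one-way, either to the named
+         * server or to every server in the hardcoded list.
+        */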
+        byte[] bytes =  BasicUtilities.longToByteArray( skip );
+        RowMutation rm = new RowMutation(tableName_, key_);
+        if( columnFamilyName == null )
+        {
+			rm.add(Table.recycleBin_ + ":" + cf1_, bytes, operation);
+			rm.add(Table.recycleBin_ + ":" + cf2_, bytes, operation);
+			rm.add(Table.recycleBin_ + ":" + cf3_, bytes, operation);
+			rm.add(Table.recycleBin_ + ":" + cf4_, bytes, operation);
+//			rm.add(Table.recycleBin_ + ":" + cf5_, bytes, operation);
+        }
+        else
+        {
+			rm.add(Table.recycleBin_ + ":" + columnFamilyName, bytes, operation);
+        }
+		RowMutationMessage rmMsg = new RowMutationMessage(rm);
+        if( server_ != null)
+        {
+            Message message = RowMutationMessage.makeRowMutationMessage(rmMsg, StorageService.binaryVerbHandler_);
+	        EndPoint to = new EndPoint(server_, 7000);
+			MessagingService.getMessagingInstance().sendOneWay(message, to);
+        }
+        else
+        {
+        	for( String server : servers_ )
+        	{
+                Message message = RowMutationMessage.makeRowMutationMessage(rmMsg, StorageService.binaryVerbHandler_);
+		        EndPoint to = new EndPoint(server, 7000);
+				MessagingService.getMessagingInstance().sendOneWay(message, to);
+        	}
+        }
+	}
+
+
+	/**
+	 * @param args
+	 */
+	public static void main(String[] args) throws Throwable
+	{
+		LogUtil.init();
+		AdminTool adminTool = null;
+		int operation = 1;
+		String columnFamilyName = null;
+		long skip = 0L;
+		if(args.length < 1 )
+		{
+			System.out.println("Usage: AdminTool <serverName | -all> <operation: 1 - flushBinary, 2 - compaction, 3 - flush> [ColumnFamilyName] [skip factor for compactions]");
+			return;
+		}
+		if(args[0].equals("-all"))
+		{
+			adminTool = new AdminTool();
+		}
+		else
+		{
+			adminTool = new AdminTool(args[0]);
+		}
+		if(args.length > 1 )
+			operation = Integer.parseInt(args[1]);
+		if(args.length > 2 )
+			columnFamilyName = args[2];
+		if(args.length > 3 )
+			skip = Long.parseLong(args[3]);
+		adminTool.run(operation, columnFamilyName, skip);
+
+		Thread.sleep(10000);
+		System.out.println("Exiting app...");
+		System.exit(0);
+	}
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/tools/ClusterTool.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/tools/ClusterTool.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/tools/ClusterTool.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/tools/ClusterTool.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,107 @@
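+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+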
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.InetAddress;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+
+
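+/**
+ * Command-line front end for one-off cluster operations: assigning a token to a
+ * node, hashing a key, building an index from a data file, and (currently
+ * commented-out) read and write load tests.
+ */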
+public class ClusterTool
+{
+	public static final String SET_TOKEN = "settoken";
+	public static final String HASH_KEY = "hash";
+	public static final String BUILD_INDEX = "build_index";
+	public static final String READ_TEST = "read_test";
+	public static final String WRITE_TEST = "write_test";
+
+    public static void applyToken(String serverName, BigInteger token) throws IOException
+    {
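+        /* Send the new token one-way to the node's token verb handler; no acknowledgement is expected. */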
+        try
+        {
+        	EndPoint from = new EndPoint(InetAddress.getLocalHost().getHostName(), 7000);
+        	System.out.println("Updating token of server " + serverName + " with token " + token);
+            Message message = new Message(from, "", StorageService.tokenVerbHandler_, new Object[]{ token.toByteArray() });
+            EndPoint ep = new EndPoint(serverName, 7000);
+        	MessagingService.getMessagingInstance().sendOneWay(message, ep);
+        	Thread.sleep(1000);
+	    	System.out.println("Successfully calibrated " + serverName);
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace(System.out);
+        }
+    }
+
+    public static void printUsage()
+    {
+		System.out.println("Usage: java -jar <cassandra-tools.jar> <command> <options>");
+		System.out.println("Commands:");
+		System.out.println("\t" + SET_TOKEN + " <server> <token>");
+		System.out.println("\t" + HASH_KEY + " <key>");
+		System.out.println("\t" + BUILD_INDEX + " <full path to the data file>");
+		System.out.println("\t" + READ_TEST + " <number of threads> <requests per sec per thread> <machine(s) to read (':' separated list)>");
+		System.out.println("\t" + WRITE_TEST + " <number of threads> <requests per sec per thread> <machine(s) to write (':' separated list)>");
+    }
+
+    public static void main(String[] args) throws Exception
+    {
+    	if(args.length < 2)
+    	{
+    		printUsage();
+    		return;
+    	}
+
+    	int argc = 0;
+    	try
+    	{
+    		/* set the token for a particular node in the Cassandra cluster */
+	    	if(SET_TOKEN.equals(args[argc]))
+	    	{
+		    	String serverName = args[argc + 1];
+		    	BigInteger token = new BigInteger(args[argc + 2]);
+		    	//System.out.println("Calibrating " + serverName + " with token " + token);
+		    	applyToken(serverName, token);
+	    	}
+	    	/* Print the hash of a given key */
+	    	else if(HASH_KEY.equals(args[argc]))
+	    	{
+	    		System.out.println("Hash = [" + StorageService.hash(args[argc + 1]) + "]");
+	    	}
+	    	/* build indexes given the data file */
+	    	else if(BUILD_INDEX.equals(args[argc]))
+	    	{
+	    		IndexBuilder.main(args);
+	    	}
+	    	/* test reads */
+	    	else if(READ_TEST.equals(args[argc]))
+	    	{
+	    		System.out.println("Testing reads...");
+	    		int numThreads = Integer.parseInt(args[argc + 1]);
+				int rpsPerThread = Integer.parseInt(args[argc + 2]);
+				String machinesToRead = args[argc + 3];
+//				ReadTest.runReadTest(numThreads, rpsPerThread, machinesToRead);
+	    	}
+	    	/* test writes */
+	    	else if(WRITE_TEST.equals(args[argc]))
+	    	{
+	    		System.out.println("Testing writes...");
+	    		int numThreads = Integer.parseInt(args[argc + 1]);
+				int rpsPerThread = Integer.parseInt(args[argc + 2]);
+				String machinesToWrite = args[argc + 3];
+//				WriteTest.runWriteTest(numThreads, rpsPerThread, machinesToWrite);
+	    	}
+	    	/* unrecognized command */
+	    	else
+	    	{
+	    		printUsage();
+	    	}
+    	}
+    	catch(Exception e)
+    	{
+    		System.err.println("Exception " + e.getMessage());
+			e.printStackTrace(System.err);
+    		printUsage();
+    	}
+
+    	System.exit(0);
+    }
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/tools/FileSizeTokenGenerator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/tools/FileSizeTokenGenerator.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/tools/FileSizeTokenGenerator.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/tools/FileSizeTokenGenerator.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
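+/**
+ * Generates candidate partitioner tokens for a data file. A first pass sums the
+ * sizes of all keys whose hash falls between the given low and high tokens; a
+ * second pass then emits a token each time roughly 1/(splitCount + 1) of that
+ * total has been scanned, yielding splits of approximately equal data size.
+ */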
+public class FileSizeTokenGenerator
+{
+    private static Logger logger_ = Logger.getLogger(FileSizeTokenGenerator.class);
+    
+    public static void main(String[] args)
+    {
+        if ( args.length != 4 )
+        {
+            System.out.println("Usage : java org.apache.cassandra.tools.FileSizeTokenGenerator <full path to the data file> <low token> <high token> <split count>");
+            System.exit(1);
+        }
+        
+        try
+        {
+            int splitCount = Integer.parseInt(args[3]);
+            BigInteger l = new BigInteger(args[1]);
+            BigInteger h = new BigInteger(args[2]);
+            long totalSize = getTotalSize(args[0], l, h);
+            System.out.println("Total size : " + totalSize);
+            BigInteger[] tokens = generateTokens(args[0], l, h, totalSize, splitCount);
+            int i = 0;
+            for ( BigInteger token : tokens )
+            {
+            	System.out.println("Token " + i++ + " : " + token);
+            }
+        }
+        catch( Throwable th )
+        {
+            logger_.warn(LogUtil.throwableToString(th));
+        }
+    }
+
+    private static long  getTotalSize(String dataFile, BigInteger l , BigInteger h) throws IOException
+    {
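+        /* Stream the data file once and sum the data sizes of all keys whose hash lies between l and h. */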
+        final int bufferSize = 64*1024;
+        
+        IFileReader dataReader = SequenceFile.bufferedReader(dataFile, bufferSize);
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        DataInputBuffer bufIn = new DataInputBuffer();
+        long totalSize = 0;
+        try
+        {                                            
+            while ( !dataReader.isEOF() )
+            {                
+                bufOut.reset();                
+                /* Record the position of the key. */
+                dataReader.next(bufOut);
+                bufIn.reset(bufOut.getData(), bufOut.getLength());
+                /* Key just read */
+                String key = bufIn.readUTF();    
+                BigInteger keyHash = StorageService.hash(key);
+                if ( !key.equals(SSTable.blockIndexKey_) && l.compareTo(keyHash) < 0 && h.compareTo(keyHash) > 0 )
+                {
+                    int sz = bufIn.readInt();
+                    byte[] keyData = new byte[sz];
+                    bufIn.read(keyData, 0, sz);
+                    totalSize = totalSize + sz;
+                }
+            }
+        }
+        finally
+        {
+            dataReader.close();
+        }
+        return totalSize;
+    }
+    
+    
+    private static BigInteger[] generateTokens(String dataFile,BigInteger l , BigInteger h, long totalSize, int splitCount) throws IOException
+    {
+        final int bufferSize = 64*1024;
+        
+        IFileReader dataReader = SequenceFile.bufferedReader(dataFile, bufferSize);
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        DataInputBuffer bufIn = new DataInputBuffer();
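+        /* A token is emitted each time roughly totalSize / (splitCount + 1) bytes of key data have been scanned. */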
+        long splitFactor = totalSize/(splitCount+1);
+        long curSize = 0;
+        BigInteger[] tokens = new BigInteger[splitCount];
+        int k = 0 ;
+        try
+        {                                            
+            while ( !dataReader.isEOF())
+            {                
+                bufOut.reset();                
+                /* Record the position of the key. */
+                dataReader.next(bufOut);
+                bufIn.reset(bufOut.getData(), bufOut.getLength());
+                /* Key just read */
+                String key = bufIn.readUTF();       
+                BigInteger keyHash = StorageService.hash(key);
+                if ( !key.equals(SSTable.blockIndexKey_) && l.compareTo(keyHash) < 0 && h.compareTo(keyHash) > 0 )
+                {
+                    int sz = bufIn.readInt();
+                    curSize = curSize + sz;
+                    byte[] keyData = new byte[sz];
+                    bufIn.read(keyData, 0, sz);
+
+                    if ( curSize > splitFactor )
+                    {
+                        tokens[k++] = keyHash;
+                        curSize = 0;
+                        if ( k == splitCount )
+                        {
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+        finally
+        {
+            dataReader.close();
+        }
+        return tokens;
+    }
+        
+}

Added: incubator/cassandra/src/org/apache/cassandra/tools/IndexBuilder.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/tools/IndexBuilder.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/tools/IndexBuilder.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/tools/IndexBuilder.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.BloomFilter;
+
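+/**
+ * Rebuilds the -Index file for an SSTable -Data file: one pass counts the block
+ * index entries in order to size the bloom filter, a second pass writes the
+ * largest key of each block and the offset of its block index into the index
+ * file, and the bloom filter of all keys is appended on close.
+ */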
+public class IndexBuilder
+{
+    private static final int bufferSize_ = 64*1024;
+
+    public static void main(String[] args)
+    {
+        if ( args.length != 1 )
+        {
+            System.out.println("Usage : java org.apache.cassandra.tools.IndexBuilder <full path to the data file>");
+            System.exit(1);
+        }
+
+        try
+        {
+	        int blockCount = getBlockCount(args[0]);
+	        System.out.println("Approximate number of keys in the data file : " + (blockCount + 1) * SSTable.indexInterval());
+	        buildIndex(args[0], blockCount);
+        }
+        catch(Throwable t)
+        {
+        	System.err.println("Exception: " + t.getMessage());
+        	t.printStackTrace(System.err);
+        }
+    }
+
+    private static int getBlockCount(String dataFile) throws IOException
+    {
+        IFileReader dataReader = SequenceFile.bufferedReader(dataFile, bufferSize_);
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        DataInputBuffer bufIn = new DataInputBuffer();
+        int blockCount = 0;
+
+        try
+        {
+            while ( !dataReader.isEOF() )
+            {
+                bufOut.reset();
+                dataReader.next(bufOut);
+                bufIn.reset(bufOut.getData(), bufOut.getLength());
+                /* Key just read */
+                String key = bufIn.readUTF();
+                if ( key.equals(SSTable.blockIndexKey_) )
+                {
+                    ++blockCount;
+                }
+            }
+        }
+        finally
+        {
+            dataReader.close();
+        }
+        return blockCount;
+    }
+
+    private static void buildIndex(String dataFile, int blockCount) throws IOException
+    {
+        String indexFile = dataFile.replace("-Data.", "-Index.");
+        final int bufferSize = 64*1024;
+
+        IFileWriter indexWriter = SequenceFile.bufferedWriter(indexFile, bufferSize);
+        IFileReader dataReader = SequenceFile.bufferedReader(dataFile, bufferSize);
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        DataInputBuffer bufIn = new DataInputBuffer();
+        /* BloomFilter of all data in the data file */
+        BloomFilter bf = new BloomFilter((SSTable.indexInterval() + 1)*blockCount, 8);
+
+        try
+        {
+            while ( !dataReader.isEOF() )
+            {
+                bufOut.reset();
+                /* Record the position of the key. */
+                long blockIndexOffset = dataReader.getCurrentPosition();
+                dataReader.next(bufOut);
+                bufIn.reset(bufOut.getData(), bufOut.getLength());
+                /* Key just read */
+                String key = bufIn.readUTF();
+                if ( key.equals(SSTable.blockIndexKey_) )
+                {
+                    /* Ignore the size of the data associated with the block index */
+                    bufIn.readInt();
+                    /* Number of keys in the block. */
+                    int blockSize = bufIn.readInt();
+                    /* Largest key in the block */
+                    String largestKey = null;
+
+                    /*
+                     * Read the keys in this block and find the largest key in
+                     * this block. This is the key that gets written into the
+                     * index file.
+                    */
+                    for ( int i = 0; i < blockSize; ++i )
+                    {
+                        String currentKey = bufIn.readUTF();
+                        bf.fill(currentKey);
+                        if ( largestKey == null )
+                        {
+                            largestKey = currentKey;
+                        }
+                        else
+                        {
+                            if ( currentKey.compareTo(largestKey) > 0 )
+                            {
+                                /* record this key */
+                                largestKey = currentKey;
+                            }
+                        }
+                        /* Read the position of the key and the size of the key data, then throw them away. */
+                        bufIn.readLong();
+                        bufIn.readLong();
+                    }
+
+                    /*
+                     * Write into the index file the largest key in the block
+                     * and the offset of the block index in the data file.
+                    */
+                    indexWriter.append(largestKey, BasicUtilities.longToByteArray(blockIndexOffset));
+                }
+            }
+        }
+        finally
+        {
+            dataReader.close();
+            /* Cache the bloom filter */
+            SSTable.storeBloomFilter(dataFile, bf);
+            /* Write the bloom filter into the index file */
+            bufOut.reset();
+            BloomFilter.serializer().serialize(bf, bufOut);
+            byte[] bytes = new byte[bufOut.getLength()];
+            System.arraycopy(bufOut.getData(), 0, bytes, 0, bytes.length);
+            indexWriter.close(bytes, bytes.length);
+            bufOut.close();
+        }
+    }
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/tools/KeyChecker.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/tools/KeyChecker.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/tools/KeyChecker.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/tools/KeyChecker.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.RandomAccessFile;
+
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LogUtil;
+
+
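+/**
+ * Reads a file of keys and, for every key this node is the primary replica for,
+ * verifies that the Mailbox table has a row; keys that come back empty are
+ * recorded in a Missing-<hostname>.dat file.
+ */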
+public class KeyChecker
+{
+    private static final int bufSize_ = 128*1024*1024;
+    /*
+     * This function checks if the local storage endpoint
+     * is responsible for storing this key.
+     */
+    private static boolean checkIfProcessKey(String key)
+    {
+        EndPoint[] endPoints = StorageService.instance().getNStorageEndPoint(key);
+        EndPoint localEndPoint = StorageService.getLocalStorageEndPoint();
+        for(EndPoint endPoint : endPoints)
+        {
+            if(endPoint.equals(localEndPoint))
+                return true;
+        }
+        return false;
+    }
+    
+    public static void main(String[] args) throws Throwable
+    {
+        if ( args.length != 1 )
+        {
+            System.out.println("Usage : java org.apache.cassandra.tools.KeyChecker <file containing all keys>");
+            System.exit(1);
+        }
+        
+        LogUtil.init();
+        StorageService s = StorageService.instance();
+        s.start();
+        
+        /* Sleep so gossip has time to discover the rest of the ring */
+        Thread.sleep(240000);
+        /* Create the file for the missing keys */
+        RandomAccessFile raf = new RandomAccessFile( "Missing-" + FBUtilities.getHostName() + ".dat", "rw");
+        
+        /* Start reading the file that contains the keys */
+        BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(args[0]) ), KeyChecker.bufSize_ );
+        String key = null;
+        boolean bStarted = false;
+        
+        while ( ( key = bufReader.readLine() ) != null )
+        {            
+            if ( !bStarted )
+            {
+                bStarted = true;
+                System.out.println("Started the processing of the file ...");
+            }
+            
+            key = key.trim();
+            if ( StorageService.instance().isPrimary(key) )
+            {
+                System.out.println("Processing key " + key);
+                Row row = Table.open("Mailbox").getRow(key, "MailboxMailList0");
+                if ( row.isEmpty() )
+                {
+                    System.out.println("MISSING KEY : " + key);
+                    raf.write(key.getBytes());
+                    raf.write(System.getProperty("line.separator").getBytes());
+                }
+            }
+        }
+        System.out.println("DONE checking keys ...");
+        raf.close();
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/tools/KeyExtracter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/tools/KeyExtracter.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/tools/KeyExtracter.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/tools/KeyExtracter.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+
+
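+/**
+ * Extracts a single key's record from an SSTable data file into a standalone
+ * data file, together with a one-entry block index, so that the row can be
+ * inspected or re-imported in isolation.
+ */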
+public class KeyExtracter
+{
+    private static final int bufferSize_ = 64*1024;
+
+    public static void main(String[] args) throws Throwable
+    {
+        if ( args.length != 3 )
+        {
+            System.out.println("Usage : java org.apache.cassandra.tools.KeyExtracter <key to extract> <data file> <output file>");
+            System.exit(1);
+        }
+		String keyToExtract = args[0];
+		String dataFile = args[1];
+		String outputFile = args[2];
+
+        extractKeyIntoFile(keyToExtract, dataFile, outputFile);
+    }
+
+    public static boolean extractKeyIntoFile(String keyToExtract, String dataFile, String outputFile) throws IOException
+    {
+		IFileReader dataReader = SequenceFile.bufferedReader(dataFile, bufferSize_);
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        DataInputBuffer bufIn = new DataInputBuffer();
+
+    	try
+    	{
+            while ( !dataReader.isEOF() )
+            {
+                bufOut.reset();
+                dataReader.next(bufOut);
+                bufIn.reset(bufOut.getData(), bufOut.getLength());
+                /* Key just read */
+                String key = bufIn.readUTF();
+                /* check if we want this key */
+                if ( key.equals(keyToExtract) )
+                {
+                	int keySize = bufIn.readInt();
+                	byte[] keyData = new byte[keySize];
+                	bufIn.read(keyData, 0, keySize);
+
+                	/* write the key data into a file */
+                    RandomAccessFile raf = new RandomAccessFile(outputFile, "rw");                	
+                	raf.writeUTF(key);
+                	raf.writeInt(keySize);
+                	raf.write(keyData);
+                    dumpBlockIndex(keyToExtract, 0L, keySize, raf);
+                    raf.close();
+                    return true;
+                }
+            }
+        }
+        finally
+        {
+            dataReader.close();
+        }
+
+        return false;
+    }
+    
+    private static void dumpBlockIndex(String key, long position, long size, RandomAccessFile raf) throws IOException
+    {
+        DataOutputBuffer bufOut = new DataOutputBuffer();                       
+        /* Number of keys in this block */
+        bufOut.writeInt(1);
+        bufOut.writeUTF(key);
+        bufOut.writeLong(position);
+        bufOut.writeLong(size);
+        
+        /* Write out the block index. */
+        raf.writeUTF(SSTable.blockIndexKey_);
+        raf.writeInt(bufOut.getLength());
+        raf.write(bufOut.getData(), 0, bufOut.getLength());
+    }
+}