You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC

svn commit: r749218 [22/34] - in /incubator/cassandra: branches/ dist/ nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/ trunk/src/org/apache/ trunk/src/org/apache/cassandra/ trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...

Added: incubator/cassandra/trunk/src/org/apache/cassandra/loader/KeyType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/loader/KeyType.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/loader/KeyType.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/loader/KeyType.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,125 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6 
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a> 
+// Any modifications to this file will be lost upon recompilation of the source schema. 
+// Generated on: 2007.10.19 at 01:36:41 PM PDT 
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for KeyType complex type.
+ * 
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ * 
+ * <pre>
+ * &lt;complexType name="KeyType">
+ *   &lt;complexContent>
+ *     &lt;restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
+ *       &lt;sequence>
+ *         &lt;element name="OptimizeIt" type="{http://www.w3.org/2001/XMLSchema}boolean"/>
+ *         &lt;element name="Combiner" type="{http://www.w3.org/2001/XMLSchema}string"/>
+ *         &lt;element name="Fields" type="{}FieldCollection"/>
+ *       &lt;/sequence>
+ *     &lt;/restriction>
+ *   &lt;/complexContent>
+ * &lt;/complexType>
+ * </pre>
+ * 
+ * 
+ */
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType(name = "KeyType", propOrder = {
    "optimizeIt",
    "combiner",
    "fields"
})
public class KeyType {

    // Element may be explicitly nil in the instance document (nillable = true),
    // hence the wrapper Boolean rather than a primitive; null means "absent/nil".
    @XmlElement(name = "OptimizeIt", required = true, type = Boolean.class, nillable = true)
    protected Boolean optimizeIt;
    // Separator string inserted between key fields when the loader joins them.
    @XmlElement(name = "Combiner", required = true)
    protected String combiner;
    // Indices of the delimited input-line fields that make up the row key.
    // FieldCollection is a sibling JAXB-generated type (declared elsewhere).
    @XmlElement(name = "Fields", required = true)
    protected FieldCollection fields;

    /**
     * Gets the value of the optimizeIt property.
     * 
     * @return
     *     possible object is
     *     {@link Boolean }
     *     
     */
    public Boolean isOptimizeIt() {
        return optimizeIt;
    }

    /**
     * Sets the value of the optimizeIt property.
     * 
     * @param value
     *     allowed object is
     *     {@link Boolean }
     *     
     */
    public void setOptimizeIt(Boolean value) {
        this.optimizeIt = value;
    }

    /**
     * Gets the value of the combiner property.
     * 
     * @return
     *     possible object is
     *     {@link String }
     *     
     */
    public String getCombiner() {
        return combiner;
    }

    /**
     * Sets the value of the combiner property.
     * 
     * @param value
     *     allowed object is
     *     {@link String }
     *     
     */
    public void setCombiner(String value) {
        this.combiner = value;
    }

    /**
     * Gets the value of the fields property.
     * 
     * @return
     *     possible object is
     *     {@link FieldCollection }
     *     
     */
    public FieldCollection getFields() {
        return fields;
    }

    /**
     * Sets the value of the fields property.
     * 
     * @param value
     *     allowed object is
     *     {@link FieldCollection }
     *     
     */
    public void setFields(FieldCollection value) {
        this.fields = value;
    }

}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/loader/Loader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/loader/Loader.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/loader/Loader.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/loader/Loader.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,356 @@
+/**
+ * 
+ */
+package org.apache.cassandra.loader;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.locator.EndPointSnitch;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.Token;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.cassandra.utils.*;
+
+
+/**
+ * This class is used to load the storage endpoints with the relevant data
+ * The data should be both what they are responsible for and what should be replicated on the specific
+ * endpoints.
+ * Population is done based on a xml file which should adhere to a schema.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class Loader
+{
+	private static long siesta_ = 60*1000;
+    private static Logger logger_ = Logger.getLogger( Loader.class );
+	private Importer importer_;
+    private StorageService storageService_;
+    
+    public Loader(StorageService storageService)
+    {
+        storageService_ = storageService;
+    }
+    
+    /*
+     * This method loads all the keys into a special column family 
+     * called "RecycleBin". This column family is used for temporary
+     * processing of data and then can be recycled. The idea is that 
+     * after the load is complete we have all the keys in the system.
+     * Now we force a compaction and examine the single Index file 
+     * that is generated to determine how the nodes need to relocate
+     * to be perfectly load balanced.
+     * 
+     *  param @ rootDirectory - rootDirectory at which the parsing begins.
+     *  param @ table - table that will be populated.
+     *  param @ cfName - name of the column that will be populated. This is 
+     *  passed in so that we do not unncessary allocate temporary String objects.
+    */
+    private void preParse(File rootDirectory, String table, String cfName) throws Throwable
+    {        
+        File[] files = rootDirectory.listFiles();
+        
+        for ( File file : files )
+        {
+            if ( file.isDirectory() )
+                preParse(file, table, cfName);
+            else
+            {
+                String fileName = file.getName();
+                RowMutation rm = new RowMutation(table, fileName);
+                rm.add(cfName, fileName.getBytes(), 0);
+                rm.apply();
+            }
+        }
+    }
+    
+    /*
+     * Merges a list of strings with a particular combiner.
+     */
+    String merge( List<String> listFields,  String combiner)
+    {
+    	if(listFields.size() == 0 )
+    		return null;
+    	if(listFields.size() == 1)
+    		return listFields.get(0);
+    	String mergedKey = null;
+    	for(String field: listFields)
+    	{
+    		if(mergedKey == null)
+    		{
+    			mergedKey = field;
+    		}
+    		else
+    		{
+    			mergedKey = mergedKey + combiner + field;
+    		}
+    	}
+    	return mergedKey;
+    	
+    }
+    
+    /*
+     * This function checks if the local storage endpoint 
+     * is reponsible for storing this key .
+     */
+    boolean checkIfProcessKey(String key)
+    {
+		EndPoint[] endPoints = storageService_.getNStorageEndPoint(key);
+    	EndPoint localEndPoint = StorageService.getLocalStorageEndPoint();
+    	for(EndPoint endPoint : endPoints)
+    	{
+    		if(endPoint.equals(localEndPoint))
+    			return true;
+    	}
+    	return false;
+    }
+    
+   /*
+    * This functions parses each file based on delimiters specified in the 
+    * xml file. It also looks at all the parameters specified in teh xml and based
+    * on that populates the internal Row structure.
+    */ 
+    void parse(String filepath) throws Throwable
+    {
+        BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+                new FileInputStream(filepath)), 16 * 1024 * 1024);
+        String line = null;
+        String delimiter_ = new String(",");
+        RowMutation rm = null;
+        Map<String, RowMutation> rms = new HashMap<String, RowMutation>();
+        if(importer_.columnFamily.delimiter != null)
+        {
+        	delimiter_ = importer_.columnFamily.delimiter;
+        }
+        while ((line = bufReader.readLine()) != null)
+        {
+            StringTokenizer st = new StringTokenizer(line, delimiter_);
+            List<String> tokenList = new ArrayList<String>();
+            String key = null;
+            while (st.hasMoreElements())
+            {
+            	tokenList.add((String)st.nextElement());
+            }
+            /* Construct the Key */
+            List<String> keyFields = new ArrayList<String> ();
+            for(int fieldId: importer_.key.fields.field)
+            {
+            	keyFields.add(tokenList.get(fieldId));
+            }
+            key = merge(keyFields, importer_.key.combiner);
+            if(importer_.key.optimizeIt != null && !importer_.key.optimizeIt)
+            {
+	            if(!checkIfProcessKey(key))
+	            {
+	            	continue;
+	            }
+            }
+            rm = rms.get(key);
+            if( rm == null)
+            {
+            	rm = new RowMutation(importer_.table, key);
+            	rms.put(key, rm);
+            }
+            if(importer_.columnFamily.superColumn != null)
+            {
+            	List<String> superColumnList = new ArrayList<String>();
+            	for(int fieldId : importer_.columnFamily.superColumn.fields.field)
+            	{
+            		superColumnList.add(tokenList.get(fieldId));
+            	}
+            	String superColumnName = merge(superColumnList, " ");
+            	superColumnList.clear();
+            	if(importer_.columnFamily.superColumn.tokenize)
+            	{
+            	    Analyzer analyzer = new StandardAnalyzer();
+            	    TokenStream ts = analyzer.tokenStream("superColumn", new StringReader(superColumnName));
+            	    Token token = null;
+            	    token = ts.next();
+            	    while(token != null)
+            	    {
+            	    	superColumnList.add(token.termText());
+                	    token = ts.next();
+            	    }
+            	}
+            	else
+            	{
+            		superColumnList.add(superColumnName);
+            	}
+            	for(String sName : superColumnList)
+            	{
+            		String cfName = importer_.columnFamily.name + ":" + sName;
+    	            if(importer_.columnFamily.column != null)
+    	            {
+    	            	for(ColumnType column : importer_.columnFamily.column )
+    	            	{
+    	            		String cfColumn = cfName +":" + (column.name == null ? tokenList.get(column.field):column.name);
+    	            		rm.add(cfColumn, tokenList.get(column.value.field).getBytes(), Integer.parseInt(tokenList.get(column.timestamp.field)));
+    	            	}
+    	            }
+            		
+            	}
+            	
+            }
+            else
+            {
+	            if(importer_.columnFamily.column != null)
+	            {
+	            	for(ColumnType column : importer_.columnFamily.column )
+	            	{
+	            		String cfColumn = importer_.columnFamily.name +":" + (column.name == null ? tokenList.get(column.field):column.name);
+	            		rm.add(cfColumn, tokenList.get(column.value.field).getBytes(), Integer.parseInt(tokenList.get(column.timestamp.field)));
+	            	}
+	            }
+            }
+        }
+        // Now apply the data for all keys  
+        // TODO : Add checks for large data
+        // size maybe we want to check the 
+        // data size and then apply.
+        Set<String> keys = rms.keySet();
+        for(String pKey : keys)
+        {
+        	rm = rms.get(pKey);
+        	if( rm != null)
+        	{
+        		rm.apply();
+        	}
+        }
+    }
+    
+    
+    void parseFileList(File dir) 
+    {
+		int fileCount = dir.list().length;
+		for ( int i = 0 ; i < fileCount ; i++ ) 
+		{
+			File file = new File(dir.list()[i]);
+			if ( file.isDirectory())
+			{
+				parseFileList(file);
+			}
+			else 
+			{
+				try
+				{
+					if(importer_.key.optimizeIt != null && importer_.key.optimizeIt)
+					{
+						if(checkIfProcessKey(dir.list()[i]))
+						{
+							parse(dir.listFiles()[i].getAbsolutePath());
+						}
+					}
+					else
+					{
+						parse(dir.listFiles()[i].getAbsolutePath());
+					}
+				}
+				catch ( Throwable ex ) 
+				{
+					logger_.error(ex.toString());
+				}
+			}
+		}
+    }
+	
+    void preLoad(File rootDirectory) throws Throwable
+    {
+        String table = DatabaseDescriptor.getTables().get(0);
+        String cfName = Table.recycleBin_ + ":" + "Keys";
+        /* populate just the keys. */
+        preParse(rootDirectory, table, cfName);
+        /* dump the memtables */
+        Table.open(table).flush(false);
+        /* force a compaction of the files. */
+        Table.open(table).forceCompaction(null,null,null);
+        
+        /*
+         * This is a hack to let everyone finish. Just sleep for
+         * a couple of minutes. 
+        */
+        logger_.info("Taking a nap after forcing a compaction ...");
+        Thread.sleep(Loader.siesta_);
+        
+        /* Figure out the keys in the index file to relocate the node */
+        List<String> ssTables = Table.open(table).getAllSSTablesOnDisk();
+        /* Load the indexes into memory */
+        for ( String df : ssTables )
+        {
+        	SSTable ssTable = new SSTable(df);
+        	ssTable.close();
+        }
+        /* We should have only one file since we just compacted. */        
+        List<String> indexedKeys = SSTable.getIndexedKeys();        
+        storageService_.relocate(indexedKeys.toArray( new String[0]) );
+        
+        /*
+         * This is a hack to let everyone relocate and learn about
+         * each other. Just sleep for a couple of minutes. 
+        */
+        logger_.info("Taking a nap after relocating ...");
+        Thread.sleep(Loader.siesta_);  
+        
+        /* 
+         * Do the cleanup necessary. Delete all commit logs and
+         * the SSTables and reset the load state in the StorageService. 
+        */
+        SSTable.delete(ssTables.get(0));
+//        File commitLogDirectory = new File( DatabaseDescriptor.getLogFileLocation() );
+//        FileUtils.delete(commitLogDirectory.listFiles());
+        storageService_.resetLoadState();
+        logger_.info("Finished all the requisite clean up ...");
+    }
+    
+	void load(String xmlFile) throws Throwable
+	{
+		try
+		{
+			JAXBContext jc = JAXBContext.newInstance(this.getClass().getPackage().getName());			
+			Unmarshaller u = jc.createUnmarshaller();			
+			importer_ = (Importer)u.unmarshal(new FileInputStream( xmlFile ) );
+			String directory = importer_.columnFamily.directory;
+            File rootDirectory = new File(directory);
+            preLoad(rootDirectory);
+			parseFileList(rootDirectory);
+		}
+		catch (Exception e)
+		{
+			logger_.info(LogUtil.throwableToString(e));
+		}
+		
+	}
+
+	/**
+	 * @param args
+	 */
+	public static void main(String[] args) throws Throwable
+	{
+		LogUtil.init();
+        StorageService s = StorageService.instance();
+        s.start();
+		Loader loader = new Loader(s);
+		loader.load("mbox_importer.xml");
+	}
+
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/loader/ObjectFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/loader/ObjectFactory.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/loader/ObjectFactory.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/loader/ObjectFactory.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,136 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6 
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a> 
+// Any modifications to this file will be lost upon recompilation of the source schema. 
+// Generated on: 2007.10.19 at 01:36:41 PM PDT 
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.annotation.XmlElementDecl;
+import javax.xml.bind.annotation.XmlRegistry;
+import javax.xml.namespace.QName;
+
+
/**
 * This object contains factory methods for each 
 * Java content interface and Java element interface 
 * generated in the org.apache.cassandra.loader package. 
 * <p>An ObjectFactory allows you to programmatically 
 * construct new instances of the Java representation 
 * for XML content. The Java representation of XML 
 * content can consist of schema derived interfaces 
 * and classes representing the binding of schema 
 * type definitions, element declarations and model 
 * groups.  Factory methods for each of these are 
 * provided in this class.
 * 
 */
@XmlRegistry
public class ObjectFactory {

    // Element QNames use the empty namespace, matching the no-namespace loader schema.
    private final static QName _SuperColumn_QNAME = new QName("", "SuperColumn");
    private final static QName _Table_QNAME = new QName("", "Table");
    private final static QName _Column_QNAME = new QName("", "Column");

    /**
     * Create a new ObjectFactory that can be used to create new instances of schema derived classes for package: org.apache.cassandra.loader
     * 
     */
    public ObjectFactory() {
    }

    /**
     * Create an instance of {@link KeyType }
     * 
     */
    public KeyType createKeyType() {
        return new KeyType();
    }

    /**
     * Create an instance of {@link SuperColumnType }
     * 
     */
    public SuperColumnType createSuperColumnType() {
        return new SuperColumnType();
    }

    /**
     * Create an instance of {@link ColumnType }
     * 
     */
    public ColumnType createColumnType() {
        return new ColumnType();
    }

    /**
     * Create an instance of {@link Importer }
     * 
     */
    public Importer createImporter() {
        return new Importer();
    }

    /**
     * Create an instance of {@link ColumnFamilyType }
     * 
     */
    public ColumnFamilyType createColumnFamilyType() {
        return new ColumnFamilyType();
    }

    /**
     * Create an instance of {@link TimestampType }
     * 
     */
    public TimestampType createTimestampType() {
        return new TimestampType();
    }

    /**
     * Create an instance of {@link FieldCollection }
     * 
     */
    public FieldCollection createFieldCollection() {
        return new FieldCollection();
    }

    /**
     * Create an instance of {@link ValueType }
     * 
     */
    public ValueType createValueType() {
        return new ValueType();
    }

    /**
     * Create an instance of {@link JAXBElement }{@code <}{@link SuperColumnType }{@code >}}
     * 
     */
    @XmlElementDecl(namespace = "", name = "SuperColumn")
    public JAXBElement<SuperColumnType> createSuperColumn(SuperColumnType value) {
        return new JAXBElement<SuperColumnType>(_SuperColumn_QNAME, SuperColumnType.class, null, value);
    }

    /**
     * Create an instance of {@link JAXBElement }{@code <}{@link String }{@code >}}
     * 
     */
    @XmlElementDecl(namespace = "", name = "Table")
    public JAXBElement<String> createTable(String value) {
        return new JAXBElement<String>(_Table_QNAME, String.class, null, value);
    }

    /**
     * Create an instance of {@link JAXBElement }{@code <}{@link ColumnType }{@code >}}
     * 
     */
    @XmlElementDecl(namespace = "", name = "Column")
    public JAXBElement<ColumnType> createColumn(ColumnType value) {
        return new JAXBElement<ColumnType>(_Column_QNAME, ColumnType.class, null, value);
    }

}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/loader/PreLoad.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/loader/PreLoad.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/loader/PreLoad.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/loader/PreLoad.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.loader;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class PreLoad
+{
+	
+	private static long siesta_ = 2*60*1000;
+	private static Logger logger_ = Logger.getLogger( Loader.class );
+    private StorageService storageService_;
+	
+	public PreLoad(StorageService storageService)
+    {
+        storageService_ = storageService;
+    }
+    /*
+     * This method loads all the keys into a special column family 
+     * called "RecycleBin". This column family is used for temporary
+     * processing of data and then can be recycled. The idea is that 
+     * after the load is complete we have all the keys in the system.
+     * Now we force a compaction and examine the single Index file 
+     * that is generated to determine how the nodes need to relocate
+     * to be perfectly load balanced.
+     * 
+     *  param @ rootDirectory - rootDirectory at which the parsing begins.
+     *  param @ table - table that will be populated.
+     *  param @ cfName - name of the column that will be populated. This is 
+     *  passed in so that we do not unncessary allocate temporary String objects.
+    */
+    private void preParse(String rootDirectory, String table, String cfName) throws Throwable
+    {        
+        BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+                new FileInputStream(rootDirectory)), 16 * 1024 * 1024);
+        String line = null;
+        while ((line = bufReader.readLine()) != null)
+        {
+                String userId = line;
+                RowMutation rm = new RowMutation(table, userId);
+                rm.add(cfName, userId.getBytes(), 0);
+                rm.apply();
+        }
+    }
+    
+    void run(String userFile) throws Throwable
+    {
+        String table = DatabaseDescriptor.getTables().get(0);
+        String cfName = Table.recycleBin_ + ":" + "Keys";
+        /* populate just the keys. */
+        preParse(userFile, table, cfName);
+        /* dump the memtables */
+        Table.open(table).flush(false);
+        /* force a compaction of the files. */
+        Table.open(table).forceCompaction(null, null,null);
+        
+        /*
+         * This is a hack to let everyone finish. Just sleep for
+         * a couple of minutes. 
+        */
+        logger_.info("Taking a nap after forcing a compaction ...");
+        Thread.sleep(PreLoad.siesta_);
+        
+        /* Figure out the keys in the index file to relocate the node */
+        List<String> ssTables = Table.open(table).getAllSSTablesOnDisk();
+        /* Load the indexes into memory */
+        for ( String df : ssTables )
+        {
+        	SSTable ssTable = new SSTable(df);
+        	ssTable.close();
+        }
+        /* We should have only one file since we just compacted. */        
+        List<String> indexedKeys = SSTable.getIndexedKeys();        
+        storageService_.relocate(indexedKeys.toArray( new String[0]) );
+        
+        /*
+         * This is a hack to let everyone relocate and learn about
+         * each other. Just sleep for a couple of minutes. 
+        */
+        logger_.info("Taking a nap after relocating ...");
+        Thread.sleep(PreLoad.siesta_);  
+        
+        /* 
+         * Do the cleanup necessary. Delete all commit logs and
+         * the SSTables and reset the load state in the StorageService. 
+        */
+        SSTable.delete(ssTables.get(0));
+        storageService_.resetLoadState();
+        logger_.info("Finished all the requisite clean up ...");
+    }
+
+    
+	/**
+	 * @param args
+	 */
+	public static void main(String[] args) throws Throwable
+	{
+		if(args.length != 1)
+		{
+			System.out.println("Usage: PreLoad <Fully qualified path of the file containing user names>");
+		}
+		// TODO Auto-generated method stub
+		LogUtil.init();
+        StorageService s = StorageService.instance();
+        s.start();
+        PreLoad preLoad = new PreLoad(s);
+        preLoad.run(args[0]);
+	}
+
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/loader/SuperColumnType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/loader/SuperColumnType.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/loader/SuperColumnType.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/loader/SuperColumnType.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,97 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6 
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a> 
+// Any modifications to this file will be lost upon recompilation of the source schema. 
+// Generated on: 2007.10.19 at 01:36:41 PM PDT 
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for SuperColumnType complex type.
+ * 
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ * 
+ * <pre>
+ * &lt;complexType name="SuperColumnType">
+ *   &lt;complexContent>
+ *     &lt;restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
+ *       &lt;sequence>
+ *         &lt;element name="Fields" type="{}FieldCollection"/>
+ *       &lt;/sequence>
+ *       &lt;attribute name="Tokenize" type="{http://www.w3.org/2001/XMLSchema}boolean" />
+ *     &lt;/restriction>
+ *   &lt;/complexContent>
+ * &lt;/complexType>
+ * </pre>
+ * 
+ * 
+ */
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType(name = "SuperColumnType", propOrder = {
    "fields"
})
public class SuperColumnType {

    // Indices of the delimited input-line fields that form the super column name.
    @XmlElement(name = "Fields", required = true)
    protected FieldCollection fields;
    // Optional attribute; null when absent from the document.
    // NOTE(review): callers that unbox this directly (e.g. if (x.tokenize))
    // will NPE when the attribute is missing — guard with a null check.
    @XmlAttribute(name = "Tokenize")
    protected Boolean tokenize;

    /**
     * Gets the value of the fields property.
     * 
     * @return
     *     possible object is
     *     {@link FieldCollection }
     *     
     */
    public FieldCollection getFields() {
        return fields;
    }

    /**
     * Sets the value of the fields property.
     * 
     * @param value
     *     allowed object is
     *     {@link FieldCollection }
     *     
     */
    public void setFields(FieldCollection value) {
        this.fields = value;
    }

    /**
     * Gets the value of the tokenize property.
     * 
     * @return
     *     possible object is
     *     {@link Boolean }
     *     
     */
    public Boolean isTokenize() {
        return tokenize;
    }

    /**
     * Sets the value of the tokenize property.
     * 
     * @param value
     *     allowed object is
     *     {@link Boolean }
     *     
     */
    public void setTokenize(Boolean value) {
        this.tokenize = value;
    }

}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/loader/TimestampType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/loader/TimestampType.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/loader/TimestampType.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/loader/TimestampType.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,65 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6 
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a> 
+// Any modifications to this file will be lost upon recompilation of the source schema. 
+// Generated on: 2007.10.19 at 01:36:41 PM PDT 
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for TimestampType complex type.
+ * 
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ * 
+ * <pre>
+ * &lt;complexType name="TimestampType">
+ *   &lt;complexContent>
+ *     &lt;restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
+ *       &lt;attribute name="Field" type="{http://www.w3.org/2001/XMLSchema}int" />
+ *     &lt;/restriction>
+ *   &lt;/complexContent>
+ * &lt;/complexType>
+ * </pre>
+ * 
+ * 
+ */
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType(name = "TimestampType")
public class TimestampType {

    // Index of the delimited token in the input line that supplies the
    // column timestamp (used as a 0-based list index by the loader).
    // Optional attribute; null when absent from the document.
    @XmlAttribute(name = "Field")
    protected Integer field;

    /**
     * Gets the value of the field property.
     * 
     * @return
     *     possible object is
     *     {@link Integer }
     *     
     */
    public Integer getField() {
        return field;
    }

    /**
     * Sets the value of the field property.
     * 
     * @param value
     *     allowed object is
     *     {@link Integer }
     *     
     */
    public void setField(Integer value) {
        this.field = value;
    }

}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/loader/ValueType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/loader/ValueType.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/loader/ValueType.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/loader/ValueType.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,65 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6 
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a> 
+// Any modifications to this file will be lost upon recompilation of the source schema. 
+// Generated on: 2007.10.19 at 01:36:41 PM PDT 
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for ValueType complex type.
+ * 
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ * 
+ * <pre>
+ * &lt;complexType name="ValueType">
+ *   &lt;complexContent>
+ *     &lt;restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
+ *       &lt;attribute name="Field" type="{http://www.w3.org/2001/XMLSchema}int" />
+ *     &lt;/restriction>
+ *   &lt;/complexContent>
+ * &lt;/complexType>
+ * </pre>
+ * 
+ * <p>NOTE: JAXB-generated class (see file header) -- regenerate from the
+ * schema instead of hand-editing.
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(name = "ValueType")
+public class ValueType {
+
+    // Backing store for the "Field" XML attribute; null when the attribute
+    // is absent from the source document (Integer, not int, for that reason).
+    @XmlAttribute(name = "Field")
+    protected Integer field;
+
+    /**
+     * Gets the value of the field property.
+     * 
+     * @return
+     *     possible object is
+     *     {@link Integer }
+     *     
+     */
+    public Integer getField() {
+        return field;
+    }
+
+    /**
+     * Sets the value of the field property.
+     * 
+     * @param value
+     *     allowed object is
+     *     {@link Integer }
+     *     
+     */
+    public void setField(Integer value) {
+        this.field = value;
+    }
+
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,112 @@
+package org.apache.cassandra.locator;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.log4j.Logger;
+
+/**
+ * Base class for IReplicaPlacementStrategy implementations: holds the shared
+ * token-ring metadata and the helper logic for walking the ring and for
+ * mapping dead replicas to live stand-in endpoints (hinted handoff).
+ */
+public abstract class AbstractStrategy implements IReplicaPlacementStrategy
+{
+    protected static Logger logger_ = Logger.getLogger(AbstractStrategy.class);
+    
+    /* Token ring metadata (token <-> endpoint mappings) shared by subclasses. */
+    protected TokenMetadata tokenMetadata_;
+    
+    AbstractStrategy(TokenMetadata tokenMetadata)
+    {
+        tokenMetadata_ = tokenMetadata;
+    }
+    
+    /*
+     * This method changes the ports of the endpoints from
+     * the control port to the storage ports. Mutates the
+     * EndPoint objects in place.
+    */
+    protected void retrofitPorts(List<EndPoint> eps)
+    {
+        for ( EndPoint ep : eps )
+        {
+            ep.setPort(DatabaseDescriptor.getStoragePort());
+        }
+    }
+
+    /**
+     * Walks the ring clockwise starting just past startPoint's token and
+     * returns the first endpoint that the failure detector reports alive and
+     * that appears in neither topN nor liveNodes.
+     *
+     * @param startPoint endpoint whose ring position the walk starts from
+     * @param topN the natural replica set being repaired
+     * @param liveNodes endpoints already chosen as stand-ins (excluded)
+     * @return the next usable endpoint, or null if the whole ring is exhausted
+     */
+    protected EndPoint getNextAvailableEndPoint(EndPoint startPoint, List<EndPoint> topN, List<EndPoint> liveNodes)
+    {
+        EndPoint endPoint = null;
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Collections.sort(tokens);
+        BigInteger token = tokenMetadata_.getToken(startPoint);
+        int index = Collections.binarySearch(tokens, token);
+        if(index < 0)
+        {
+            // binarySearch returned (-(insertion point) - 1); recover the
+            // insertion point and wrap past the end of the ring back to 0.
+            index = (index + 1) * (-1);
+            if (index >= tokens.size())
+                index = 0;
+        }
+        int totalNodes = tokens.size();
+        int startIndex = (index+1)%totalNodes;
+        for (int i = startIndex, count = 1; count < totalNodes ; ++count, i = (i+1)%totalNodes)
+        {
+            EndPoint tmpEndPoint = tokenToEndPointMap.get(tokens.get(i));
+            if(FailureDetector.instance().isAlive(tmpEndPoint) && !topN.contains(tmpEndPoint) && !liveNodes.contains(tmpEndPoint))
+            {
+                endPoint = tmpEndPoint;
+                break;
+            }
+        }
+        return endPoint;
+    }
+
+    /*
+     * This method returns the hint map. The key is the endpoint
+     * on which the data is being placed and the value is the
+     * endpoint which is in the top N.
+     * Get the map of top N to the live nodes currently.
+     */
+    public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token)
+    {
+        List<EndPoint> liveList = new ArrayList<EndPoint>();
+        Map<EndPoint, EndPoint> map = new HashMap<EndPoint, EndPoint>();
+        EndPoint[] topN = getStorageEndPoints( token );
+
+        for( int i = 0 ; i < topN.length ; i++)
+        {
+            if( FailureDetector.instance().isAlive(topN[i]))
+            {
+                // Replica is up: it stores its own data (identity mapping).
+                map.put(topN[i], topN[i]);
+                liveList.add(topN[i]) ;
+            }
+            else
+            {
+                // Replica is down: hand its data to the next live endpoint on
+                // the ring not already serving this token, recording the hint.
+                EndPoint endPoint = getNextAvailableEndPoint(topN[i], Arrays.asList(topN), liveList);
+                if(endPoint != null)
+                {
+                    map.put(endPoint, topN[i]);
+                    liveList.add(endPoint) ;
+                }
+                else
+                {
+                    // log a warning , maybe throw an exception
+                    logger_.warn("Unable to find a live Endpoint we might be out of live nodes , This is dangerous !!!!");
+                }
+            }
+        }
+        return map;
+    }
+
+    /** Subclasses define how the natural replica set for a token is chosen. */
+    public abstract EndPoint[] getStorageEndPoints(BigInteger token);
+
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/locator/EndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/EndPointSnitch.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/EndPointSnitch.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/EndPointSnitch.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.*;
+
+import org.apache.cassandra.net.EndPoint;
+/**
+ * Octet-based snitch: infers rack and data-center locality for two endpoints
+ * purely from their resolved IPv4 addresses (3rd octet = rack, 2nd = DC).
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class EndPointSnitch implements IEndPointSnitch
+{
+    public boolean isOnSameRack(EndPoint host, EndPoint host2) throws UnknownHostException
+    {
+        // Hosts whose addresses agree in the 3rd octet are taken to share a rack.
+        return addressOctet(host, 2) == addressOctet(host2, 2);
+    }
+
+    public boolean isInSameDataCenter(EndPoint host, EndPoint host2) throws UnknownHostException
+    {
+        // Hosts whose addresses agree in the 2nd octet are taken to share a data center.
+        return addressOctet(host, 1) == addressOctet(host2, 1);
+    }
+
+    /**
+     * Resolves the endpoint's host name and returns the address byte at the
+     * given (zero-based) position.
+     */
+    private byte addressOctet(EndPoint host, int position) throws UnknownHostException
+    {
+        InetAddress address = InetAddress.getByName(host.getHost());
+        return address.getAddress()[position];
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/locator/IEndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/IEndPointSnitch.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/IEndPointSnitch.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/IEndPointSnitch.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.UnknownHostException;
+
+import org.apache.cassandra.net.EndPoint;
+
+
+/**
+ * This interface helps determine location of node in the data center relative to another node.
+ * Given a node A and another node B it can tell if A and B are on the same rack or in the same
+ * data center.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IEndPointSnitch
+{
+    /**
+     * Helps determine if 2 nodes are in the same rack in the data center.
+     * @param host a specified endpoint
+     * @param host2 another specified endpoint
+     * @return true if on the same rack false otherwise
+     * @throws UnknownHostException if either endpoint's host name cannot be resolved
+     */
+    public boolean isOnSameRack(EndPoint host, EndPoint host2) throws UnknownHostException;
+    
+    /**
+     * Helps determine if 2 nodes are in the same data center.
+     * @param host a specified endpoint
+     * @param host2 another specified endpoint
+     * @return true if in the same data center false otherwise
+     * @throws UnknownHostException if either endpoint's host name cannot be resolved
+     */
+    public boolean isInSameDataCenter(EndPoint host, EndPoint host2) throws UnknownHostException;
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+import org.apache.cassandra.net.EndPoint;
+
+
+/*
+ * This interface has two implementations. One which
+ * does not respect rack or datacenter awareness and
+ * the other which does.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public interface IReplicaPlacementStrategy
+{
+	/** Returns the replica endpoints responsible for the given token. */
+	public EndPoint[] getStorageEndPoints(BigInteger token);
+	/** Returns, for each key, the replica endpoints responsible for that key's hash. */
+	public Map<String, EndPoint[]> getStorageEndPoints(String[] keys);
+    /** Returns the replicas for the token using the supplied token-to-endpoint ring snapshot. */
+    public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap);
+    /** Maps each endpoint that will store the token's data to the "top N" replica it stands in for. */
+    public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token);    
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,205 @@
+package org.apache.cassandra.locator;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+
+
+/*
+ * This class returns the nodes responsible for a given
+ * key but does respect rack awareness. It makes a best
+ * effort to get a node from a different data center and
+ * a node in a different rack in the same datacenter as
+ * the primary.
+ */
+public class RackAwareStrategy extends AbstractStrategy
+{
+    public RackAwareStrategy(TokenMetadata tokenMetadata)
+    {
+        super(tokenMetadata);
+    }
+    
+    /**
+     * Returns the N storage endpoints for the given token: the primary
+     * (first node at/after the token on the sorted ring), then preferably one
+     * node in a different data center and one node on a different rack in the
+     * primary's data center, then any remaining distinct nodes clockwise.
+     */
+    public EndPoint[] getStorageEndPoints(BigInteger token)
+    {
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Collections.sort(tokens);
+        List<EndPoint> list = selectEndPointsForToken(token, tokens, tokenToEndPointMap);
+        retrofitPorts(list);
+        return list.toArray(new EndPoint[0]);
+    }
+    
+    /**
+     * Returns the replica set for each key's hash.
+     * Fixes two bugs in the previous version: selection state (endpoint list,
+     * found count, DC/rack flags) was shared across keys, so later keys were
+     * handed earlier keys' replicas; and a replication factor of 1 caused an
+     * early return after the first key, dropping all remaining keys.
+     */
+    public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
+    {
+    	Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
+    	Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Collections.sort(tokens);
+        
+        for ( String key : keys )
+        {
+            // Fresh selection state per key.
+            List<EndPoint> list = selectEndPointsForToken(StorageService.hash(key), tokens, tokenToEndPointMap);
+            retrofitPorts(list);
+            results.put(key, list.toArray(new EndPoint[0]));
+        }
+        
+        return results;
+    }
+    
+    /*
+     * Core rack-aware selection shared by both public overloads: primary
+     * first, then best-effort one different-DC node and one different-rack
+     * node, then distinct ring neighbours until N replicas are found.
+     * Ports are NOT retrofitted here; callers do that.
+     */
+    private List<EndPoint> selectEndPointsForToken(BigInteger token, List<BigInteger> tokens, Map<BigInteger, EndPoint> tokenToEndPointMap)
+    {
+        List<EndPoint> list = new ArrayList<EndPoint>();
+        boolean bDataCenter = false;
+        boolean bOtherRack = false;
+        int foundCount = 0;
+        int N = DatabaseDescriptor.getReplicationFactor();
+        int index = Collections.binarySearch(tokens, token);
+        if(index < 0)
+        {
+            // binarySearch returned (-(insertion point) - 1); recover the
+            // insertion point and wrap past the end of the ring back to 0.
+            index = (index + 1) * (-1);
+            if (index >= tokens.size())
+                index = 0;
+        }
+        int totalNodes = tokens.size();
+        // Add the node at the index by default -- it is always the primary.
+        list.add(tokenToEndPointMap.get(tokens.get(index)));
+        foundCount++;
+        if( N == 1 )
+        {
+            return list;
+        }
+        int startIndex = (index + 1)%totalNodes;
+        IEndPointSnitch endPointSnitch = StorageService.instance().getEndPointSnitch();
+        
+        for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+        {
+            try
+            {
+                // First try to find one in a different data center
+                if(!endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
+                {
+                    // If we have already found something in a diff datacenter no need to find another
+                    if( !bDataCenter )
+                    {
+                        list.add(tokenToEndPointMap.get(tokens.get(i)));
+                        bDataCenter = true;
+                        foundCount++;
+                    }
+                    continue;
+                }
+                // Reaching here implies same data center; now try to find one
+                // on a different rack within it.
+                if(!endPointSnitch.isOnSameRack(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
+                {
+                    // If we have already found something in a diff rack no need to find another
+                    if( !bOtherRack )
+                    {
+                        list.add(tokenToEndPointMap.get(tokens.get(i)));
+                        bOtherRack = true;
+                        foundCount++;
+                    }
+                }
+            }
+            catch (UnknownHostException e)
+            {
+                // Unresolvable host: skip this candidate and keep scanning.
+                logger_.debug(LogUtil.throwableToString(e));
+            }
+        }
+        // If the rack/DC preferences did not yield N nodes, fill the rest
+        // with the next distinct nodes clockwise on the ring.
+        for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+        {
+            if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+            {
+                list.add(tokenToEndPointMap.get(tokens.get(i)));
+                foundCount++;
+            }
+        }
+        return list;
+    }
+    
+    public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
+    {
+        throw new UnsupportedOperationException("This operation is not currently supported");
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,154 @@
+package org.apache.cassandra.locator;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * This class returns the nodes responsible for a given
+ * key but does not respect rack awareness. Basically
+ * returns the 3 nodes that lie right next to each other
+ * on the ring.
+ */
+public class RackUnawareStrategy extends AbstractStrategy
+{   
+    /* Use this flag to check if initialization is in order. */
+    private AtomicBoolean initialized_ = new AtomicBoolean(false);
+    /* Mapping from ring ranges to their replicas; built lazily by doInitialization(). */
+    private Map<Range, List<EndPoint>> rangeToEndPointMap_;
+    
+    public RackUnawareStrategy(TokenMetadata tokenMetadata)
+    {
+        super(tokenMetadata);
+    }
+    
+    public EndPoint[] getStorageEndPoints(BigInteger token)
+    {
+        return getStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());            
+    }
+    
+    /**
+     * Returns the node owning the first token at/after the given token plus
+     * the next distinct nodes clockwise on the ring, up to the replication
+     * factor, with endpoint ports retrofitted to the storage port.
+     */
+    public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
+    {
+        List<EndPoint> list = new ArrayList<EndPoint>();
+        int foundCount = 0;
+        int N = DatabaseDescriptor.getReplicationFactor();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Collections.sort(tokens);
+        int index = Collections.binarySearch(tokens, token);
+        if(index < 0)
+        {
+            // binarySearch returned (-(insertion point) - 1); recover the
+            // insertion point and wrap past the end of the ring back to 0.
+            index = (index + 1) * (-1);
+            if (index >= tokens.size())
+                index = 0;
+        }
+        int totalNodes = tokens.size();
+        // Add the node at the index by default -- it is always the primary.
+        list.add(tokenToEndPointMap.get(tokens.get(index)));
+        foundCount++;
+        int startIndex = (index + 1)%totalNodes;
+        // Walk clockwise adding distinct nodes until we have N replicas or
+        // run out of nodes.
+        for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+        {
+            if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+            {
+                list.add(tokenToEndPointMap.get(tokens.get(i)));
+                foundCount++;
+            }
+        }
+        retrofitPorts(list);
+        return list.toArray(new EndPoint[0]);
+    }
+    
+    /* Lazily builds the range-to-replicas map on first use. */
+    private void doInitialization()
+    {
+        if ( !initialized_.get() )
+        {
+            /* construct the mapping from the ranges to the replicas responsible for them */
+            rangeToEndPointMap_ = StorageService.instance().getRangeToEndPointMap();            
+            initialized_.set(true);
+        }
+    }
+    
+    /**
+     * This method determines which range in the array actually contains
+     * the hash of the key.
+     * @param ranges candidate ring ranges
+     * @param key key whose hash is to be located
+     * @return index of the first range containing the hash, or 0 if none does
+     */
+    private int findRangeIndexForKey(Range[] ranges, String key)
+    {
+        int index = 0;
+        BigInteger hash = StorageService.hash(key);
+        for ( int i = 0; i < ranges.length; ++i )
+        {
+            if ( ranges[i].contains(hash) )
+            {
+                index = i;
+                break;
+            }
+        }
+        
+        return index;
+    }
+    
+    /**
+     * Returns the replica set for each key's hash by delegating to the
+     * single-token overload, which already performs the full selection and
+     * port retrofit. This fixes the previous version, which accumulated the
+     * endpoint list and replica count across keys (handing later keys
+     * earlier keys' replicas), sorted the caller's key array in place, and
+     * fetched an unused range array.
+     */
+    public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
+    {              
+    	Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
+    	Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        for ( String key : keys )
+        {
+            // Each key's replica set is computed independently.
+            results.put(key, getStorageEndPoints(StorageService.hash(key), tokenToEndPointMap));
+        }
+        
+        return results;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Thread-safe registry of the token ring: maps each token to the endpoint
+ * that owns it and maintains the reverse endpoint-to-token index.  Every
+ * access to the two maps is guarded by a fair read/write lock.
+ */
+public class TokenMetadata
+{
+    private static ICompactSerializer<TokenMetadata> serializer_ = new TokenMetadataSerializer();
+    
+    public static ICompactSerializer<TokenMetadata> serializer()
+    {
+        return serializer_;
+    }
+    
+    /* Maintains token to endpoint map of every node in the cluster. */    
+    private Map<BigInteger, EndPoint> tokenToEndPointMap_ = new HashMap<BigInteger, EndPoint>();    
+    /* Maintains a reverse index of endpoint to token in the cluster. */
+    private Map<EndPoint, BigInteger> endPointToTokenMap_ = new HashMap<EndPoint, BigInteger>();
+    
+    /* Use this lock for manipulating the token maps; readers may proceed concurrently. */
+    private ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
+    
+    /*
+     * For JAXB purposes.
+    */
+    public TokenMetadata()
+    {
+    }
+    
+    protected TokenMetadata(Map<BigInteger, EndPoint> tokenToEndPointMap, Map<EndPoint, BigInteger> endPointToTokenMap)
+    {
+        tokenToEndPointMap_ = tokenToEndPointMap;
+        endPointToTokenMap_ = endPointToTokenMap;
+    }
+    
+    /** Returns a new TokenMetadata backed by independent copies of both indices. */
+    public TokenMetadata cloneMe()
+    {
+        Map<BigInteger, EndPoint> tokenToEndPointMap = cloneTokenEndPointMap();
+        Map<EndPoint, BigInteger> endPointToTokenMap = cloneEndPointTokenMap();
+        return new TokenMetadata( tokenToEndPointMap, endPointToTokenMap );
+    }
+    
+    /**
+     * Atomically (re)assign a token to an endpoint, keeping both maps in step.
+     * Any token previously held by the endpoint is removed first.
+    */
+    public void update(BigInteger token, EndPoint endpoint)
+    {
+        lock_.writeLock().lock();
+        try
+        {            
+            BigInteger oldToken = endPointToTokenMap_.get(endpoint);
+            if ( oldToken != null )
+                tokenToEndPointMap_.remove(oldToken);
+            tokenToEndPointMap_.put(token, endpoint);
+            endPointToTokenMap_.put(endpoint, token);
+        }
+        finally
+        {
+            lock_.writeLock().unlock();
+        }
+    }
+    
+    /**
+     * Remove the endpoint's entries from both maps.
+     * @param endpoint node leaving the ring
+     */
+    public void remove(EndPoint endpoint)
+    {
+        lock_.writeLock().lock();
+        try
+        {            
+            BigInteger oldToken = endPointToTokenMap_.get(endpoint);
+            if ( oldToken != null )
+                tokenToEndPointMap_.remove(oldToken);            
+            endPointToTokenMap_.remove(endpoint);
+        }
+        finally
+        {
+            lock_.writeLock().unlock();
+        }
+    }
+    
+    /** Token owned by the endpoint, or null if the endpoint is unknown. */
+    public BigInteger getToken(EndPoint endpoint)
+    {
+        lock_.readLock().lock();
+        try
+        {
+            return endPointToTokenMap_.get(endpoint);
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+    }
+    
+    public boolean isKnownEndPoint(EndPoint ep)
+    {
+        lock_.readLock().lock();
+        try
+        {
+            return endPointToTokenMap_.containsKey(ep);
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+    }
+    
+    /*
+     * Returns a safe clone of tokenToEndPointMap_.
+    */
+    public Map<BigInteger, EndPoint> cloneTokenEndPointMap()
+    {
+        lock_.readLock().lock();
+        try
+        {            
+            return new HashMap<BigInteger, EndPoint>( tokenToEndPointMap_ );
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+    }
+    
+    /*
+     * Returns a safe clone of endPointTokenMap_.
+    */
+    public Map<EndPoint, BigInteger> cloneEndPointTokenMap()
+    {
+        lock_.readLock().lock();
+        try
+        {            
+            return new HashMap<EndPoint, BigInteger>( endPointToTokenMap_ );
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+    }
+    
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        /* Snapshot under the read lock (via the clone helper); iterating the
+         * live HashMap here raced with update()/remove() and could throw
+         * ConcurrentModificationException. */
+        Map<EndPoint, BigInteger> endPointToTokenMap = cloneEndPointTokenMap();
+        for ( Map.Entry<EndPoint, BigInteger> entry : endPointToTokenMap.entrySet() )
+        {
+            sb.append(entry.getKey());
+            sb.append(":");
+            sb.append(entry.getValue());
+            sb.append(System.getProperty("line.separator"));
+        }
+        
+        return sb.toString();
+    }
+}
+
+class TokenMetadataSerializer implements ICompactSerializer<TokenMetadata>
+{
+    /**
+     * Wire format: entry count, then per entry a length-prefixed token
+     * byte[] followed by the compact endpoint encoding.
+     */
+    public void serialize(TokenMetadata tkMetadata, DataOutputStream dos) throws IOException
+    {
+        Map<BigInteger, EndPoint> snapshot = tkMetadata.cloneTokenEndPointMap();
+        /* entry count first */
+        dos.writeInt(snapshot.size());
+        for ( Map.Entry<BigInteger, EndPoint> entry : snapshot.entrySet() )
+        {
+            /* token as a length-prefixed byte[] */
+            byte[] tokenBytes = entry.getKey().toByteArray();
+            dos.writeInt(tokenBytes.length);
+            dos.write(tokenBytes);
+            /* then the owning endpoint */
+            CompactEndPointSerializationHelper.serialize(entry.getValue(), dos);
+        }
+    }
+    
+    /**
+     * Inverse of serialize().  An empty (size 0) payload deserializes to
+     * null rather than an empty TokenMetadata.
+     */
+    public TokenMetadata deserialize(DataInputStream dis) throws IOException
+    {
+        int entryCount = dis.readInt();
+        if ( entryCount <= 0 )
+            return null;
+        
+        Map<BigInteger, EndPoint> tokenToEndPoint = new HashMap<BigInteger, EndPoint>();
+        Map<EndPoint, BigInteger> endPointToToken = new HashMap<EndPoint, BigInteger>();
+        for ( int i = 0; i < entryCount; ++i )
+        {
+            /* length-prefixed token bytes, then the endpoint */
+            byte[] tokenBytes = new byte[dis.readInt()];
+            dis.readFully(tokenBytes);
+            EndPoint ep = CompactEndPointSerializationHelper.deserialize(dis);
+            BigInteger token = new BigInteger(tokenBytes);
+            tokenToEndPoint.put(token, ep);
+            endPointToToken.put(ep, token);
+        }
+        
+        return new TokenMetadata( tokenToEndPoint, endPointToToken );
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.List;
+import java.util.Hashtable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.QuorumResponseHandler;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class AsyncResult implements IAsyncResult
+{
+    private static Logger logger_ = Logger.getLogger( AsyncResult.class );
+    private Object[] result_ = new Object[0];    
+    private AtomicBoolean done_ = new AtomicBoolean(false);
+    private Lock lock_ = new ReentrantLock();
+    private Condition condition_;
+
+    public AsyncResult()
+    {        
+        condition_ = lock_.newCondition();
+    }    
+    
+    public Object[] get()
+    {
+        lock_.lock();
+        try
+        {
+            if ( !done_.get() )
+            {
+                condition_.await();                    
+            }
+        }
+        catch ( InterruptedException ex )
+        {
+            logger_.warn( LogUtil.throwableToString(ex) );
+        }
+        finally
+        {
+            lock_.unlock();            
+        }        
+        return result_;
+    }
+    
+    public boolean isDone()
+    {
+        return done_.get();
+    }
+    
+    public Object[] get(long timeout, TimeUnit tu) throws TimeoutException
+    {
+        lock_.lock();
+        try
+        {            
+            boolean bVal = true;
+            try
+            {
+                if ( !done_.get() )
+                {                    
+                    bVal = condition_.await(timeout, tu);
+                }
+            }
+            catch ( InterruptedException ex )
+            {
+                logger_.warn( LogUtil.throwableToString(ex) );
+            }
+            
+            if ( !bVal && !done_.get() )
+            {                                           
+                throw new TimeoutException("Operation timed out.");
+            }
+        }
+        finally
+        {
+            lock_.unlock();      
+        }
+        return result_;
+    }
+    
+    void result(Object[] result)
+    {        
+        try
+        {
+            lock_.lock();
+            if ( !done_.get() )
+            {
+                result_ = result;
+                done_.set(true);
+                condition_.signal();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }        
+    }    
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class CompactEndPointSerializationHelper
+{
+    /* An endpoint serializes to exactly 6 bytes: 4 address bytes + 2 port bytes. */
+    public static void serialize(EndPoint endPoint, DataOutputStream dos) throws IOException
+    {        
+        byte[] compact = EndPoint.toBytes(endPoint);
+        dos.write(compact);
+    }
+    
+    public static EndPoint deserialize(DataInputStream dis) throws IOException
+    {     
+        byte[] compact = new byte[6];
+        dis.readFully(compact, 0, compact.length);
+        return EndPoint.fromBytes(compact);       
+    }
+    
+    /* Currently unused: resolves a host name to its raw address bytes. */
+    private static byte[] getIPAddress(String host) throws UnknownHostException
+    {
+        return InetAddress.getByName(host).getAddress();
+    }
+    
+    /* Currently unused: reverse-resolves raw address bytes to a host name. */
+    private static String getHostName(byte[] ipAddr) throws UnknownHostException
+    {
+        return InetAddress.getByAddress(ipAddr).getCanonicalHostName();
+    }
+    
+    /* Ad-hoc smoke test of the round trip. */
+    public static void main(String[] args) throws Throwable
+    {
+        EndPoint original = new EndPoint(7000);
+        byte[] compact = EndPoint.toBytes(original);
+        System.out.println(compact.length);
+        EndPoint roundTripped = EndPoint.fromBytes(compact);
+        System.out.println(roundTripped);
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/ConnectionStatistics.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/ConnectionStatistics.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/ConnectionStatistics.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/ConnectionStatistics.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Immutable snapshot of the connection-pool state between one local and one
+ * remote endpoint: total connections open and how many are currently in use.
+ */
+public class ConnectionStatistics
+{
+    /* All fields are final: this is a read-only value object built once
+       from the pool's state at construction time. */
+    private final String localHost;
+    private final int localPort;
+    private final String remoteHost;
+    private final int remotePort;
+    private final int totalConnections;
+    private final int connectionsInUse;
+
+    ConnectionStatistics(EndPoint localEp, EndPoint remoteEp, int tc, int ciu)
+    {
+        localHost = localEp.getHost();
+        localPort = localEp.getPort();
+        remoteHost = remoteEp.getHost();
+        remotePort = remoteEp.getPort();
+        totalConnections = tc;
+        connectionsInUse = ciu;
+    }
+    
+    public String getLocalHost()
+    {
+        return localHost;
+    }
+    
+    public int getLocalPort()
+    {
+        return localPort;
+    }
+    
+    public String getRemoteHost()
+    {
+        return remoteHost;
+    }
+    
+    public int getRemotePort()
+    {
+        return remotePort;
+    }
+    
+    public int getTotalConnections()
+    {
+        return totalConnections;
+    }
+    
+    public int getConnectionInUse()
+    {
+        return connectionsInUse;
+    }
+
+    public String toString()
+    {
+        return localHost + ":" + localPort + "->" + remoteHost + ":" + remotePort + " Total Connections open : " + totalConnections + " Connections in use : " + connectionsInUse;
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/EndPoint.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/EndPoint.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/EndPoint.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/EndPoint.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1 @@
+/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.net;


import java.io.IOException;
import java.io.Serializable;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.HashMap;
import java.util.Map;

import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;

/**
 * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
 */

public class EndPoint implements Serializable, Comparable<EndPoint>
{
	// logging and profiling.
	private static Logger logger_ = Logger.getLogger(EndPoint.class);
	private static final long serialVersionUID = -4962625949179835907L;
	private static Map<CharBuffer, String> hostNames_ = new HashMap<CharBuffer, String>();
    protected static final int randomPort_ = 5555;
    public static EndPoint randomLocalEndPoint_;
    
    static
    {
        try
        {
            randomLocalEndPoint_ = new EndPoint(FBUtilities.getHostName(), EndPoint.randomPort_);
        }        
        catch ( IOException ex )
        {
            logger_.warn(LogUtil.throwableToString(ex));
        }
    }

	private String host_;
	pri
 vate int port_;

	private transient InetSocketAddress ia_;

	/* Ctor for JAXB. DO NOT DELETE */
	private EndPoint()
	{
	}

	public EndPoint(String host, int port)
	{
		/*
		 * Attempts to resolve the host, but does not fail if it cannot.
		 */
		host_ = host;
		port_ = port;
	}

	// create a local endpoint id
	public EndPoint(int port)
	{
		try
		{
			host_ = FBUtilities.getLocalHostName();
			port_ = port;
		}
		catch (UnknownHostException e)
		{
			logger_.warn(LogUtil.throwableToString(e));
		}
	}

	public String getHost()
	{
		return host_;
	}

	public int getPort()
	{
		return port_;
	}

	public void setPort(int port)
	{
		port_ = port;
	}

	public InetSocketAddress getInetAddress()
	{
		if (ia_ == null || ia_.isUnresolved())
		{
			ia_ = new InetSocketAddress(host_, port_);
		}
		return ia_;
	}

	public boolean equals(Object o)
	{
		if (!(o instanceof EndPoint))
			return false;

		EndPoint rhs = (EndPoint) o;
		return (host_.equals(rhs.host_) && port_ == rhs.port_);
	
 }

	public int hashCode()
	{
		return (host_ + port_).hashCode();
	}

	public int compareTo(EndPoint rhs)
	{
		return host_.compareTo(rhs.host_);
	}

	public String toString()
	{
		return (host_ + ":" + port_);
	}

	public static EndPoint fromString(String str)
	{
		String[] values = str.split(":");
		return new EndPoint(values[0], Integer.parseInt(values[1]));
	}

	public static byte[] toBytes(EndPoint ep)
	{
		ByteBuffer buffer = ByteBuffer.allocate(6);
		byte[] iaBytes = ep.getInetAddress().getAddress().getAddress();
		buffer.put(iaBytes);
		buffer.put(MessagingService.toByteArray((short) ep.getPort()));
		buffer.flip();
		return buffer.array();
	}

	public static EndPoint fromBytes(byte[] bytes)
	{
		ByteBuffer buffer = ByteBuffer.allocate(4);
		System.arraycopy(bytes, 0, buffer.array(), 0, 4);
		byte[] portBytes = new byte[2];
		System.arraycopy(bytes, 4, portBytes, 0, portBytes.length);
		try
		{
			CharBuffer charBuffer = buffer.asCharBuffer();
			String host = hostNa
 mes_.get(charBuffer);
			if (host == null)
			{				
				host = InetAddress.getByAddress(buffer.array()).getHostName();				
				hostNames_.put(charBuffer, host);
			}
			int port = (int) MessagingService.byteArrayToShort(portBytes);
			return new EndPoint(host, port);
		}
		catch (UnknownHostException e)
		{
			throw new IllegalArgumentException(e);
		}
	}
}
\ No newline at end of file

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/FileStreamTask.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/FileStreamTask.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/FileStreamTask.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.net.SocketException;
+
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Runnable that streams a region of a local file to a remote endpoint over
+ * a dedicated TcpConnection.
+ */
+class FileStreamTask implements Runnable
+{
+    private static Logger logger_ = Logger.getLogger( FileStreamTask.class );
+    
+    /* Path of the file to stream. */
+    private String file_;
+    /* Offset in the file at which streaming starts. */
+    private long startPosition_;
+    /* NOTE(review): presumably the byte count (or end offset) passed to
+       TcpConnection.stream() -- confirm against that method's contract. */
+    private long total_;
+    private EndPoint from_;
+    private EndPoint to_;
+    
+    FileStreamTask(String file, long startPosition, long total, EndPoint from, EndPoint to)
+    {
+        file_ = file;
+        startPosition_ = startPosition;
+        total_ = total;
+        from_ = from;
+        to_ = to;
+    }
+    
+    public void run()
+    {
+        TcpConnection connection = null;
+        try
+        {                        
+            connection = new TcpConnection(from_, to_);
+            File file = new File(file_);             
+            connection.stream(file, startPosition_, total_);
+            /* NOTE(review): streaming mode is cleared only on success; any
+               exception below leaves it set -- confirm that is intentional. */
+            MessagingService.setStreamingMode(false);
+            logger_.debug("Done streaming " + file);
+        }            
+        catch ( SocketException se )
+        {                        
+            /* Socket-level failure: logged only; the connection is not
+               explicitly closed on this path. */
+            logger_.info(LogUtil.throwableToString(se));
+        }
+        catch ( IOException e )
+        {
+            logConnectAndIOException(e, connection);
+        }
+        catch (Throwable th)
+        {
+            logger_.warn(LogUtil.throwableToString(th));
+        }        
+    }
+    
+    /* Close the connection (if one was established) and log the failure. */
+    private void logConnectAndIOException(IOException ex, TcpConnection connection)
+    {                    
+        if ( connection != null )
+        {
+            connection.errorClose();
+        }
+        logger_.info(LogUtil.throwableToString(ex));
+    }
+}