Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 07:45:23 UTC

svn commit: r759033 [1/2] - in /incubator/cassandra/trunk/src/org/apache/cassandra: concurrent/ config/ cql/common/ db/ dht/ gms/ io/ locator/ net/ test/ tools/ utils/

Author: alakshman
Date: Fri Mar 27 06:45:19 2009
New Revision: 759033

URL: http://svn.apache.org/viewvc?rev=759033&view=rev
Log:
Final changes made to fix a checkin anomaly.

Added:
    incubator/cassandra/trunk/src/org/apache/cassandra/tools/ClusterTool.java
      - copied unchanged from r758964, incubator/cassandra/trunk/src/org/apache/cassandra/tools/ClusterTool.java
    incubator/cassandra/trunk/src/org/apache/cassandra/tools/FileSizeTokenGenerator.java
      - copied unchanged from r758964, incubator/cassandra/trunk/src/org/apache/cassandra/tools/FileSizeTokenGenerator.java
Removed:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponse.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/BigIntegerToken.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/StringToken.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/Token.java
    incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java
Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
    incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
    incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
    incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetector.java
    incubator/cassandra/trunk/src/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
    incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java
    incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
    incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
    incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
    incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java
    incubator/cassandra/trunk/src/org/apache/cassandra/test/DBTest.java
    incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
    incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java
    incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleaner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java
    incubator/cassandra/trunk/src/org/apache/cassandra/utils/BasicUtilities.java
    incubator/cassandra/trunk/src/org/apache/cassandra/utils/FBUtilities.java
    incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Fri Mar 27 06:45:19 2009
@@ -57,23 +57,6 @@
     public void afterExecute(Runnable r, Throwable t)
     {
         super.afterExecute(r,t);
-
-        if (r instanceof FutureTask) {
-            assert t == null;
-            try
-            {
-                ((FutureTask)r).get();
-            }
-            catch (InterruptedException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (ExecutionException e)
-            {
-                t = e;
-            }
-        }
-
         if ( t != null )
         {  
             Context ctx = ThreadLocalContext.get();
@@ -83,10 +66,20 @@
                 
                 if ( object != null )
                 {
-                    logger_.error("In afterExecute() " + t.getClass().getName() + " occured while working with " + object);
+                    logger_.info("**** In afterExecute() " + t.getClass().getName() + " occured while working with " + object + " ****");
+                }
+                else
+                {
+                    logger_.info("**** In afterExecute() " + t.getClass().getName() + " occured ****");
                 }
             }
-            logger_.error("Error in ThreadPoolExecutor", t);
+            
+            Throwable cause = t.getCause();
+            if ( cause != null )
+            {
+                logger_.info( LogUtil.throwableToString(cause) );
+            }
+            logger_.info( LogUtil.throwableToString(t) );
         }
     }
 }
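
For reference on the hunk above: when a task reaches the pool through ExecutorService.submit(), it is wrapped in a FutureTask that captures any exception internally, so afterExecute() runs with t == null and only Future.get() surfaces the failure. That is the unwrapping this commit removes. A minimal self-contained sketch of the pattern, with an illustrative class name and log message rather than the committed code:

    import java.util.concurrent.*;

    import org.apache.log4j.Logger;

    class LoggingThreadPoolExecutor extends ThreadPoolExecutor
    {
        private static final Logger logger_ = Logger.getLogger(LoggingThreadPoolExecutor.class);

        LoggingThreadPoolExecutor(int threads)
        {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        protected void afterExecute(Runnable r, Throwable t)
        {
            super.afterExecute(r, t);
            if (t == null && r instanceof FutureTask)
            {
                try
                {
                    ((FutureTask<?>) r).get(); // returns immediately; the task has already completed
                }
                catch (CancellationException e)
                {
                    // cancelled before completion; nothing to log
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                }
                catch (ExecutionException e)
                {
                    t = e.getCause(); // the task's real failure
                }
            }
            if (t != null)
                logger_.info("Exception in afterExecute()", t);
        }
    }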

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java Fri Mar 27 06:45:19 2009
@@ -18,24 +18,19 @@
 
 package org.apache.cassandra.config;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.io.*;
 
 import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.TypeInfo;
+import org.apache.cassandra.db.Table.TableMetadata;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.XMLUtils;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
+import org.apache.cassandra.io.*;
 
 
 /**
@@ -89,7 +84,7 @@
     */
     private static Map<String, Map<String, CFMetaData>> tableToCFMetaDataMap_;
     /* Hashing strategy Random or OPHF */
-    private static String partitionerClass_;
+    private static String hashingStrategy_ = DatabaseDescriptor.random_;
     /* if the size of columns or super-columns are more than this, indexing will kick in */
     private static int columnIndexSizeInKB_;
     /* Size of touch key cache */
@@ -120,358 +115,306 @@
     // the path qualified config file (storage-conf.xml) name
     private static String configFileName_;
 
-    static
+    public static Map<String, Map<String, CFMetaData>> init(String filePath) throws Throwable
     {
-        try
-        {
-            String file = System.getProperty("storage-config") + System.getProperty("file.separator") + "storage-conf.xml";
-            String os = System.getProperty("os.name");
-            XMLUtils xmlUtils = new XMLUtils(file);
-
-            /* Cluster Name */
-            clusterName_ = xmlUtils.getNodeValue("/Storage/ClusterName");
-
-            /* Ganglia servers contact list */
-            gangliaServers_ = xmlUtils.getNodeValues("/Storage/GangliaServers/GangliaServer");
-
-            /* ZooKeeper's address */
-            zkAddress_ = xmlUtils.getNodeValue("/Storage/ZookeeperAddress");
-
-            /* Hashing strategy */
-            partitionerClass_ = xmlUtils.getNodeValue("/Storage/Partitioner");
-            /* Callout location */
-            calloutLocation_ = xmlUtils.getNodeValue("/Storage/CalloutLocation");
-
-            /* JobTracker address */
-            jobTrackerHost_ = xmlUtils.getNodeValue("/Storage/JobTrackerHost");
-
-            /* Job Jar file location */
-            jobJarFileLocation_ = xmlUtils.getNodeValue("/Storage/JobJarFileLocation");
-
-            /* Zookeeper's session timeout */
-            String zkSessionTimeout = xmlUtils.getNodeValue("/Storage/ZookeeperSessionTimeout");
-            if ( zkSessionTimeout != null )
-                zkSessionTimeout_ = Integer.parseInt(zkSessionTimeout);
-
-            /* Data replication factor */
-            String replicationFactor = xmlUtils.getNodeValue("/Storage/ReplicationFactor");
-            if ( replicationFactor != null )
-                replicationFactor_ = Integer.parseInt(replicationFactor);
-
-            /* RPC Timeout */
-            String rpcTimeoutInMillis = xmlUtils.getNodeValue("/Storage/RpcTimeoutInMillis");
-            if ( rpcTimeoutInMillis != null )
-                rpcTimeoutInMillis_ = Integer.parseInt(rpcTimeoutInMillis);
-
-            /* Thread per pool */
-            String threadsPerPool = xmlUtils.getNodeValue("/Storage/ThreadsPerPool");
-            if ( threadsPerPool != null )
-                threadsPerPool_ = Integer.parseInt(threadsPerPool);
-
-            /* TCP port on which the storage system listens */
-            String port = xmlUtils.getNodeValue("/Storage/StoragePort");
-            if ( port != null )
-                storagePort_ = Integer.parseInt(port);
-
-            /* UDP port for control messages */
-            port = xmlUtils.getNodeValue("/Storage/ControlPort");
-            if ( port != null )
-                controlPort_ = Integer.parseInt(port);
-
-            /* HTTP port for HTTP messages */
-            port = xmlUtils.getNodeValue("/Storage/HttpPort");
-            if ( port != null )
-                httpPort_ = Integer.parseInt(port);
-
-            /* Touch Key Cache Size */
-            String touchKeyCacheSize = xmlUtils.getNodeValue("/Storage/TouchKeyCacheSize");
-            if ( touchKeyCacheSize != null )
-                touchKeyCacheSize_ = Integer.parseInt(touchKeyCacheSize);
-
-            /* Number of days to keep the memtable around w/o flushing */
-            String lifetime = xmlUtils.getNodeValue("/Storage/MemtableLifetimeInDays");
-            if ( lifetime != null )
-                memtableLifetime_ = Integer.parseInt(lifetime);
-
-            /* Size of the memtable in memory in MB before it is dumped */
-            String memtableSize = xmlUtils.getNodeValue("/Storage/MemtableSizeInMB");
-            if ( memtableSize != null )
-                memtableSize_ = Integer.parseInt(memtableSize);
-            /* Number of objects in millions in the memtable before it is dumped */
-            String memtableObjectCount = xmlUtils.getNodeValue("/Storage/MemtableObjectCountInMillions");
-            if ( memtableObjectCount != null )
-                memtableObjectCount_ = Integer.parseInt(memtableObjectCount);
-
-            /* This parameter enables or disables consistency checks.
-             * If set to false the read repairs are disable for very
-             * high throughput on reads but at the cost of consistency.*/
-            String doConsistencyCheck = xmlUtils.getNodeValue("/Storage/DoConsistencyChecksBoolean");
-            if ( doConsistencyCheck != null )
-                doConsistencyCheck_ = Boolean.parseBoolean(doConsistencyCheck);
-
-
-            /* read the size at which we should do column indexes */
-            String columnIndexSizeInKB = xmlUtils.getNodeValue("/Storage/ColumnIndexSizeInKB");
-            if(columnIndexSizeInKB == null)
-            {
-                columnIndexSizeInKB_ = 64;
-            }
-            else
-            {
-                columnIndexSizeInKB_ = Integer.parseInt(columnIndexSizeInKB);
-            }
-
-            /* metadata directory */
-            metadataDirectory_ = xmlUtils.getNodeValue("/Storage/MetadataDirectory");
-            if ( metadataDirectory_ != null )
-                FileUtils.createDirectory(metadataDirectory_);
-            else
-            {
-                if ( os.equals("Linux") )
-                {
-                    metadataDirectory_ = "/var/storage/system";
-                }
-            }
-
-            /* snapshot directory */
-            snapshotDirectory_ = xmlUtils.getNodeValue("/Storage/SnapshotDirectory");
-            if ( snapshotDirectory_ != null )
-                FileUtils.createDirectory(snapshotDirectory_);
-            else
-            {
-                    snapshotDirectory_ = metadataDirectory_ + System.getProperty("file.separator") + "snapshot";
-            }
-
-            /* map output directory */
-            mapOutputDirectories_ = xmlUtils.getNodeValues("/Storage/MapOutputDirectories/MapOutputDirectory");
-            if ( mapOutputDirectories_.length > 0 )
-            {
-                for ( String mapOutputDirectory : mapOutputDirectories_ )
-                    FileUtils.createDirectory(mapOutputDirectory);
-            }
+        /* Read the configuration file to retrieve DB related properties. */
+        String file = filePath + System.getProperty("file.separator") + "storage-conf.xml";
+        return initInternal(file);
+    }
 
-            /* data file directory */
-            dataFileDirectories_ = xmlUtils.getNodeValues("/Storage/DataFileDirectories/DataFileDirectory");
-            if ( dataFileDirectories_.length > 0 )
-            {
-                for ( String dataFileDirectory : dataFileDirectories_ )
-                    FileUtils.createDirectory(dataFileDirectory);
-            }
-            else
-            {
-                if ( os.equals("Linux") )
-                {
-                    dataFileDirectories_ = new String[]{"/var/storage/data"};
-                }
-            }
+    public static Map<String, Map<String, CFMetaData>> init() throws Throwable
+    {
+        /* Read the configuration file to retrieve DB related properties. */
+        configFileName_ = System.getProperty("storage-config") + System.getProperty("file.separator") + "storage-conf.xml";
+        return initInternal(configFileName_);
+    }
+    
+    public static Map<String, Map<String, CFMetaData>> initInternal(String file) throws Throwable
+    {
+        String os = System.getProperty("os.name");
+        XMLUtils xmlUtils = new XMLUtils(file);        
 
-            /* bootstrap file directory */
-            bootstrapFileDirectory_ = xmlUtils.getNodeValue("/Storage/BootstrapFileDirectory");
-            if ( bootstrapFileDirectory_ != null )
-                FileUtils.createDirectory(bootstrapFileDirectory_);
-            else
-            {
-                if ( os.equals("Linux") )
-                {
-                    bootstrapFileDirectory_ = "/var/storage/bootstrap";
-                }
-            }
+        /* Cluster Name */
+        clusterName_ = xmlUtils.getNodeValue("/Storage/ClusterName");
 
-            /* commit log directory */
-            logFileDirectory_ = xmlUtils.getNodeValue("/Storage/CommitLogDirectory");
-            if ( logFileDirectory_ != null )
-                FileUtils.createDirectory(logFileDirectory_);
-            else
-            {
-                if ( os.equals("Linux") )
-                {
-                    logFileDirectory_ = "/var/storage/commitlog";
-                }
-            }
+        /* Ganglia servers contact list */
+        gangliaServers_ = xmlUtils.getNodeValues("/Storage/GangliaServers/GangliaServer");
+        
+        /* ZooKeeper's address */
+        zkAddress_ = xmlUtils.getNodeValue("/Storage/ZookeeperAddress");
+        
+        /* Hashing strategy */
+        hashingStrategy_ = xmlUtils.getNodeValue("/Storage/HashingStrategy");
+        /* Callout location */
+        calloutLocation_ = xmlUtils.getNodeValue("/Storage/CalloutLocation");
+        
+        /* JobTracker address */
+        jobTrackerHost_ = xmlUtils.getNodeValue("/Storage/JobTrackerHost");
+        
+        /* Job Jar file location */
+        jobJarFileLocation_ = xmlUtils.getNodeValue("/Storage/JobJarFileLocation");       
+        
+        /* Zookeeper's session timeout */
+        String zkSessionTimeout = xmlUtils.getNodeValue("/Storage/ZookeeperSessionTimeout");
+        if ( zkSessionTimeout != null )
+            zkSessionTimeout_ = Integer.parseInt(zkSessionTimeout);
+
+        /* Data replication factor */
+        String replicationFactor = xmlUtils.getNodeValue("/Storage/ReplicationFactor");
+        if ( replicationFactor != null )
+        	replicationFactor_ = Integer.parseInt(replicationFactor);
+
+        /* RPC Timeout */
+        String rpcTimeoutInMillis = xmlUtils.getNodeValue("/Storage/RpcTimeoutInMillis");
+        if ( rpcTimeoutInMillis != null )
+        	rpcTimeoutInMillis_ = Integer.parseInt(rpcTimeoutInMillis);
+
+        /* Thread per pool */
+        String threadsPerPool = xmlUtils.getNodeValue("/Storage/ThreadsPerPool");
+        if ( threadsPerPool != null )
+            threadsPerPool_ = Integer.parseInt(threadsPerPool);
+
+        /* TCP port on which the storage system listens */
+        String port = xmlUtils.getNodeValue("/Storage/StoragePort");
+        if ( port != null )
+            storagePort_ = Integer.parseInt(port);
+
+        /* UDP port for control messages */
+        port = xmlUtils.getNodeValue("/Storage/ControlPort");
+        if ( port != null )
+            controlPort_ = Integer.parseInt(port);
+
+        /* HTTP port for HTTP messages */
+        port = xmlUtils.getNodeValue("/Storage/HttpPort");
+        if ( port != null )
+            httpPort_ = Integer.parseInt(port);
+
+        /* Touch Key Cache Size */
+        String touchKeyCacheSize = xmlUtils.getNodeValue("/Storage/TouchKeyCacheSize");
+        if ( touchKeyCacheSize != null )
+            touchKeyCacheSize_ = Integer.parseInt(touchKeyCacheSize);
+
+        /* Number of days to keep the memtable around w/o flushing */
+        String lifetime = xmlUtils.getNodeValue("/Storage/MemtableLifetimeInDays");
+        if ( lifetime != null )
+            memtableLifetime_ = Integer.parseInt(lifetime);
+        
+        /* Size of the memtable in memory in MB before it is dumped */
+        String memtableSize = xmlUtils.getNodeValue("/Storage/MemtableSizeInMB");
+        if ( memtableSize != null )
+        	memtableSize_ = Integer.parseInt(memtableSize);
+        /* Number of objects in millions in the memtable before it is dumped */
+        String memtableObjectCount = xmlUtils.getNodeValue("/Storage/MemtableObjectCountInMillions");
+        if ( memtableObjectCount != null )
+        	memtableObjectCount_ = Integer.parseInt(memtableObjectCount);
+        
+        /* This parameter enables or disables consistency checks. 
+         * If set to false the read repairs are disable for very
+         * high throughput on reads but at the cost of consistency.*/
+        String doConsistencyCheck = xmlUtils.getNodeValue("/Storage/DoConsistencyChecksBoolean");
+        if ( doConsistencyCheck != null )
+        	doConsistencyCheck_ = Boolean.parseBoolean(doConsistencyCheck);
+        
+        
+        /* read the size at which we should do column indexes */
+        String columnIndexSizeInKB = xmlUtils.getNodeValue("/Storage/ColumnIndexSizeInKB");
+        if(columnIndexSizeInKB == null)
+        {
+        	columnIndexSizeInKB_ = 64;
+        }
+        else
+        {
+        	columnIndexSizeInKB_ = Integer.parseInt(columnIndexSizeInKB);
+        }
 
-            /* threshold after which commit log should be rotated. */
-            String value = xmlUtils.getNodeValue("/Storage/CommitLogRotationThresholdInMB");
-            if ( value != null)
-                logRotationThreshold_ = Integer.parseInt(value) * 1024 * 1024;
-
-            /* fast sync option */
-            value = xmlUtils.getNodeValue("/Storage/CommitLogFastSync");
-            if ( value != null )
-                fastSync_ = Boolean.parseBoolean(value);
-
-            tableToCFMetaDataMap_ = new HashMap<String, Map<String, CFMetaData>>();
-
-            /* Rack Aware option */
-            value = xmlUtils.getNodeValue("/Storage/RackAware");
-            if ( value != null )
-                rackAware_ = Boolean.parseBoolean(value);
-
-            /* Read the table related stuff from config */
-            NodeList tables = xmlUtils.getRequestedNodeList("/Storage/Tables/Table");
-            int size = tables.getLength();
-            if (size == 0) {
-                throw new UnsupportedOperationException("A Table must be configured");
-            }
-            for ( int i = 0; i < size; ++i )
+        /* metadata directory */
+        metadataDirectory_ = xmlUtils.getNodeValue("/Storage/MetadataDirectory");
+        if ( metadataDirectory_ != null )
+            FileUtils.createDirectory(metadataDirectory_);
+        else
+        {
+            if ( os.equals("Linux") )
             {
-                Node table = tables.item(i);
-
-                /* parsing out the table name */
-                String tName = XMLUtils.getAttributeValue(table, "Name");
-                tables_.add(tName);
-                tableToCFMetaDataMap_.put(tName, new HashMap<String, CFMetaData>());
-
-                String xqlTable = "/Storage/Tables/Table[@Name='" + tName + "']/";
-                NodeList columnFamilies = xmlUtils.getRequestedNodeList(xqlTable + "ColumnFamily");
-
-                // get name of the rowKey for this table
-                String n_rowKey = xmlUtils.getNodeValue(xqlTable + "RowKey");
-                if (n_rowKey == null)
-                    n_rowKey = d_rowKey_;
-
-                //NodeList columnFamilies = xmlUtils.getRequestedNodeList(table, "ColumnFamily");
-                int size2 = columnFamilies.getLength();
-
-                for ( int j = 0; j < size2; ++j )
-                {
-                    Node columnFamily = columnFamilies.item(j);
-                    String cName = XMLUtils.getAttributeValue(columnFamily, "Name");
-                    if (cName == null)
-                    {
-                        throw new IllegalArgumentException("ColumnFamily element missing Name attribute: " + columnFamily);
-                    }
-                    String xqlCF = xqlTable + "ColumnFamily[@Name='" + cName + "']/";
-
-                    /* squirrel away the application column families */
-                    applicationColumnFamilies_.add(cName);
-
-                    // Parse out the column type
-                    String columnType = xmlUtils.getAttributeValue(columnFamily, "ColumnType");
-                    columnType = ColumnFamily.getColumnType(columnType);
-
-                    // Parse out the column family sorting property for columns
-                    String columnIndexProperty = XMLUtils.getAttributeValue(columnFamily, "ColumnSort");
-                    String columnIndexType = ColumnFamily.getColumnSortProperty(columnIndexProperty);
-
-                    // Parse out user-specified logical names for the various dimensions
-                    // of a the column family from the config.
-                    String n_superColumnMap = xmlUtils.getNodeValue(xqlCF + "SuperColumnMap");
-                    if (n_superColumnMap == null)
-                        n_superColumnMap = d_superColumnMap_;
-
-                    String n_superColumnKey = xmlUtils.getNodeValue(xqlCF + "SuperColumnKey");
-                    if (n_superColumnKey == null)
-                        n_superColumnKey = d_superColumnKey_;
-
-                    String n_columnMap = xmlUtils.getNodeValue(xqlCF + "ColumnMap");
-                    if (n_columnMap == null)
-                        n_columnMap = d_columnMap_;
-
-                    String n_columnKey = xmlUtils.getNodeValue(xqlCF + "ColumnKey");
-                    if (n_columnKey == null)
-                        n_columnKey = d_columnKey_;
-
-                    String n_columnValue = xmlUtils.getNodeValue(xqlCF + "ColumnValue");
-                    if (n_columnValue == null)
-                        n_columnValue = d_columnValue_;
-
-                    String n_columnTimestamp = xmlUtils.getNodeValue(xqlCF + "ColumnTimestamp");
-                    if (n_columnTimestamp == null)
-                        n_columnTimestamp = d_columnTimestamp_;
-
-                    // now populate the column family meta data and
-                    // insert it into the table dictionary.
-                    CFMetaData cfMetaData = new CFMetaData();
-
-                    cfMetaData.tableName = tName;
-                    cfMetaData.cfName = cName;
-
-                    cfMetaData.columnType = columnType;
-                    cfMetaData.indexProperty_ = columnIndexType;
-
-                    cfMetaData.n_rowKey = n_rowKey;
-                    cfMetaData.n_columnMap = n_columnMap;
-                    cfMetaData.n_columnKey = n_columnKey;
-                    cfMetaData.n_columnValue = n_columnValue;
-                    cfMetaData.n_columnTimestamp = n_columnTimestamp;
-                    if ("Super".equals(columnType))
-                    {
-                        cfMetaData.n_superColumnKey = n_superColumnKey;
-                        cfMetaData.n_superColumnMap = n_superColumnMap;
-                    }
-
-                    tableToCFMetaDataMap_.get(tName).put(cName, cfMetaData);
-                }
+                metadataDirectory_ = "/var/storage/system";
             }
+        }
 
-            /* Load the seeds for node contact points */
-            String[] seeds = xmlUtils.getNodeValues("/Storage/Seeds/Seed");
-            for( int i = 0; i < seeds.length; ++i )
-            {
-                seeds_.add( seeds[i] );
-            }
+        /* snapshot directory */
+        snapshotDirectory_ = xmlUtils.getNodeValue("/Storage/SnapshotDirectory");
+        if ( snapshotDirectory_ != null )
+            FileUtils.createDirectory(snapshotDirectory_);
+        else
+        {
+            	snapshotDirectory_ = metadataDirectory_ + System.getProperty("file.separator") + "snapshot";
         }
-        catch (Exception e)
+        
+        /* map output directory */
+        mapOutputDirectories_ = xmlUtils.getNodeValues("/Storage/MapOutputDirectories/MapOutputDirectory");
+        if ( mapOutputDirectories_.length > 0 )
         {
-            throw new RuntimeException(e);
+            for ( String mapOutputDirectory : mapOutputDirectories_ )
+                FileUtils.createDirectory(mapOutputDirectory);
         }
-
-        try
+        
+        /* data file directory */
+        dataFileDirectories_ = xmlUtils.getNodeValues("/Storage/DataFileDirectories/DataFileDirectory");
+        if ( dataFileDirectories_.length > 0 )
         {
-            storeMetadata();
+        	for ( String dataFileDirectory : dataFileDirectories_ )
+        		FileUtils.createDirectory(dataFileDirectory);
         }
-        catch (IOException e)
+        else
         {
-            throw new RuntimeException(e);
+            if ( os.equals("Linux") )
+            {
+                dataFileDirectories_ = new String[]{"/var/storage/data"};
+            }
         }
-    }
 
-    /*
-     * Create the metadata tables. This table has information about
-     * the table name and the column families that make up the table.
-     * Each column family also has an associated ID which is an int.
-    */
-    private static void storeMetadata() throws IOException
-    {
-        AtomicInteger idGenerator = new AtomicInteger(0);
-        Set<String> tables = tableToCFMetaDataMap_.keySet();
+        /* bootstrap file directory */
+        bootstrapFileDirectory_ = xmlUtils.getNodeValue("/Storage/BootstrapFileDirectory");
+        if ( bootstrapFileDirectory_ != null )
+            FileUtils.createDirectory(bootstrapFileDirectory_);
+        else
+        {
+            if ( os.equals("Linux") )
+            {
+                bootstrapFileDirectory_ = "/var/storage/bootstrap";
+            }
+        }
 
-        for ( String table : tables )
+        /* commit log directory */
+        logFileDirectory_ = xmlUtils.getNodeValue("/Storage/CommitLogDirectory");
+        if ( logFileDirectory_ != null )
+            FileUtils.createDirectory(logFileDirectory_);
+        else
         {
-            Table.TableMetadata tmetadata = Table.TableMetadata.instance();
-            if (tmetadata.isEmpty())
+            if ( os.equals("Linux") )
             {
-                tmetadata = Table.TableMetadata.instance();
-                /* Column families associated with this table */
-                Map<String, CFMetaData> columnFamilies = tableToCFMetaDataMap_.get(table);
+                logFileDirectory_ = "/var/storage/commitlog";
+            }
+        }
+
+        /* threshold after which commit log should be rotated. */
+        String value = xmlUtils.getNodeValue("/Storage/CommitLogRotationThresholdInMB");
+        if ( value != null)
+            logRotationThreshold_ = Integer.parseInt(value) * 1024 * 1024;
+
+        /* fast sync option */
+        value = xmlUtils.getNodeValue("/Storage/CommitLogFastSync");
+        if ( value != null )
+            fastSync_ = Boolean.parseBoolean(value);
+
+        tableToCFMetaDataMap_ = new HashMap<String, Map<String, CFMetaData>>();
+
+        /* Rack Aware option */
+        value = xmlUtils.getNodeValue("/Storage/RackAware");
+        if ( value != null )
+            rackAware_ = Boolean.parseBoolean(value);
+
+        /* Read the table related stuff from config */
+        NodeList tables = xmlUtils.getRequestedNodeList("/Storage/Tables/Table");
+        int size = tables.getLength();
+        for ( int i = 0; i < size; ++i )
+        {
+            Node table = tables.item(i);
 
-                for (String columnFamily : columnFamilies.keySet())
+            /* parsing out the table name */
+            String tName = XMLUtils.getAttributeValue(table, "Name");
+            tables_.add(tName);
+            tableToCFMetaDataMap_.put(tName, new HashMap<String, CFMetaData>());
+
+            String xqlTable = "/Storage/Tables/Table[@Name='" + tName + "']/";
+            NodeList columnFamilies = xmlUtils.getRequestedNodeList(xqlTable + "ColumnFamily");
+
+            // get name of the rowKey for this table
+            String n_rowKey = xmlUtils.getNodeValue(xqlTable + "RowKey");
+            if (n_rowKey == null)
+                n_rowKey = d_rowKey_;
+
+            //NodeList columnFamilies = xmlUtils.getRequestedNodeList(table, "ColumnFamily");            
+            int size2 = columnFamilies.getLength();
+
+            for ( int j = 0; j < size2; ++j )
+            {
+                Node columnFamily = columnFamilies.item(j);
+                String cName = XMLUtils.getAttributeValue(columnFamily, "Name");
+                String xqlCF = xqlTable + "ColumnFamily[@Name='" + cName + "']/";
+
+                /* squirrel away the application column families */
+                applicationColumnFamilies_.add(cName);
+
+                // Parse out the column type
+                String columnType = xmlUtils.getAttributeValue(columnFamily, "ColumnType");
+                columnType = ColumnFamily.getColumnType(columnType);
+
+                // Parse out the column family sorting property for columns
+                String columnIndexProperty = XMLUtils.getAttributeValue(columnFamily, "ColumnSort");
+                String columnIndexType = ColumnFamily.getColumnSortProperty(columnIndexProperty);
+
+                // Parse out user-specified logical names for the various dimensions
+                // of a the column family from the config.
+                String n_superColumnMap = xmlUtils.getNodeValue(xqlCF + "SuperColumnMap");
+                if (n_superColumnMap == null)
+                    n_superColumnMap = d_superColumnMap_;
+                
+                String n_superColumnKey = xmlUtils.getNodeValue(xqlCF + "SuperColumnKey");
+                if (n_superColumnKey == null)
+                    n_superColumnKey = d_superColumnKey_;
+                
+                String n_columnMap = xmlUtils.getNodeValue(xqlCF + "ColumnMap");
+                if (n_columnMap == null)
+                    n_columnMap = d_columnMap_;
+                
+                String n_columnKey = xmlUtils.getNodeValue(xqlCF + "ColumnKey");
+                if (n_columnKey == null)
+                    n_columnKey = d_columnKey_;
+
+                String n_columnValue = xmlUtils.getNodeValue(xqlCF + "ColumnValue");
+                if (n_columnValue == null)
+                    n_columnValue = d_columnValue_;
+
+                String n_columnTimestamp = xmlUtils.getNodeValue(xqlCF + "ColumnTimestamp");
+                if (n_columnTimestamp == null)
+                    n_columnTimestamp = d_columnTimestamp_;
+
+                // now populate the column family meta data and
+                // insert it into the table dictionary. 
+                CFMetaData cfMetaData = new CFMetaData();
+                
+                cfMetaData.tableName = tName;
+                cfMetaData.cfName = cName;
+                
+                cfMetaData.columnType = columnType;
+                cfMetaData.indexProperty_ = columnIndexType;
+
+                cfMetaData.n_rowKey = n_rowKey;
+                cfMetaData.n_columnMap = n_columnMap;
+                cfMetaData.n_columnKey = n_columnKey;
+                cfMetaData.n_columnValue = n_columnValue;
+                cfMetaData.n_columnTimestamp = n_columnTimestamp;
+                if ("Super".equals(columnType))
                 {
-                    tmetadata.add(columnFamily, idGenerator.getAndIncrement(), DatabaseDescriptor.getColumnType(columnFamily));
+                    cfMetaData.n_superColumnKey = n_superColumnKey;
+                    cfMetaData.n_superColumnMap = n_superColumnMap;
                 }
 
-                /*
-                 * Here we add all the system related column families.
-                */
-                /* Add the TableMetadata column family to this map. */
-                tmetadata.add(Table.TableMetadata.cfName_, idGenerator.getAndIncrement());
-                /* Add the LocationInfo column family to this map. */
-                tmetadata.add(SystemTable.cfName_, idGenerator.getAndIncrement());
-                /* Add the recycle column family to this map. */
-                tmetadata.add(Table.recycleBin_, idGenerator.getAndIncrement());
-                /* Add the Hints column family to this map. */
-                tmetadata.add(Table.hints_, idGenerator.getAndIncrement(), ColumnFamily.getColumnType("Super"));
-                tmetadata.apply();
-                idGenerator.set(0);
+                tableToCFMetaDataMap_.get(tName).put(cName, cfMetaData);
             }
         }
-    }
-
 
+        /* Load the seeds for node contact points */
+        String[] seeds = xmlUtils.getNodeValues("/Storage/Seeds/Seed");
+        for( int i = 0; i < seeds.length; ++i )
+        {            
+            seeds_.add( seeds[i] );
+        }
+        return tableToCFMetaDataMap_;
+    }
     
-    public static String getPartitionerClass()
+    public static String getHashingStrategy()
     {
-        return partitionerClass_;
+        return hashingStrategy_;
     }
     
     public static String getZkAddress()
@@ -792,14 +735,9 @@
             return TypeInfo.LONG;
         }
     }
-
-    public static Map<String, Map<String, CFMetaData>> getTableToColumnFamilyMap()
-    {
-        return tableToCFMetaDataMap_;
-    }
-
-    public static String getTableName()
+    
+    public static void main(String[] args) throws Throwable
     {
-        return tables_.get(0);
+        DatabaseDescriptor.initInternal("C:\\Engagements\\Cassandra-Golden\\storage-conf.xml");
     }
 }
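
The large hunk above replaces DatabaseDescriptor's static initializer with explicit init() / initInternal() entry points that return the table-to-CFMetaData map, so a caller (or a test, as the new main() shows) can load an arbitrary storage-conf.xml instead of depending on the storage-config system property. A minimal caller sketch; the config directory is a hypothetical example:

    import java.util.Map;

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.config.DatabaseDescriptor;

    public class ConfigBootstrap
    {
        public static void main(String[] args) throws Throwable
        {
            // init(path) appends "storage-conf.xml" to the given directory;
            // the no-arg init() falls back to the storage-config system property.
            Map<String, Map<String, CFMetaData>> cfMetadata = DatabaseDescriptor.init("/etc/cassandra");
            System.out.println("hashing strategy: " + DatabaseDescriptor.getHashingStrategy());
            System.out.println("tables: " + cfMetadata.keySet());
        }
    }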

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java Fri Mar 27 06:45:19 2009
@@ -33,6 +33,7 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
+import org.apache.cassandra.db.*;
 
 /**
  * A Row Source Defintion (RSD) for doing a range query on a column map
@@ -111,7 +112,7 @@
         List<Map<String, String>> rows = new LinkedList<Map<String, String>>();
         if (row != null)
         {
-            Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+            Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
             if (cfMap != null && cfMap.size() > 0)
             {
                 ColumnFamily cfamily = cfMap.get(cfMetaData_.cfName);
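
This hunk, and the matching ones in SuperColumnRangeQueryRSD and UniqueKeyQueryRSD below, track a rename of Row's accessor from getColumnFamilyMap() to getColumnFamilies(). A small sketch of the call-site pattern the three RSDs share (the helper and its null handling are illustrative):

    import java.util.Map;

    import org.apache.cassandra.db.ColumnFamily;
    import org.apache.cassandra.db.Row;

    class RowAccess
    {
        /* Returns the named column family from a query result row, or null. */
        static ColumnFamily columnFamily(Row row, String cfName)
        {
            Map<String, ColumnFamily> cfMap = row.getColumnFamilies(); // was getColumnFamilyMap()
            return (cfMap == null || cfMap.isEmpty()) ? null : cfMap.get(cfName);
        }
    }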

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java Fri Mar 27 06:45:19 2009
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.cql.common;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -31,8 +32,11 @@
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.column_t;
+import org.apache.cassandra.service.superColumn_t;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
+import org.apache.cassandra.db.*;
 
 /**
  * A Row Source Defintion (RSD) for doing a super column range query on a Super Column Family.
@@ -80,7 +84,7 @@
         List<Map<String, String>> rows = new LinkedList<Map<String, String>>();
         if (row != null)
         {
-            Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+            Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
             if (cfMap != null && cfMap.size() > 0)
             {
                 ColumnFamily cfamily = cfMap.get(cfMetaData_.cfName);

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java Fri Mar 27 06:45:19 2009
@@ -33,6 +33,7 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
+import org.apache.cassandra.db.*;
 
 /**
  * A Row Source Defintion (RSD) for looking up a unique column within a column family.
@@ -95,7 +96,7 @@
 
         if (row != null)
         {
-            Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+            Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
             if (cfMap != null && cfMap.size() > 0)
             {
                 ColumnFamily cfamily = cfMap.get(cfMetaData_.cfName);

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetector.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetector.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetector.java Fri Mar 27 06:45:19 2009
@@ -177,7 +177,7 @@
         /* We need this so that we do not suspect a convict. */
         boolean isConvicted = false;
         double phi = hbWnd.phi(now);
-        logger_.trace("PHI for " + ep + " : " + phi);
+        logger_.info("PHI for " + ep + " : " + phi);
         
         /*
         if ( phi > phiConvictThreshold_ )
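
For context on the value being logged: this is an accrual-style failure detector, so phi is a continuous suspicion level (roughly -log10 of the probability that the current silence is just a late heartbeat) rather than an up/down bit. The commented-out block shows where a conviction threshold would apply; a hedged sketch of that step, using only the names visible in this hunk:

    double phi = hbWnd.phi(now);
    if ( !isConvicted && phi > phiConvictThreshold_ )
    {
        /* The threshold trades detection speed against false positives:
         * raising it demands stronger evidence before suspecting ep. */
        isConvicted = true;
    }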

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/Gossiper.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/Gossiper.java Fri Mar 27 06:45:19 2009
@@ -76,7 +76,7 @@
                         if ( !bVal )
                             doGossipToSeed(message);
 
-                        logger_.trace("Performing status check ...");
+                        logger_.debug("Performing status check ...");
                         doStatusCheck();
                     }
                 }
@@ -344,7 +344,7 @@
             sb.append(gDigest);
             sb.append(" ");
         }
-        logger_.trace("Gossip Digests are : " + sb.toString());
+        logger_.debug("Gossip Digests are : " + sb.toString());
     }
 
     public int getCurrentGenerationNumber(EndPoint endpoint)
@@ -367,7 +367,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
-        logger_.trace("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
+        logger_.debug("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
         Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, new Object[]{bos.toByteArray()});
         return message;
     }
@@ -392,7 +392,7 @@
         }
 
         EndPoint to = eps.get(++rrIndex_);
-        logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
+        logger_.info("Sending a GossipDigestSynMessage to " + to + " ...");
         MessagingService.getMessagingInstance().sendUdpOneWay(message, to);
         return seeds_.contains(to);
     }
@@ -411,7 +411,7 @@
         List<EndPoint> liveEndPoints = new ArrayList<EndPoint>(epSet);
         int index = (size == 1) ? 0 : random_.nextInt(size);
         EndPoint to = liveEndPoints.get(index);
-        logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
+        logger_.info("Sending a GossipDigestSynMessage to " + to + " ...");
         MessagingService.getMessagingInstance().sendUdpOneWay(message, to);
         return seeds_.contains(to);
     }
@@ -977,7 +977,7 @@
     public void doVerb(Message message)
     {
         EndPoint from = message.getFrom();
-        logger_.trace("Received a GossipDigestSynMessage from " + from);
+        logger_.info("Received a GossipDigestSynMessage from " + from);
 
         byte[] bytes = (byte[])message.getMessageBody()[0];
         DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
@@ -1001,7 +1001,7 @@
 
             GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
             Message gDigestAckMessage = Gossiper.instance().makeGossipDigestAckMessage(gDigestAck);
-            logger_.trace("Sending a GossipDigestAckMessage to " + from);
+            logger_.info("Sending a GossipDigestAckMessage to " + from);
             MessagingService.getMessagingInstance().sendUdpOneWay(gDigestAckMessage, from);
         }
         catch (IOException e)
@@ -1061,7 +1061,7 @@
     public void doVerb(Message message)
     {
         EndPoint from = message.getFrom();
-        logger_.trace("Received a GossipDigestAckMessage from " + from);
+        logger_.info("Received a GossipDigestAckMessage from " + from);
 
         byte[] bytes = (byte[])message.getMessageBody()[0];
         DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
@@ -1091,7 +1091,7 @@
 
             GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
             Message gDigestAck2Message = Gossiper.instance().makeGossipDigestAck2Message(gDigestAck2);
-            logger_.trace("Sending a GossipDigestAck2Message to " + from);
+            logger_.info("Sending a GossipDigestAck2Message to " + from);
             MessagingService.getMessagingInstance().sendUdpOneWay(gDigestAck2Message, from);
         }
         catch ( IOException e )
@@ -1108,7 +1108,7 @@
     public void doVerb(Message message)
     {
         EndPoint from = message.getFrom();
-        logger_.trace("Received a GossipDigestAck2Message from " + from);
+        logger_.info("Received a GossipDigestAck2Message from " + from);
 
         byte[] bytes = (byte[])message.getMessageBody()[0];
         DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
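
Several hunks above promote gossip log lines from trace to debug or info. A related log4j idiom (a sketch, not part of this commit) guards message construction so that disabled levels do not pay for string concatenation:

    if ( logger_.isDebugEnabled() )
        logger_.debug("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);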

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java Fri Mar 27 06:45:19 2009
@@ -21,10 +21,10 @@
  * Section of a file that needs to be scanned
  * is represented by this class.
 */
-public class Coordinate
+class Coordinate
 {
-    public final long start_;
-    public final long end_;
+    long start_;
+    long end_;
     
     Coordinate(long start, long end)
     {

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java Fri Mar 27 06:45:19 2009
@@ -92,6 +92,31 @@
 
     /**
      * This method dumps the next key/value into the DataOuputStream
+     * passed in.
+     *
+     * @param key key we are interested in.
+     * @param dos DataOutputStream that needs to be filled.
+     * @param section region of the file that needs to be read
+     * @throws IOException
+     * @return the number of bytes read.
+    */
+    public long next(String key, DataOutputBuffer bufOut, Coordinate section) throws IOException;
+
+    /**
+     * This method dumps the next key/value into the DataOuputStream
+     * passed in.
+     *
+     * @param key key we are interested in.
+     * @param dos DataOutputStream that needs to be filled.
+     * @param column name of the column in our format.
+     * @param section region of the file that needs to be read
+     * @throws IOException
+     * @return number of bytes that were read.
+    */
+    public long next(String key, DataOutputBuffer bufOut, String column, Coordinate section) throws IOException;
+
+    /**
+     * This method dumps the next key/value into the DataOuputStream
      * passed in. Always use this method to query for application
      * specific data as it will have indexes.
      *
@@ -105,9 +130,23 @@
      * @return number of bytes read.
      *
     */
-    public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> columnNames, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException;
+    public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> columnNames, Coordinate section) throws IOException;
     
     /**
+     * This method dumps the next key/value into the DataOuputStream
+     * passed in.
+     *
+     * @param key key we are interested in.
+     * @param dos DataOutputStream that needs to be filled.
+     * @param column name of the column in our format.
+     * @param timeRange time range we are interested in.
+     * @param section region of the file that needs to be read
+     * @throws IOException
+     * @return number of bytes that were read.
+    */
+    public long next(String key, DataOutputBuffer bufOut, String column, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException;
+
+    /**
      * Close the file after reading.
      * @throws IOException
      */
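
The two new single-column overloads slot between the existing whole-row and column-list variants. A hedged usage sketch, patterned on SSTable.touch() further down (the column name is illustrative; the Coordinate comes from a block-index lookup inside the io package):

    IFileReader dataReader = SequenceFile.reader(dataFile_);
    try
    {
        DataOutputBuffer bufOut = new DataOutputBuffer();
        /* narrow the scan to the file section that can contain the key */
        Coordinate section = getCoordinates(key, dataReader);
        long bytesRead = dataReader.next(key, bufOut, "Column1", section);
        /* bufOut now holds the serialized data, if the key was found */
    }
    finally
    {
        dataReader.close();
    }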

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java Fri Mar 27 06:45:19 2009
@@ -18,8 +18,11 @@
 
 package org.apache.cassandra.io;
 
+import java.io.File;
 import java.io.IOException;
 
+import org.apache.cassandra.db.PrimaryKey;
+
 
 /**
  * An interface for writing into the SequenceFile abstraction.

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java Fri Mar 27 06:45:19 2009
@@ -18,30 +18,24 @@
 
 package org.apache.cassandra.io;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Hashtable;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
+import java.io.*;
+import java.math.BigInteger;
+import java.nio.channels.FileChannel;
+import java.util.*;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.service.PartitionerType;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.BasicUtilities;
 import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.dht.IPartitioner;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
 
 /**
  * This class is built on top of the SequenceFile. It stores
@@ -144,26 +138,62 @@
     }
     
     /**
+     * This compares two strings and does it in reverse
+     * order.
+     * 
+     * @author alakshman
+     *
+     */
+    private static class OrderPreservingPartitionerComparator implements Comparator<String>
+    {
+        public int compare(String c1, String c2) 
+        {
+            return c2.compareTo(c1);
+        } 
+    }
+
+    /**
+     * This class compares two BigInteger's passes in
+     * as strings and does so in reverse order.
+     * @author alakshman
+     *
+     */
+    private static class RandomPartitionerComparator implements Comparator<String>
+    {
+        public int compare(String c1, String c2) 
+        {
+            BigInteger b1 = new BigInteger(c1);
+            BigInteger b2 = new BigInteger(c2);
+            return b2.compareTo(b1);
+        } 
+    }
+    
+    /**
      * This is a simple container for the index Key and its corresponding position
      * in the data file. Binary search is performed on a list of these objects
      * to lookup keys within the SSTable data file.
     */
     public static class KeyPositionInfo implements Comparable<KeyPositionInfo>
     {
-        private final String decoratedKey;
+        private String key_;
         private long position_;
 
-        public KeyPositionInfo(String decoratedKey)
+        public KeyPositionInfo(String key)
         {
-            this.decoratedKey = decoratedKey;
+            key_ = key;            
         }
 
-        public KeyPositionInfo(String decoratedKey, long position)
+        public KeyPositionInfo(String key, long position)
         {
-            this(decoratedKey);
+            this(key);
             position_ = position;
         }
 
+        public String key()
+        {
+            return key_;
+        }
+
         public long position()
         {
             return position_;
@@ -171,13 +201,25 @@
 
         public int compareTo(KeyPositionInfo kPosInfo)
         {
-            IPartitioner p = StorageService.getPartitioner();
-            return p.getDecoratedKeyComparator().compare(decoratedKey, kPosInfo.decoratedKey);
+            int value = 0;
+            PartitionerType pType = StorageService.getPartitionerType();
+            switch( pType )
+            {
+                case OPHF:
+                    value = key_.compareTo(kPosInfo.key_);                    
+                    break;
+                    
+                default:
+                    BigInteger b = new BigInteger(key_);
+                    value = b.compareTo( new BigInteger(kPosInfo.key_) );
+                    break;
+            }
+            return value;
         }
 
         public String toString()
         {
-        	return decoratedKey + ":" + position_;
+        	return key_ + ":" + position_;
         }
     }
     
@@ -262,7 +304,7 @@
         List<String> indexedKeys = new ArrayList<String>();
         for ( KeyPositionInfo keyPositionInfo : keyPositionInfos )
         {
-            indexedKeys.add(keyPositionInfo.decoratedKey);
+            indexedKeys.add(keyPositionInfo.key_);
         }
 
         Collections.sort(indexedKeys);
@@ -319,13 +361,13 @@
      * Determines if the given key is in the specified file. If the
      * key is not present then we skip processing this file.
     */
-    public static boolean isKeyInFile(String clientKey, String filename)
+    public static boolean isKeyInFile(String key, String filename)
     {
         boolean bVal = false;
         BloomFilter bf = bfs_.get(filename);
         if ( bf != null )
         {
-            bVal = bf.isPresent(clientKey);
+            bVal = bf.isPresent(key);
         }
         return bVal;
     }
@@ -358,13 +400,48 @@
     public SSTable(String directory, String filename) throws IOException
     {  
         dataFile_ = directory + System.getProperty("file.separator") + filename + "-Data.db";                
-        blockIndex_ = new TreeMap<String, BlockMetadata>(StorageService.getPartitioner().getReverseDecoratedKeyComparator());
-        blockIndexes_ = new ArrayList<SortedMap<String, BlockMetadata>>();
-        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);
-        // dataWriter_ = SequenceFile.chksumWriter(dataFile_, 4*1024*1024);
-        SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition();
+        blockIndex_ = new TreeMap<String, BlockMetadata>(Collections.reverseOrder());
+        blockIndexes_ = new ArrayList<SortedMap<String, BlockMetadata>>();        
+        // dataWriter_ = SequenceFile.writer(dataFile_);
+        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);        
+        SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition(); 
     } 
-
+    
+    private void initBlockIndex()
+    {
+        initBlockIndex(StorageService.getPartitionerType());
+    }
+    
+    private void initBlockIndex(PartitionerType pType)
+    {
+        switch ( pType )
+        {
+            case OPHF: 
+                blockIndex_ = new TreeMap<String, BlockMetadata>( new SSTable.OrderPreservingPartitionerComparator() );                
+                break;
+               
+            default:
+                blockIndex_ = new TreeMap<String, BlockMetadata>( new SSTable.RandomPartitionerComparator() );
+                break;
+        }
+    }
+    
+    /**
+     * This ctor is used for DB writes into the SSTable. It sets up the
+     * block index with a comparator appropriate for the given
+     * partitioner type.
+    */
+    public SSTable(String directory, String filename, PartitionerType pType) throws IOException
+    {        
+        dataFile_ = directory + System.getProperty("file.separator") + filename + "-Data.db";
+        // dataWriter_ = SequenceFile.writer(dataFile_);
+        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);    
+        // dataWriter_ = SequenceFile.chksumWriter(dataFile_, 4*1024*1024);
+        SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition(); 
+        /* set up the block index based on partition type */
+        initBlockIndex(pType);
+        blockIndexes_ = new ArrayList<SortedMap<String, BlockMetadata>>();
+    }
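+    /*
+     * Editor's sketch (not part of this commit): a typical write path,
+     * assuming a data directory and table name purely for illustration:
+     *
+     *   SSTable ssTable = new SSTable("/var/cassandra/data", "Table-1",
+     *                                 StorageService.getPartitionerType());
+     *   BigInteger hash = FBUtilities.hash("row1");
+     *   ssTable.append("row1", hash, value);
+     *   ssTable.close();
+    */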
+    
     private void loadBloomFilter(IFileReader indexReader, long size) throws IOException
     {        
         /* read the position of the bloom filter */
@@ -381,8 +458,8 @@
         indexReader.next(bufOut);   
         bufOut.close();
         bufIn.reset(bufOut.getData(), bufOut.getLength());
-        String clientKey = bufIn.readUTF();
-        if ( clientKey.equals(SequenceFile.marker_) )
+        String key = bufIn.readUTF();
+        if ( key.equals(SequenceFile.marker_) )
         {
             /*
              * We are now reading the serialized Bloom Filter. We read
@@ -515,26 +592,31 @@
         return getFile(dataFile_);
     }
 
+    public long lastModified()
+    {
+        return dataWriter_.lastModified();
+    }
+    
     /*
      * Seeks to the specified key on disk.
     */
-    public void touch(final String clientKey, boolean fData) throws IOException
+    public void touch(String key, boolean fData) throws IOException
     {
-        if (touchCache_.containsKey(dataFile_ + ":" + clientKey))
+        /* Morph the key up front so this lookup matches the
+         * "dataFile:key" form cached further below. */
+        key = morphKey(key);
+        if ( touchCache_.containsKey(dataFile_ + ":" + key) )
             return;
         
         IFileReader dataReader = SequenceFile.reader(dataFile_); 
         try
         {
-        	/* Morph the key */
-            String decoratedKey = StorageService.getPartitioner().decorateKey(clientKey);
-            Coordinate fileCoordinate = getCoordinates(decoratedKey, dataReader);
+            Coordinate fileCoordinate = getCoordinates(key, dataReader);
             /* Get offset of key from block Index */
             dataReader.seek(fileCoordinate.end_);
-            BlockMetadata blockMetadata = dataReader.getBlockMetadata(decoratedKey);
+            BlockMetadata blockMetadata = dataReader.getBlockMetadata(key);
             if ( blockMetadata.position_ != -1L )
             {
-                touchCache_.put(dataFile_ + ":" + clientKey, blockMetadata.position_);
+                touchCache_.put(dataFile_ + ":" + key, blockMetadata.position_);                  
             } 
             
             if ( fData )
@@ -557,34 +639,67 @@
         }
     }
 
-    private long beforeAppend(String decoratedKey) throws IOException
+    private long beforeAppend(String key) throws IOException
     {
-    	if (decoratedKey == null )
+    	if ( key == null )
             throw new IOException("Keys must not be null.");
-        Comparator<String> c = StorageService.getPartitioner().getReverseDecoratedKeyComparator();
-        if ( lastWrittenKey_ != null && c.compare(lastWrittenKey_, decoratedKey) <= 0 )
+        if ( lastWrittenKey_ != null && key.compareTo(lastWrittenKey_) <= 0 )
         {
             logger_.info("Last written key : " + lastWrittenKey_);
-            logger_.info("Current key : " + decoratedKey);
+            logger_.info("Current key : " + key);
             logger_.info("Writing into file " + dataFile_);
             throw new IOException("Keys must be written in ascending order.");
         }
-        return (lastWrittenKey_ == null) ? SSTable.positionAfterFirstBlockIndex_ : dataWriter_.getCurrentPosition();
+        long currentPosition = (lastWrittenKey_ == null) ? SSTable.positionAfterFirstBlockIndex_ : dataWriter_.getCurrentPosition();
+        return currentPosition;
+    }
+    
+    private long beforeAppend(BigInteger hash) throws IOException
+    {
+        if ( hash == null )
+            throw new IOException("Keys must not be null.");
+        if ( lastWrittenKey_ != null )
+        {
+            BigInteger previousKey = new BigInteger(lastWrittenKey_);
+            if ( hash.compareTo(previousKey) <= 0 )
+            {
+                logger_.info("Last written key : " + previousKey);
+                logger_.info("Current key : " + hash);
+                logger_.info("Writing into file " + dataFile_);
+                throw new IOException("Keys must be written in ascending order.");
+            }
+        }
+        long currentPosition = (lastWrittenKey_ == null) ? SSTable.positionAfterFirstBlockIndex_ : dataWriter_.getCurrentPosition();
+        return currentPosition;
     }
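+    /*
+     * Editor's sketch (not part of this commit): appends must arrive in
+     * ascending hash order. If the last appended row had hash 99, a later
+     * append with hash 100 passes this check, while one with hash 50
+     * throws IOException("Keys must be written in ascending order.").
+    */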
 
-    private void afterAppend(String decoratedKey, long position, long size) throws IOException
+    private void afterAppend(String key, long position, long size) throws IOException
     {
         ++indexKeysWritten_;
-        lastWrittenKey_ = decoratedKey;
-        blockIndex_.put(decoratedKey, new BlockMetadata(position, size));
+        lastWrittenKey_ = key;
+        blockIndex_.put(key, new BlockMetadata(position, size));
         if ( indexKeysWritten_ == indexInterval_ )
         {
         	blockIndexes_.add(blockIndex_);
-        	blockIndex_ = new TreeMap<String, BlockMetadata>(StorageService.getPartitioner().getReverseDecoratedKeyComparator());
+        	blockIndex_ = new TreeMap<String, BlockMetadata>(Collections.reverseOrder());
             indexKeysWritten_ = 0;
         }                
     }
-
+    
+    private void afterAppend(BigInteger hash, long position, long size) throws IOException
+    {
+        ++indexKeysWritten_;
+        String key = hash.toString();
+        lastWrittenKey_ = key;
+        blockIndex_.put(key, new BlockMetadata(position, size));
+        if ( indexKeysWritten_ == indexInterval_ )
+        {
+            blockIndexes_.add(blockIndex_);
+            initBlockIndex();
+            indexKeysWritten_ = 0;
+        }                
+    }
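+    /*
+     * Editor's note (not part of this commit): the block index stores the
+     * hash as a string (hash.toString()), which is why
+     * RandomPartitionerComparator parses its arguments back into
+     * BigInteger before comparing.
+    */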
+    
     /**
      * Dumps all the block indices for this SSTable
      * at the end of the file.
@@ -615,10 +730,10 @@
         Set<String> keys = blockIndex.keySet();                
         /* Number of keys in this block */
         bufOut.writeInt(keys.size());
-        for ( String decoratedKey : keys )
+        for ( String key : keys )
         {            
-            bufOut.writeUTF(decoratedKey);
-            BlockMetadata blockMetadata = blockIndex.get(decoratedKey);
+            bufOut.writeUTF(key);
+            BlockMetadata blockMetadata = blockIndex.get(key);
             /* position of the key as a relative offset */
             bufOut.writeLong(position - blockMetadata.position_);
             bufOut.writeLong(blockMetadata.size_);
@@ -637,31 +752,45 @@
         blockIndex.clear();        
     }
 
-    public void append(String clientKey, DataOutputBuffer buffer) throws IOException
+    public void append(String key, DataOutputBuffer buffer) throws IOException
     {
-        String decoratedKey = StorageService.getPartitioner().decorateKey(clientKey);
-        long currentPosition = beforeAppend(decoratedKey);
-        dataWriter_.append(decoratedKey, buffer);
-        afterAppend(decoratedKey, currentPosition, buffer.getLength());
+        long currentPosition = beforeAppend(key);
+        dataWriter_.append(key, buffer);
+        afterAppend(key, currentPosition, buffer.getLength());
+    }
+    
+    public void append(String key, BigInteger hash, DataOutputBuffer buffer) throws IOException
+    {
+        long currentPosition = beforeAppend(hash);
+        /* Use as key - hash + ":" + key */
+        dataWriter_.append(hash + ":" + key, buffer);
+        afterAppend(hash, currentPosition, buffer.getLength());
     }
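+    /*
+     * Editor's note (not part of this commit): with the random partitioner
+     * the on-disk key is the hash and the application key joined by a
+     * colon, e.g. key "row1" with hash 1234 is stored as "1234:row1".
+    */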
 
-    public void append(String clientKey, byte[] value) throws IOException
+    public void append(String key, byte[] value) throws IOException
+    {
+        long currentPosition = beforeAppend(key);
+        dataWriter_.append(key, value);
+        afterAppend(key, currentPosition, value.length );
+    }
+    
+    public void append(String key, BigInteger hash, byte[] value) throws IOException
     {
-        String decoratedKey = StorageService.getPartitioner().decorateKey(clientKey);
-        long currentPosition = beforeAppend(decoratedKey);
-        dataWriter_.append(decoratedKey, value);
-        afterAppend(decoratedKey, currentPosition, value.length );
+        long currentPosition = beforeAppend(hash);
+        /* Use as key - hash + ":" + key */
+        dataWriter_.append(hash + ":" + key, value);
+        afterAppend(hash, currentPosition, value.length);
     }
 
-    public static Coordinate getCoordinates(String decoratedKey, IFileReader dataReader) throws IOException
+    private Coordinate getCoordinates(String key, IFileReader dataReader) throws IOException
     {
-    	List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataReader.getFileName());
+    	List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataFile_);
     	int size = (indexInfo == null) ? 0 : indexInfo.size();
     	long start = 0L;
     	long end = dataReader.getEOF();
         if ( size > 0 )
         {
-            int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(decoratedKey));
+            int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(key));
             if ( index < 0 )
             {
                 /*
@@ -711,41 +840,97 @@
         return new Coordinate(start, end);
     }
     
-    public DataInputBuffer next(final String clientKey, String cfName, List<String> columnNames) throws IOException
+    /**
+     * Convert the application key into the internal key used on disk,
+     * based on the partitioner type.
+     *
+     * @param key the application key
+     * @return the internal key as determined by the partitioning mechanism
+    */
+    private String morphKey(String key)
     {
-        return next(clientKey, cfName, columnNames, null);
+        String internalKey = key;
+        PartitionerType pType = StorageService.getPartitionerType();
+        switch ( pType )
+        {
+            case OPHF:
+                break;
+                
+            default:
+                internalKey = FBUtilities.hash(key).toString();
+                break;
+        }
+        return internalKey;
     }
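+    /*
+     * Editor's sketch (not part of this commit): under OPHF the key is
+     * returned unchanged, while the default (random partitioner) branch
+     * yields the hash, e.g. morphKey("row1") returns
+     * FBUtilities.hash("row1").toString().
+    */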
-
-    public DataInputBuffer next(final String clientKey, String cfName, List<String> columnNames, IndexHelper.TimeRange timeRange) throws IOException
+    
+    public DataInputBuffer next(String key, String cf, List<String> cNames) throws IOException
     {
+    	DataInputBuffer bufIn = null;        
+        IFileReader dataReader = null;
+        try
+        {
+            dataReader = SequenceFile.reader(dataFile_);
+            /* Morph key into actual key based on the partition type. */ 
+            key = morphKey(key);
+            Coordinate fileCoordinate = getCoordinates(key, dataReader);    
+            /*
+             * we have the position we have to read from in order to get the
+             * column family, get the column family and column(s) needed.
+            */          
+            bufIn = getData(dataReader, key, cf, cNames, fileCoordinate);
+        }
+        finally
+        {
+            if ( dataReader != null )
+            {
+                dataReader.close();
+            }
+        }
+        return bufIn;
+    }
+    
+    public DataInputBuffer next(String key, String columnName) throws IOException
+    {
+        DataInputBuffer bufIn = null;
         IFileReader dataReader = null;
         try
         {
             dataReader = SequenceFile.reader(dataFile_);
             // dataReader = SequenceFile.chksumReader(dataFile_, 4*1024*1024);
-
-            /* Morph key into actual key based on the partition type. */
-            String decoratedKey = StorageService.getPartitioner().decorateKey(clientKey);
-            Coordinate fileCoordinate = getCoordinates(decoratedKey, dataReader);
+            /* Morph key into actual key based on the partition type. */ 
+            key = morphKey(key);
+            Coordinate fileCoordinate = getCoordinates(key, dataReader);
             /*
              * we have the position we have to read from in order to get the
              * column family, get the column family and column(s) needed.
-            */
-            DataOutputBuffer bufOut = new DataOutputBuffer();
-            DataInputBuffer bufIn = new DataInputBuffer();
-
-            long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columnNames, timeRange, fileCoordinate);
-            if ( bytesRead != -1L )
+            */            
+            bufIn = getData(dataReader, key, columnName, fileCoordinate);
+        }
+        finally
+        {
+            if ( dataReader != null )
             {
-                if ( bufOut.getLength() > 0 )
-                {
-                    bufIn.reset(bufOut.getData(), bufOut.getLength());
-                    /* read the key even though we do not use it */
-                    bufIn.readUTF();
-                    bufIn.readInt();
-                }
+                dataReader.close();
             }
-            return bufIn;
+        }
+        return bufIn;
+    }
+    
+    public DataInputBuffer next(String key, String columnName, IndexHelper.TimeRange timeRange) throws IOException
+    {
+        DataInputBuffer bufIn = null;
+        IFileReader dataReader = null;
+        try
+        {
+            dataReader = SequenceFile.reader(dataFile_);
+            /* Morph key into actual key based on the partition type. */ 
+            key = morphKey(key);
+            Coordinate fileCoordinate = getCoordinates(key, dataReader);
+            /*
+             * we have the position we have to read from in order to get the
+             * column family, get the column family and column(s) needed.
+            */  
+            bufIn = getData(dataReader, key, columnName, timeRange, fileCoordinate);
         }
         finally
         {
@@ -754,14 +939,125 @@
                 dataReader.close();
             }
         }
+        return bufIn;
     }
-
-    public DataInputBuffer next(String clientKey, String columnFamilyColumn) throws IOException
+    
+    long getSeekPosition(String key, long start)
+    {
+        Long seekStart = touchCache_.get(dataFile_ + ":" + key);
+        if( seekStart != null)
+        {
+            return seekStart;
+        }
+        return start;
+    }
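+    /*
+     * Editor's note (not part of this commit): touch() caches block
+     * positions under the composite key dataFile_ + ":" + key, so this
+     * lookup returns the cached position for a previously touched key and
+     * falls back to the supplied start offset otherwise.
+    */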
+        
+    /*
+     * Get the data for the key from the position passed in. 
+    */
+    private DataInputBuffer getData(IFileReader dataReader, String key, String column, Coordinate section) throws IOException
+    {
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        DataInputBuffer bufIn = new DataInputBuffer();
+        
+        long bytesRead = dataReader.next(key, bufOut, column, section);
+        if ( bytesRead != -1L )
+        {
+            if ( bufOut.getLength() > 0 )
+            {                              
+                bufIn.reset(bufOut.getData(), bufOut.getLength());            
+                /* read the key even though we do not use it */
+                bufIn.readUTF();
+                bufIn.readInt();            
+            }
+        }
+        
+        return bufIn;
+    }
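+    /*
+     * Editor's note (not part of this commit): the record read into bufOut
+     * begins with the serialized key (UTF) followed by an int, presumably
+     * the serialized data length; both are consumed here so the returned
+     * buffer is positioned at the column data itself.
+    */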
+    
+    /*
+     * Get the data for the named columns of the given column family
+     * from the position passed in.
+    */
+    private DataInputBuffer getData(IFileReader dataReader, String key, String cf, List<String> columns, Coordinate section) throws IOException
     {
-        String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
-        String columnFamilyName = values[0];
-        List<String> cnNames = (values.length == 1) ? null : Arrays.asList(values[1]);
-        return next(clientKey, columnFamilyName, cnNames);
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        DataInputBuffer bufIn = new DataInputBuffer();
+                  
+        long bytesRead = dataReader.next(key, bufOut, cf, columns, section);
+        if ( bytesRead != -1L )
+        {
+            if ( bufOut.getLength() > 0 )
+            {                     
+                bufIn.reset(bufOut.getData(), bufOut.getLength());             
+                /* read the key even though we do not use it */
+                bufIn.readUTF();
+                bufIn.readInt();            
+            }        
+        }
+        return bufIn;
+    }
+    
+    /*
+     * Get the data for the key from the position passed in. 
+    */
+    private DataInputBuffer getData(IFileReader dataReader, String key, String column, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException
+    {
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        DataInputBuffer bufIn = new DataInputBuffer();
+                
+        try
+        {
+            dataReader.next(key, bufOut, column, timeRange, section);
+            if ( bufOut.getLength() > 0 )
+            {                              
+                bufIn.reset(bufOut.getData(), bufOut.getLength());            
+                /* read the key even though we do not use it */
+                bufIn.readUTF();
+                bufIn.readInt();            
+            }
+        }
+        catch ( IOException ex )
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+        }
+        return bufIn;
+    }
+    
+    /*
+     * Given a key we are interested in, this method returns the
+     * position of the closest index entry before that key on disk.
+     *
+     *  @param key - key we are interested in.
+     *  @return position of the closest index before the key
+     *  on disk, or -1 if the key is not on disk.
+    */
+    private long getClosestIndexPositionToKeyOnDisk(String key)
+    {
+        long position = -1L;
+        List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataFile_);
+        /* guard against a missing index, as getCoordinates() does */
+        if ( indexInfo == null )
+            return position;
+        int size = indexInfo.size();
+        int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(key));
+        if ( index < 0 )
+        {
+            /*
+             * We are here which means that the requested
+             * key is not an index.
+            */
+            /* convert the binarySearch return value into the insertion point */
+            index = -index - 1;
+            /* this means key is not present at all */
+            if ( index >= size )
+                return position;
+            /* a scan is in order. */
+            position = (index == 0) ? 0 : indexInfo.get(index - 1).position();
+        }
+        else
+        {
+            /*
+             * If we are here that means the key is in the index file
+             * and we can retrieve it w/o a scan. In reality we would
+             * like to have a retrieve(key, fromPosition) but for now
+             * we use scan(start, start + 1) - a hack.
+            */
+            position = indexInfo.get(index).position();
+        }
+        return position;
     }
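+    /*
+     * Editor's sketch (not part of this commit): if the index holds three
+     * keys and binarySearch returns -3, the insertion point is 2, so the
+     * scan starts from indexInfo.get(1) - the closest indexed key before
+     * the probe; a return of -4 gives insertion point 3, which is >= size,
+     * so the method reports -1 (key not on disk).
+    */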
 
     public void close() throws IOException