You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 07:45:23 UTC

svn commit: r759033 [2/2] - in /incubator/cassandra/trunk/src/org/apache/cassandra: concurrent/ config/ cql/common/ db/ dht/ gms/ io/ locator/ net/ test/ tools/ utils/

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java Fri Mar 27 06:45:19 2009
@@ -212,11 +212,13 @@
     }
 
     public static class BufferWriter extends Writer
-    {
+    {        
+        private int size_;
 
         BufferWriter(String filename, int size) throws IOException
         {
             super(filename, size);
+            size_ = size;
         }
         
         @Override
@@ -268,6 +270,145 @@
         }
     }
 
+    public static class ConcurrentWriter extends AbstractWriter
+    {
+        private FileChannel fc_;
+
+        public ConcurrentWriter(String filename) throws IOException
+        {
+            super(filename);
+            RandomAccessFile raf = new RandomAccessFile(filename, "rw");
+            fc_ = raf.getChannel();
+        }
+
+        public long getCurrentPosition() throws IOException
+        {
+            return fc_.position();
+        }
+
+        public void seek(long position) throws IOException
+        {
+            fc_.position(position);
+        }
+
+        public void append(DataOutputBuffer buffer) throws IOException
+        {
+            int length = buffer.getLength();
+            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(length);
+            byteBuffer.put(buffer.getData(), 0, length);
+            byteBuffer.flip();
+            fc_.write(byteBuffer);
+        }
+        
+        public void append(DataOutputBuffer keyBuffer, DataOutputBuffer buffer) throws IOException
+        {
+            int keyBufLength = keyBuffer.getLength();
+            if ( keyBuffer == null || keyBufLength == 0 )
+                throw new IllegalArgumentException("Key cannot be NULL or of zero length.");
+
+            /* Size allocated "int" for key length + key + "int" for data length + data */
+            int length = buffer.getLength();
+            ByteBuffer byteBuffer = ByteBuffer.allocateDirect( 4 + keyBufLength + 4 + length );
+            byteBuffer.putInt(keyBufLength);
+            byteBuffer.put(keyBuffer.getData(), 0, keyBufLength);
+            byteBuffer.putInt(length);
+            byteBuffer.put(buffer.getData(), 0, length);
+            byteBuffer.flip();
+            fc_.write(byteBuffer);
+        }
+
+        public void append(String key, DataOutputBuffer buffer) throws IOException
+        {
+            if ( key == null )
+                throw new IllegalArgumentException("Key cannot be NULL.");
+
+            int length = buffer.getLength();
+            /* Size allocated : utfPrefix_ + key length + "int" for data size + data */
+            ByteBuffer byteBuffer = ByteBuffer.allocateDirect( SequenceFile.utfPrefix_ + key.length() + 4 + length);
+            SequenceFile.writeUTF(byteBuffer, key);
+            byteBuffer.putInt(length);
+            byteBuffer.put(buffer.getData(), 0, length);
+            byteBuffer.flip();
+            fc_.write(byteBuffer);
+        }
+
+        public void append(String key, byte[] value) throws IOException
+        {
+            if ( key == null )
+                throw new IllegalArgumentException("Key cannot be NULL.");
+
+            /* Size allocated key length + "int" for data size + data */
+            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(utfPrefix_ + key.length() + 4 + value.length);
+            SequenceFile.writeUTF(byteBuffer, key);
+            byteBuffer.putInt(value.length);
+            byteBuffer.put(value);
+            byteBuffer.flip();
+            fc_.write(byteBuffer);
+        }
+
+        public void append(String key, long value) throws IOException
+        {
+            if ( key == null )
+                throw new IllegalArgumentException("Key cannot be NULL.");
+
+            /* Size allocated key length + a long */
+            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(SequenceFile.utfPrefix_ + key.length() + 8);
+            SequenceFile.writeUTF(byteBuffer, key);
+            byteBuffer.putLong(value);
+            byteBuffer.flip();
+            fc_.write(byteBuffer);
+        }
+
+        /*
+         * Be extremely careful while using this API. This currently
+         * used to write the commit log header in the commit logs.
+         * If not used carefully it could completely screw up reads
+         * of other key/value pairs that are written.
+        */
+        public long writeDirect(byte[] bytes) throws IOException
+        {
+            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(bytes.length);
+            byteBuffer.put(bytes);
+            byteBuffer.flip();
+            fc_.write(byteBuffer);
+            return fc_.position();
+        }
+        
+        public void writeLong(long value) throws IOException
+        {
+            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(8);
+            byteBuffer.putLong(value);
+            byteBuffer.flip();
+            fc_.write(byteBuffer);
+        }
+
+        public void close() throws IOException
+        {
+            fc_.close();
+        }
+
+        public void close(byte[] footer, int size) throws IOException
+        {
+            /* Size is marker length + "int" for size + footer data */
+            ByteBuffer byteBuffer = ByteBuffer.allocateDirect( utfPrefix_ + SequenceFile.marker_.length() + 4 + footer.length);
+            SequenceFile.writeUTF(byteBuffer, SequenceFile.marker_);
+            byteBuffer.putInt(size);
+            byteBuffer.put(footer);
+            byteBuffer.flip();
+            fc_.write(byteBuffer);            
+        }
+
+        public String getFileName()
+        {
+            return filename_;
+        }
+
+        public long getFileSize() throws IOException
+        {
+            return fc_.size();
+        }
+    }
+    
     public static class FastConcurrentWriter extends AbstractWriter
     {
         private FileChannel fc_;
@@ -420,8 +561,36 @@
         public String getFileName()
         {
             return filename_;
+        }   
+        
+        /**
+         * Given the application key this method basically figures if
+         * the key is in the block. Key comparisons differ based on the
+         * partition function. In OPHF key is stored as is but in the
+         * case of a Random hash key used internally is hash(key):key.
+         * @param key which we are looking for
+         * @param in DataInput stream into which we are looking for the key.
+         * @return true if key is found and false otherwise.
+         * @throws IOException
+         */
+        protected boolean isKeyInBlock(String key, DataInput in) throws IOException
+        {
+            boolean bVal = false;            
+            String keyInBlock = in.readUTF();
+            PartitionerType pType = StorageService.getPartitionerType();
+            switch ( pType )
+            {
+                case OPHF:
+                    bVal = keyInBlock.equals(key);
+                    break;
+                    
+                default:                    
+                    bVal = keyInBlock.split(":")[0].equals(key);
+                    break;
+            }
+            return bVal;
         }
-
+       
         /**
          * Return the position of the given key from the block index.
          * @param key the key whose offset is to be extracted from the current block index
@@ -449,7 +618,7 @@
             for ( int i = 0; i < keys; ++i )
             {            
                 String keyInBlock = bufIn.readUTF();                
-                if ( keyInBlock.equals(key) )
+                if ( keyInBlock.equals(key) )                
                 {                	
                     position = bufIn.readLong();
                     break;
@@ -497,9 +666,8 @@
             /* Number of keys in the block. */
             int keys = bufIn.readInt();
             for ( int i = 0; i < keys; ++i )
-            {
-                String keyInBlock = bufIn.readUTF();
-                if (keyInBlock.equals(key))
+            {                
+                if ( isKeyInBlock(key, bufIn) )
                 {
                     long position = bufIn.readLong();
                     long dataSize = bufIn.readLong();
@@ -583,7 +751,7 @@
         {
             /* Goto the Block Index */
             seek(section.end_);
-            long position = getPositionFromBlockIndex(key);
+            long position = getPositionFromBlockIndex(key);            
             seek(position);                   
         }
         
@@ -629,9 +797,9 @@
             }
             return totalBytesRead;
         }
-
+        
         /**
-         * Reads the column name indexes if present. If the
+         * Reads the column name indexes if present. If the 
          * indexes are based on time then skip over them.
          * @param cfName
          * @return
@@ -640,13 +808,13 @@
         {
             /* check if we have an index */
             boolean hasColumnIndexes = file_.readBoolean();
-            int totalBytesRead = 1;
+            int totalBytesRead = 1;            
             /* if we do then deserialize the index */
             if(hasColumnIndexes)
-            {
+            {        
                 if ( DatabaseDescriptor.isTimeSortingEnabled(cfName) )
                 {
-                    /* read the index */
+                    /* read the index */                            
                     totalBytesRead += IndexHelper.deserializeIndex(cfName, file_, columnIndexList);
                 }
                 else
@@ -658,6 +826,272 @@
         }
         
         /**
+         * This is useful in figuring out the key in system. If an OPHF 
+         * is used then the "key" is the application supplied key. If a random
+         * partitioning mechanism is used then the key is of the form 
+         * hash:key where hash is used internally as the key.
+         * 
+         * @param in the DataInput stream from which the key needs to be read
+         * @return the appropriate key based on partitioning type
+         * @throws IOException
+         */
+        protected String readKeyFromDisk(DataInput in) throws IOException
+        {
+            String keyInDisk = null;
+            PartitionerType pType = StorageService.getPartitionerType();
+            switch( pType )
+            {
+                case OPHF:
+                    keyInDisk = in.readUTF();                  
+                    break;
+                    
+                default:
+                    keyInDisk = in.readUTF().split(":")[0];
+                    break;
+            }
+            return keyInDisk;
+        }
+
+        /**
+         * This method dumps the next key/value into the DataOuputStream
+         * passed in. Always use this method to query for application
+         * specific data as it will have indexes.
+         *
+         * @param key key we are interested in.
+         * @param dos DataOutputStream that needs to be filled.
+         * @param cf the IColumn we want to read
+         * @param section region of the file that needs to be read
+         * @return total number of bytes read/considered
+        */
+        public long next(String key, DataOutputBuffer bufOut, String cf, Coordinate section) throws IOException
+        {
+    		String[] values = RowMutation.getColumnAndColumnFamily(cf);
+    		String columnFamilyName = values[0];    		
+    		String columnName = (values.length == 1) ? null : values[1];
+
+            long bytesRead = -1L;
+            if ( isEOF() )
+                return bytesRead;
+            seekTo(key, section);            
+            /* note the position where the key starts */
+            long startPosition = file_.getFilePointer();
+            String keyInDisk = readKeyFromDisk(file_);
+            if ( keyInDisk != null )
+            {
+                /*
+                 * If key on disk is greater than requested key
+                 * we can bail out since we exploit the property
+                 * of the SSTable format.
+                */
+                if ( keyInDisk.compareTo(key) > 0 )
+                    return bytesRead;
+
+                /*
+                 * If we found the key then we populate the buffer that
+                 * is passed in. If not then we skip over this key and
+                 * position ourselves to read the next one.
+                */
+                int dataSize = file_.readInt();
+                if ( keyInDisk.equals(key) )
+                {
+                    /* write the key into buffer */
+                    bufOut.writeUTF( keyInDisk );                    
+                    
+                    if(columnName == null)
+                    {
+                    	int bytesSkipped = IndexHelper.skipBloomFilterAndIndex(file_);
+	                    /*
+	                     * read the correct number of bytes for the column family and
+	                     * write data into buffer. Substract from dataSize the bloom
+                         * filter size.
+	                    */                        
+                    	dataSize -= bytesSkipped;
+                    	/* write the data size */
+                    	bufOut.writeInt(dataSize);
+	                    /* write the data into buffer, except the boolean we have read */
+	                    bufOut.write(file_, dataSize);
+                    }
+                    else
+                    {
+                        /* Read the bloom filter for the column summarization */
+                        long preBfPos = file_.getFilePointer();
+                        BloomFilter bf = defreezeBloomFilter();
+                        /* column does not exist in this file */
+                        if ( !bf.isPresent(columnName) ) 
+                            return bytesRead;
+                        long postBfPos = file_.getFilePointer();
+                        dataSize -= (postBfPos - preBfPos);
+                        
+                        List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+                        /* Read the name indexes if present */
+                        int totalBytesRead = handleColumnNameIndexes(columnFamilyName, columnIndexList);                    	
+                    	dataSize -= totalBytesRead;
+
+                        /* read the column family name */
+                        String cfName = file_.readUTF();
+                        dataSize -= (utfPrefix_ + cfName.length());
+
+                        /* read if this cf is marked for delete */
+                        boolean markedForDelete = file_.readBoolean();
+                        dataSize -= 1;
+
+                        /* read the total number of columns */
+                        int totalNumCols = file_.readInt();
+                        dataSize -= 4;
+                                                
+                        /* get the column range we have to read */
+                        IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnNameIndexInfo(columnName);
+                        IndexHelper.ColumnRange columnRange = IndexHelper.getColumnRangeFromNameIndex(cIndexInfo, columnIndexList, dataSize, totalNumCols);
+
+                        Coordinate coordinate = columnRange.coordinate();
+                		/* seek to the correct offset to the data, and calculate the data size */
+                        file_.skipBytes((int)coordinate.start_);                        
+                        dataSize = (int)(coordinate.end_ - coordinate.start_);
+                        
+                        /*
+                         * write the number of columns in the column family we are returning:
+                         * 	dataSize that we are reading +
+                         * 	length of column family name +
+                         * 	one booleanfor deleted or not +
+                         * 	one int for number of columns
+                        */
+                        bufOut.writeInt(dataSize + utfPrefix_+cfName.length() + 4 + 1);
+                        /* write the column family name */
+                        bufOut.writeUTF(cfName);
+                        /* write if this cf is marked for delete */
+                        bufOut.writeBoolean(markedForDelete);
+                        /* write number of columns */
+                        bufOut.writeInt(columnRange.count());
+                        /* now write the columns */
+                        bufOut.write(file_, dataSize);
+                    }
+                }
+                else
+                {
+                    /* skip over data portion */
+                	file_.seek(dataSize + file_.getFilePointer());
+                }
+                
+                long endPosition = file_.getFilePointer();
+                bytesRead = endPosition - startPosition;                 
+            }
+
+            return bytesRead;
+        }
+        
+        /**
+         * This method dumps the next key/value into the DataOuputStream
+         * passed in. Always use this method to query for application
+         * specific data as it will have indexes.
+         
+         * @param key key we are interested in.
+         * @param dos DataOutputStream that needs to be filled.
+         * @param column name of the column in our format.
+         * @param timeRange time range we are interested in.
+         * @param section region of the file that needs to be read
+         * @throws IOException
+         * @return number of bytes that were read.
+        */
+        public long next(String key, DataOutputBuffer bufOut, String cf, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException
+        {
+            String[] values = RowMutation.getColumnAndColumnFamily(cf);
+            String columnFamilyName = values[0];            
+            String columnName = (values.length == 1) ? null : values[1];
+
+            long bytesRead = -1L;
+            if ( isEOF() )
+                return bytesRead;                                
+            seekTo(key, section);            
+            /* note the position where the key starts */
+            long startPosition = file_.getFilePointer();
+            String keyInDisk = readKeyFromDisk(file_);
+            if ( keyInDisk != null )
+            {
+                /*
+                 * If key on disk is greater than requested key
+                 * we can bail out since we exploit the property
+                 * of the SSTable format.
+                */
+                if ( keyInDisk.compareTo(key) > 0 )
+                    return bytesRead;
+
+                /*
+                 * If we found the key then we populate the buffer that
+                 * is passed in. If not then we skip over this key and
+                 * position ourselves to read the next one.
+                */
+                int dataSize = file_.readInt();
+                if ( keyInDisk.equals(key) )
+                {
+                    /* write the key into buffer */
+                    bufOut.writeUTF( keyInDisk );                    
+                    
+                    if(columnName == null)
+                    {
+                        int bytesSkipped = IndexHelper.skipBloomFilter(file_);
+                        /*
+                         * read the correct number of bytes for the column family and
+                         * write data into buffer. Substract from dataSize the bloom
+                         * filter size.
+                        */                        
+                        dataSize -= bytesSkipped;
+                        List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+                        /* Read the times indexes if present */
+                        int totalBytesRead = handleColumnTimeIndexes(columnFamilyName, columnIndexList);                        
+                        dataSize -= totalBytesRead;
+                        
+                        /* read the column family name */
+                        String cfName = file_.readUTF();
+                        dataSize -= (utfPrefix_ + cfName.length());
+
+                        /* read if this cf is marked for delete */
+                        boolean markedForDelete = file_.readBoolean();
+                        dataSize -= 1;
+
+                        /* read the total number of columns */
+                        int totalNumCols = file_.readInt();
+                        dataSize -= 4;
+                                                
+                        /* get the column range we have to read */                        
+                        IndexHelper.ColumnRange columnRange = IndexHelper.getColumnRangeFromTimeIndex(timeRange, columnIndexList, dataSize, totalNumCols);
+
+                        Coordinate coordinate = columnRange.coordinate();
+                        /* seek to the correct offset to the data, and calculate the data size */
+                        file_.skipBytes((int)coordinate.start_);
+                        dataSize = (int)(coordinate.end_ - coordinate.start_);
+                        
+                        /*
+                         * write the number of columns in the column family we are returning:
+                         *  dataSize that we are reading +
+                         *  length of column family name +
+                         *  one booleanfor deleted or not +
+                         *  one int for number of columns
+                        */
+                        bufOut.writeInt(dataSize + utfPrefix_+cfName.length() + 4 + 1);
+                        /* write the column family name */
+                        bufOut.writeUTF(cfName);
+                        /* write if this cf is marked for delete */
+                        bufOut.writeBoolean(markedForDelete);
+                        /* write number of columns */
+                        bufOut.writeInt(columnRange.count());
+                        /* now write the columns */
+                        bufOut.write(file_, dataSize);
+                    }
+                }
+                else
+                {
+                    /* skip over data portion */
+                    file_.seek(dataSize + file_.getFilePointer());
+                }
+                
+                long endPosition = file_.getFilePointer();
+                bytesRead = endPosition - startPosition;                 
+            }
+
+            return bytesRead;
+        }
+
+        /**
          * This method dumps the next key/value into the DataOuputStream
          * passed in. Always use this method to query for application
          * specific data as it will have indexes.
@@ -670,11 +1104,11 @@
          * @return total number of bytes read/considered
          *
         */
-        public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> columnNames, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException
+        public long next(String key, DataOutputBuffer bufOut, String cf, List<String> columnNames, Coordinate section) throws IOException
         {
-            assert timeRange == null || columnNames == null; // at most one may be non-null
-            
-            List<String> cNames = columnNames == null ? null : new ArrayList<String>(columnNames);
+        	String[] values = RowMutation.getColumnAndColumnFamily(cf);
+    		String columnFamilyName = values[0];
+            List<String> cNames = new ArrayList<String>(columnNames);
 
             long bytesRead = -1L;
             if ( isEOF() )
@@ -682,8 +1116,8 @@
 
             seekTo(key, section);            
             /* note the position where the key starts */
-            long startPosition = file_.getFilePointer();
-            String keyInDisk = file_.readUTF();
+            long startPosition = file_.getFilePointer();            
+            String keyInDisk = readKeyFromDisk(file_);
             if ( keyInDisk != null )
             {
                 /*
@@ -737,9 +1171,7 @@
                         
                         List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
                         /* read the column name indexes if present */
-                        int totalBytesRead = (timeRange == null)
-                                           ? handleColumnNameIndexes(columnFamilyName, columnIndexList)
-                                           : handleColumnTimeIndexes(columnFamilyName, columnIndexList);
+                        int totalBytesRead = handleColumnNameIndexes(columnFamilyName, columnIndexList);                        
                     	dataSize -= totalBytesRead;
 
                         /* read the column family name */
@@ -747,8 +1179,8 @@
                         dataSize -= (utfPrefix_ + cfName.length());
 
                         /* read if this cf is marked for delete */
-                        long markedForDeleteAt = file_.readLong();
-                        dataSize -= 8;
+                        boolean markedForDelete = file_.readBoolean();
+                        dataSize -= 1;
 
                         /* read the total number of columns */
                         int totalNumCols = file_.readInt();
@@ -758,15 +1190,7 @@
                         /* sort the required list of columns */
                         Collections.sort(cNames);
                         /* get the various column ranges we have to read */
-                        List<IndexHelper.ColumnRange> columnRanges;
-                        if (timeRange == null)
-                        {
-                            columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(cNames, columnIndexList, dataSize, totalNumCols);
-                        }
-                        else
-                        {
-                            columnRanges = Arrays.asList(IndexHelper.getColumnRangeFromTimeIndex(timeRange, columnIndexList, dataSize, totalNumCols));
-                        }
+                        List<IndexHelper.ColumnRange> columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(cNames, columnIndexList, dataSize, totalNumCols);
 
                         /* calculate the data size */
                         int numColsReturned = 0;
@@ -789,7 +1213,7 @@
                         /* write the column family name */
                         bufOut.writeUTF(cfName);
                         /* write if this cf is marked for delete */
-                        bufOut.writeLong(markedForDeleteAt);
+                        bufOut.writeBoolean(markedForDelete);
                         /* write number of columns */
                         bufOut.writeInt(numColsReturned);
                         int prevPosition = 0;
@@ -856,8 +1280,65 @@
                 bytesRead = -1L;
             return bytesRead;
         }
-    }
 
+        /**
+         * This method dumps the next key/value into the DataOuputStream
+         * passed in.
+         *
+         * @param key - key we are interested in.
+         * @param dos - DataOutputStream that needs to be filled.
+         * @param section region of the file that needs to be read
+         * @return total number of bytes read/considered
+         */
+        public long next(String key, DataOutputBuffer bufOut, Coordinate section) throws IOException
+        {
+            long bytesRead = -1L;
+            if ( isEOF() )
+                return bytesRead;
+                   
+            seekTo(key, section);            
+            /* note the position where the key starts */
+            long startPosition = file_.getFilePointer(); 
+            String keyInDisk = readKeyFromDisk(file_);
+            if ( keyInDisk != null )
+            {
+                /*
+                 * If key on disk is greater than requested key
+                 * we can bail out since we exploit the property
+                 * of the SSTable format.
+                */
+                if ( keyInDisk.compareTo(key) > 0 )
+                    return bytesRead;
+
+                /*
+                 * If we found the key then we populate the buffer that
+                 * is passed in. If not then we skip over this key and
+                 * position ourselves to read the next one.
+                */
+                int dataSize = file_.readInt();
+                if ( keyInDisk.equals(key) )
+                {
+                    /* write the key into buffer */
+                    bufOut.writeUTF( keyInDisk );
+                    /* write data size into buffer */
+                    bufOut.writeInt(dataSize);
+                    /* write the data into buffer */
+                    bufOut.write(file_, dataSize);
+                }
+                else
+                {
+                    /* skip over data portion */
+                	file_.seek(dataSize + file_.getFilePointer());
+                }
+
+                long endPosition = file_.getFilePointer();
+                bytesRead = endPosition - startPosition;
+            }
+
+            return bytesRead;
+        }
+    }
+    
     public static class Reader extends AbstractReader
     {
         Reader(String filename) throws IOException
@@ -951,6 +1432,7 @@
     }
     
     private static Logger logger_ = Logger.getLogger( SequenceFile.class ) ;
+    public static final short utfPrefix_ = 2;
     public static final String marker_ = "Bloom-Filter";
 
     public static IFileWriter writer(String filename) throws IOException
@@ -968,6 +1450,11 @@
         return new ChecksumWriter(filename, size);
     }
 
+    public static IFileWriter concurrentWriter(String filename) throws IOException
+    {
+        return new ConcurrentWriter(filename);
+    }
+    
     public static IFileWriter fastWriter(String filename, int size) throws IOException
     {
         return new FastConcurrentWriter(filename, size);
@@ -988,6 +1475,11 @@
         return new ChecksumReader(filename, size);
     }
 
+    public static boolean readBoolean(ByteBuffer buffer)
+    {
+        return ( buffer.get() == 1 ? true : false );
+    }
+
     /**
      * Efficiently writes a UTF8 string to the buffer.
      * Assuming all Strings that are passed in have length
@@ -1056,4 +1548,83 @@
         buffer.put(bytearr, 0, utflen + 2);
     }
 
+    /**
+     * Read a UTF8 string from a serialized buffer.
+     * @param buffer buffer from which a UTF8 string is read
+     * @return a Java String
+    */
+    protected static String readUTF(ByteBuffer in) throws IOException
+    {
+        int utflen = in.getShort();
+        byte[] bytearr = new byte[utflen];
+        char[] chararr = new char[utflen];
+
+        int c, char2, char3;
+        int count = 0;
+        int chararr_count = 0;
+
+        in.get(bytearr, 0, utflen);
+
+        while (count < utflen)
+        {
+            c = (int) bytearr[count] & 0xff;
+            if (c > 127)
+                break;
+            count++;
+            chararr[chararr_count++] = (char) c;
+        }
+
+        while (count < utflen)
+        {
+            c = (int) bytearr[count] & 0xff;
+            switch (c >> 4)
+            {
+            case 0:
+            case 1:
+            case 2:
+            case 3:
+            case 4:
+            case 5:
+            case 6:
+            case 7:
+                /* 0xxxxxxx */
+                count++;
+                chararr[chararr_count++] = (char) c;
+                break;
+            case 12:
+            case 13:
+                /* 110x xxxx 10xx xxxx */
+                count += 2;
+                if (count > utflen)
+                    throw new UTFDataFormatException(
+                    "malformed input: partial character at end");
+                char2 = (int) bytearr[count - 1];
+                if ((char2 & 0xC0) != 0x80)
+                    throw new UTFDataFormatException(
+                            "malformed input around byte " + count);
+                chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+                break;
+            case 14:
+                /* 1110 xxxx 10xx xxxx 10xx xxxx */
+                count += 3;
+                if (count > utflen)
+                    throw new UTFDataFormatException(
+                    "malformed input: partial character at end");
+                char2 = (int) bytearr[count - 2];
+                char3 = (int) bytearr[count - 1];
+                if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
+                    throw new UTFDataFormatException(
+                            "malformed input around byte " + (count - 1));
+                chararr[chararr_count++] = (char) (((c & 0x0F) << 12)
+                        | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+                break;
+            default:
+                /* 10xx xxxx, 1111 xxxx */
+                throw new UTFDataFormatException("malformed input around byte "
+                        + count);
+            }
+        }
+        // The number of chars produced may be less than utflen
+        return new String(chararr, 0, chararr_count);
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java Fri Mar 27 06:45:19 2009
@@ -1,5 +1,6 @@
 package org.apache.cassandra.locator;
 
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -7,12 +8,11 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.log4j.Logger;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.log4j.Logger;
 
 /**
  * This class contains a helper method that will be used by
@@ -45,10 +45,10 @@
     protected EndPoint getNextAvailableEndPoint(EndPoint startPoint, List<EndPoint> topN, List<EndPoint> liveNodes)
     {
         EndPoint endPoint = null;
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        List tokens = new ArrayList(tokenToEndPointMap.keySet());
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
         Collections.sort(tokens);
-        Token token = tokenMetadata_.getToken(startPoint);
+        BigInteger token = tokenMetadata_.getToken(startPoint);
         int index = Collections.binarySearch(tokens, token);
         if(index < 0)
         {
@@ -76,7 +76,7 @@
      * endpoint which is in the top N.
      * Get the map of top N to the live nodes currently.
      */
-    public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token)
+    public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token)
     {
         List<EndPoint> liveList = new ArrayList<EndPoint>();
         Map<EndPoint, EndPoint> map = new HashMap<EndPoint, EndPoint>();
@@ -107,6 +107,6 @@
         return map;
     }
 
-    public abstract EndPoint[] getStorageEndPoints(Token token);
+    public abstract EndPoint[] getStorageEndPoints(BigInteger token);
 
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java Fri Mar 27 06:45:19 2009
@@ -18,9 +18,9 @@
 
 package org.apache.cassandra.locator;
 
+import java.math.BigInteger;
 import java.util.Map;
 
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.EndPoint;
 
 
@@ -32,7 +32,8 @@
  */
 public interface IReplicaPlacementStrategy
 {
-	public EndPoint[] getStorageEndPoints(Token token);
-    public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap);
-    public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token);    
+	public EndPoint[] getStorageEndPoints(BigInteger token);
+	public Map<String, EndPoint[]> getStorageEndPoints(String[] keys);
+    public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap);
+    public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token);    
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java Fri Mar 27 06:45:19 2009
@@ -1,13 +1,14 @@
 package org.apache.cassandra.locator;
 
+import java.math.BigInteger;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
@@ -27,7 +28,7 @@
         super(tokenMetadata);
     }
     
-    public EndPoint[] getStorageEndPoints(Token token)
+    public EndPoint[] getStorageEndPoints(BigInteger token)
     {
         int startIndex = 0 ;
         List<EndPoint> list = new ArrayList<EndPoint>();
@@ -35,8 +36,8 @@
         boolean bOtherRack = false;
         int foundCount = 0;
         int N = DatabaseDescriptor.getReplicationFactor();
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        List tokens = new ArrayList(tokenToEndPointMap.keySet());
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
         Collections.sort(tokens);
         int index = Collections.binarySearch(tokens, token);
         if(index < 0)
@@ -106,8 +107,98 @@
         retrofitPorts(list);
         return list.toArray(new EndPoint[0]);
     }
+    
+    public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
+    {
+    	Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
+    	List<EndPoint> list = new ArrayList<EndPoint>();
+    	int startIndex = 0 ;
+    	int foundCount = 0;
+    	boolean bDataCenter = false;
+        boolean bOtherRack = false;
+    	
+    	Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+    	int N = DatabaseDescriptor.getReplicationFactor();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Collections.sort(tokens);
+        
+        for ( String key : keys )
+        {
+        	BigInteger token = StorageService.hash(key);
+        	int index = Collections.binarySearch(tokens, token);
+            if(index < 0)
+            {
+                index = (index + 1) * (-1);
+                if (index >= tokens.size())
+                    index = 0;
+            }
+            int totalNodes = tokens.size();
+            // Add the node at the index by default
+            list.add(tokenToEndPointMap.get(tokens.get(index)));
+            foundCount++;
+            if( N == 1 )
+            {
+            	results.put( key, list.toArray(new EndPoint[0]) );
+                return results;
+            }
+            startIndex = (index + 1)%totalNodes;
+            IEndPointSnitch endPointSnitch = StorageService.instance().getEndPointSnitch();
+            
+            for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+            {
+                try
+                {
+                    // First try to find one in a different data center
+                    if(!endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
+                    {
+                        // If we have already found something in a diff datacenter no need to find another
+                        if( !bDataCenter )
+                        {
+                            list.add(tokenToEndPointMap.get(tokens.get(i)));
+                            bDataCenter = true;
+                            foundCount++;
+                        }
+                        continue;
+                    }
+                    // Now  try to find one on a different rack
+                    if(!endPointSnitch.isOnSameRack(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))) &&
+                            endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
+                    {
+                        // If we have already found something in a diff rack no need to find another
+                        if( !bOtherRack )
+                        {
+                            list.add(tokenToEndPointMap.get(tokens.get(i)));
+                            bOtherRack = true;
+                            foundCount++;
+                        }
+                        continue;
+                    }
+                }
+                catch (UnknownHostException e)
+                {
+                    logger_.debug(LogUtil.throwableToString(e));
+                }
 
-    public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+            }
+            // If we found N number of nodes we are good. This loop wil just exit. Otherwise just
+            // loop through the list and add until we have N nodes.
+            for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+            {
+                if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+                {
+                    list.add(tokenToEndPointMap.get(tokens.get(i)));
+                    foundCount++;
+                    continue;
+                }
+            }
+            retrofitPorts(list);
+            results.put(key, list.toArray(new EndPoint[0]));
+        }
+        
+        return results;
+    }
+    
+    public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
     {
         throw new UnsupportedOperationException("This operation is not currently supported");
     }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java Fri Mar 27 06:45:19 2009
@@ -1,14 +1,18 @@
 package org.apache.cassandra.locator;
 
+import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
 
 
 /**
@@ -21,24 +25,25 @@
 {   
     /* Use this flag to check if initialization is in order. */
     private AtomicBoolean initialized_ = new AtomicBoolean(false);
-
+    private Map<Range, List<EndPoint>> rangeToEndPointMap_;
+    
     public RackUnawareStrategy(TokenMetadata tokenMetadata)
     {
         super(tokenMetadata);
     }
     
-    public EndPoint[] getStorageEndPoints(Token token)
+    public EndPoint[] getStorageEndPoints(BigInteger token)
     {
         return getStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());            
     }
     
-    public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+    public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
     {
         int startIndex = 0 ;
         List<EndPoint> list = new ArrayList<EndPoint>();
         int foundCount = 0;
         int N = DatabaseDescriptor.getReplicationFactor();
-        List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
         Collections.sort(tokens);
         int index = Collections.binarySearch(tokens, token);
         if(index < 0)
@@ -66,5 +71,84 @@
         retrofitPorts(list);
         return list.toArray(new EndPoint[0]);
     }
-
+    
+    private void doInitialization()
+    {
+        if ( !initialized_.get() )
+        {
+            /* construct the mapping from the ranges to the replicas responsible for them */
+            rangeToEndPointMap_ = StorageService.instance().getRangeToEndPointMap();            
+            initialized_.set(true);
+        }
+    }
+    
+    /**
+     * This method determines which range in the array actually contains
+     * the hash of the key
+     * @param ranges
+     * @param key
+     * @return
+     */
+    private int findRangeIndexForKey(Range[] ranges, String key)
+    {
+        int index = 0;
+        BigInteger hash = StorageService.hash(key);
+        for ( int i = 0; i < ranges.length; ++i )
+        {
+            if ( ranges[i].contains(hash) )
+            {
+                index = i;
+                break;
+            }
+        }
+        
+        return index;
+    }
+    
+    public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
+    {              
+        Arrays.sort(keys);
+        Range[] ranges = StorageService.instance().getAllRanges();
+        
+    	Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
+    	List<EndPoint> list = new ArrayList<EndPoint>();
+    	int startIndex = 0 ;
+    	int foundCount = 0;
+    	
+    	Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+    	int N = DatabaseDescriptor.getReplicationFactor();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Collections.sort(tokens);
+        for ( String key : keys )
+        {
+        	BigInteger token = StorageService.hash(key);
+        	int index = Collections.binarySearch(tokens, token);
+            if(index < 0)
+            {
+                index = (index + 1) * (-1);
+                if (index >= tokens.size())
+                    index = 0;
+            }
+            int totalNodes = tokens.size();
+            // Add the node at the index by default
+            list.add(tokenToEndPointMap.get(tokens.get(index)));
+            foundCount++;
+            startIndex = (index + 1)%totalNodes;
+            // If we found N number of nodes we are good. This loop will just exit. Otherwise just
+            // loop through the list and add until we have N nodes.
+            for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+            {
+                if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+                {
+                    list.add(tokenToEndPointMap.get(tokens.get(i)));
+                    foundCount++;
+                    continue;
+                }
+            }
+            retrofitPorts(list);
+            results.put(key, list.toArray(new EndPoint[0]));
+        }
+        
+        return results;
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java Fri Mar 27 06:45:19 2009
@@ -18,13 +18,18 @@
 
 package org.apache.cassandra.locator;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
 import org.apache.cassandra.net.EndPoint;
 
 
@@ -34,19 +39,29 @@
 
 public class TokenMetadata
 {
-    /* Maintains token to endpoint map of every node in the cluster. */
-    private Map<Token, EndPoint> tokenToEndPointMap_ = new HashMap<Token, EndPoint>();
+    private static ICompactSerializer<TokenMetadata> serializer_ = new TokenMetadataSerializer();
+    
+    public static ICompactSerializer<TokenMetadata> serializer()
+    {
+        return serializer_;
+    }
+    
+    /* Maintains token to endpoint map of every node in the cluster. */    
+    private Map<BigInteger, EndPoint> tokenToEndPointMap_ = new HashMap<BigInteger, EndPoint>();    
     /* Maintains a reverse index of endpoint to token in the cluster. */
-    private Map<EndPoint, Token> endPointToTokenMap_ = new HashMap<EndPoint, Token>();
+    private Map<EndPoint, BigInteger> endPointToTokenMap_ = new HashMap<EndPoint, BigInteger>();
     
     /* Use this lock for manipulating the token map */
-    private final ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
-
+    private ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
+    
+    /*
+     * For JAXB purposes. 
+    */
     public TokenMetadata()
     {
     }
-
-    private TokenMetadata(Map<Token, EndPoint> tokenToEndPointMap, Map<EndPoint, Token> endPointToTokenMap)
+    
+    protected TokenMetadata(Map<BigInteger, EndPoint> tokenToEndPointMap, Map<EndPoint, BigInteger> endPointToTokenMap)
     {
         tokenToEndPointMap_ = tokenToEndPointMap;
         endPointToTokenMap_ = endPointToTokenMap;
@@ -54,18 +69,20 @@
     
     public TokenMetadata cloneMe()
     {
-        return new TokenMetadata(cloneTokenEndPointMap(), cloneEndPointTokenMap());
+        Map<BigInteger, EndPoint> tokenToEndPointMap = cloneTokenEndPointMap();
+        Map<EndPoint, BigInteger> endPointToTokenMap = cloneEndPointTokenMap();
+        return new TokenMetadata( tokenToEndPointMap, endPointToTokenMap );
     }
     
     /**
      * Update the two maps in an safe mode. 
     */
-    public void update(Token token, EndPoint endpoint)
+    public void update(BigInteger token, EndPoint endpoint)
     {
         lock_.writeLock().lock();
         try
         {            
-            Token oldToken = endPointToTokenMap_.get(endpoint);
+            BigInteger oldToken = endPointToTokenMap_.get(endpoint);
             if ( oldToken != null )
                 tokenToEndPointMap_.remove(oldToken);
             tokenToEndPointMap_.put(token, endpoint);
@@ -86,7 +103,7 @@
         lock_.writeLock().lock();
         try
         {            
-            Token oldToken = endPointToTokenMap_.get(endpoint);
+            BigInteger oldToken = endPointToTokenMap_.get(endpoint);
             if ( oldToken != null )
                 tokenToEndPointMap_.remove(oldToken);            
             endPointToTokenMap_.remove(endpoint);
@@ -97,7 +114,7 @@
         }
     }
     
-    public Token getToken(EndPoint endpoint)
+    public BigInteger getToken(EndPoint endpoint)
     {
         lock_.readLock().lock();
         try
@@ -126,12 +143,12 @@
     /*
      * Returns a safe clone of tokenToEndPointMap_.
     */
-    public Map<Token, EndPoint> cloneTokenEndPointMap()
+    public Map<BigInteger, EndPoint> cloneTokenEndPointMap()
     {
         lock_.readLock().lock();
         try
         {            
-            return new HashMap<Token, EndPoint>( tokenToEndPointMap_ );
+            return new HashMap<BigInteger, EndPoint>( tokenToEndPointMap_ );
         }
         finally
         {
@@ -142,12 +159,12 @@
     /*
      * Returns a safe clone of endPointTokenMap_.
     */
-    public Map<EndPoint, Token> cloneEndPointTokenMap()
+    public Map<EndPoint, BigInteger> cloneEndPointTokenMap()
     {
         lock_.readLock().lock();
         try
         {            
-            return new HashMap<EndPoint, Token>( endPointToTokenMap_ );
+            return new HashMap<EndPoint, BigInteger>( endPointToTokenMap_ );
         }
         finally
         {
@@ -171,3 +188,51 @@
         return sb.toString();
     }
 }
+
+class TokenMetadataSerializer implements ICompactSerializer<TokenMetadata>
+{
+    public void serialize(TokenMetadata tkMetadata, DataOutputStream dos) throws IOException
+    {        
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tkMetadata.cloneTokenEndPointMap();
+        Set<BigInteger> tokens = tokenToEndPointMap.keySet();
+        /* write the size */
+        dos.writeInt(tokens.size());        
+        for ( BigInteger token : tokens )
+        {
+            byte[] bytes = token.toByteArray();
+            /* Convert the BigInteger to byte[] and persist */
+            dos.writeInt(bytes.length);
+            dos.write(bytes); 
+            /* Write the endpoint out */
+            CompactEndPointSerializationHelper.serialize(tokenToEndPointMap.get(token), dos);
+        }
+    }
+    
+    public TokenMetadata deserialize(DataInputStream dis) throws IOException
+    {
+        TokenMetadata tkMetadata = null;
+        int size = dis.readInt();
+        
+        if ( size > 0 )
+        {
+            Map<BigInteger, EndPoint> tokenToEndPointMap = new HashMap<BigInteger, EndPoint>();
+            Map<EndPoint, BigInteger> endPointToTokenMap = new HashMap<EndPoint, BigInteger>();
+            
+            for ( int i = 0; i < size; ++i )
+            {
+                /* Read the byte[] and convert to BigInteger */
+                byte[] bytes = new byte[dis.readInt()];
+                dis.readFully(bytes);
+                BigInteger token = new BigInteger(bytes);
+                /* Read the endpoint out */
+                EndPoint endpoint = CompactEndPointSerializationHelper.deserialize(dis);
+                tokenToEndPointMap.put(token, endpoint);
+                endPointToTokenMap.put(endpoint, token);
+            }
+            
+            tkMetadata = new TokenMetadata( tokenToEndPointMap, endPointToTokenMap );
+        }
+        
+        return tkMetadata;
+    }
+}

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java Fri Mar 27 06:45:19 2009
@@ -53,20 +53,27 @@
     Header header_;
     private Object[] body_ = new Object[0];
     
-    protected Message(String id, EndPoint from, String messageType, String verb, Object... body)
+    /* Ctor for JAXB. DO NOT DELETE */
+    private Message()
     {
-        this(new Header(id, from, messageType, verb), body);
+    }
+
+    protected Message(String id, EndPoint from, String messageType, String verb, Object[] body)
+    {
+        header_ = new Header(id, from, messageType, verb);
+        body_ = body;
     }
     
-    protected Message(Header header, Object... body)
+    protected Message(Header header, Object[] body)
     {
         header_ = header;
         body_ = body;
     }
 
-    public Message(EndPoint from, String messageType, String verb, Object... body)
+    public Message(EndPoint from, String messageType, String verb, Object[] body)
     {
-        this(new Header(from, messageType, verb), body);
+        header_ = new Header(from, messageType, verb);
+        body_ = body;
     }    
     
     public byte[] getHeader(Object key)
@@ -157,7 +164,7 @@
         header_.setMessageId(id);
     }    
 
-    public Message getReply(EndPoint from, Object... args)
+    public Message getReply(EndPoint from, Object[] args)
     {        
         Message response = new Message(getMessageId(),
                                        from,

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/test/DBTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/DBTest.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/DBTest.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/DBTest.java Fri Mar 27 06:45:19 2009
@@ -109,7 +109,7 @@
             System.out.println(scanner.next().name());
         }             
     }
-
+    
     public static void doTest()
     {
         String host = "insearch00";
@@ -182,6 +182,7 @@
         //doWrites();
         //doRead("543");
         
+        DatabaseDescriptor.init();
         DBTest.doTest();
     }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java Fri Mar 27 06:45:19 2009
@@ -20,8 +20,12 @@
 
 import com.facebook.thrift.transport.TTransport;
 import com.facebook.thrift.transport.TSocket;
+import com.facebook.thrift.transport.THttpClient;
+import com.facebook.thrift.transport.TFramedTransport;
 import com.facebook.thrift.protocol.TBinaryProtocol;
-
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.service.*;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -29,17 +33,25 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.StringReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.StringTokenizer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
@@ -58,7 +70,8 @@
 import org.apache.cassandra.service.batch_mutation_t;
 import org.apache.cassandra.service.column_t;
 import org.apache.cassandra.service.superColumn_t;
-
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.Token;
@@ -894,7 +907,7 @@
                 Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
 				errorCount_++;
 			} else {
-				Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+				Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
 				if (cfMap == null || cfMap.size() == 0) {
 					logger_
 							.debug("ERROR ColumnFamil map is missing.....: "

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java Fri Mar 27 06:45:19 2009
@@ -20,13 +20,22 @@
 
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.PrimaryKey;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.service.PartitionerType;
 import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
 
 
 public class SSTableTest
@@ -44,14 +53,79 @@
             ColumnFamily cf = new ColumnFamily("Test", "Standard");
             bufOut.reset();           
             // random.nextBytes(bytes);
-            cf.addColumn("C", "Avinash Lakshman is a good man".getBytes(), i);
-            ColumnFamily.serializerWithIndexes().serialize(cf, bufOut);
+            cf.createColumn("C", "Avinash Lakshman is a good man".getBytes(), i);
+            ColumnFamily.serializer2().serialize(cf, bufOut);
             ssTable.append(key, bufOut);            
             bf.fill(key);
         }
         ssTable.close(bf);
     }
     
+    private static void hashSSTableWrite() throws Throwable
+    {        
+        Map<String, ColumnFamily> columnFamilies = new HashMap<String, ColumnFamily>();                
+        byte[] bytes = new byte[64*1024];
+        Random random = new Random();
+        for ( int i = 100; i < 1000; ++i )
+        {
+            String key = Integer.toString(i);
+            ColumnFamily cf = new ColumnFamily("Test", "Standard");                      
+            // random.nextBytes(bytes);
+            cf.createColumn("C", "Avinash Lakshman is a good man".getBytes(), i);
+            columnFamilies.put(key, cf);
+        } 
+        flushForRandomPartitioner(columnFamilies);
+    }
+    
+    private static void flushForRandomPartitioner(Map<String, ColumnFamily> columnFamilies) throws Throwable
+    {
+        SSTable ssTable = new SSTable("C:\\Engagements\\Cassandra", "Table-Test-1", PartitionerType.RANDOM);
+        /* List of primary keys in sorted order */
+        List<PrimaryKey> pKeys = PrimaryKey.create( columnFamilies.keySet() );
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        /* Use this BloomFilter to decide if a key exists in a SSTable */
+        BloomFilter bf = new BloomFilter(pKeys.size(), 15);
+        for ( PrimaryKey pKey : pKeys )
+        {
+            buffer.reset();
+            ColumnFamily columnFamily = columnFamilies.get(pKey.key());
+            if ( columnFamily != null )
+            {
+                /* serialize the cf with column indexes */
+                ColumnFamily.serializer2().serialize( columnFamily, buffer );
+                /* Now write the key and value to disk */
+                ssTable.append(pKey.key(), pKey.hash(), buffer);
+                bf.fill(pKey.key());                
+            }
+        }
+        ssTable.close(bf);
+    }
+    
+    private static void readSSTable() throws Throwable
+    {
+        SSTable ssTable = new SSTable("C:\\Engagements\\Cassandra\\Table-Test-1-Data.db");  
+        for ( int i = 100; i < 1000; ++i )
+        {
+            String key = Integer.toString(i);            
+            DataInputBuffer bufIn = ssTable.next(key, "Test:C");
+            ColumnFamily cf = ColumnFamily.serializer().deserialize(bufIn);
+            if ( cf != null )
+            {            
+                System.out.println("KEY:" + key);
+                System.out.println(cf.name());
+                Collection<IColumn> columns = cf.getAllColumns();
+                for ( IColumn column : columns )
+                {
+                    System.out.println(column.name());
+                }
+            }
+            else
+            {
+                System.out.println("CF doesn't exist for key " + key);
+            }                             
+        }
+    }
+    
     public static void main(String[] args) throws Throwable
     {
         BloomFilter bf = new BloomFilter(1024*1024, 15);

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleaner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleaner.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleaner.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleaner.java Fri Mar 27 06:45:19 2009
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.tools;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -26,6 +27,7 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Serializable;
+import java.math.BigInteger;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.io.ICompactSerializer;
@@ -34,6 +36,8 @@
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
 
 public class MembershipCleaner
 {

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java Fri Mar 27 06:45:19 2009
@@ -18,20 +18,25 @@
 
 package org.apache.cassandra.tools;
 
+import java.util.*;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
+import java.math.BigInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tools.TokenUpdater.TokenInfoMessage;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.config.*;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java Fri Mar 27 06:45:19 2009
@@ -18,24 +18,24 @@
 
 package org.apache.cassandra.tools;
 
+import java.util.*;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
+import java.math.BigInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tools.TokenUpdater.TokenInfoMessage;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.config.*;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -54,8 +54,9 @@
             DataInputBuffer bufIn = new DataInputBuffer();
             bufIn.reset(body, body.length);
             /* Deserialize to get the token for this endpoint. */
-            Token token = Token.serializer().deserialize(bufIn);
-
+            TokenUpdater.TokenInfoMessage tiMessage = TokenUpdater.TokenInfoMessage.serializer().deserialize(bufIn);
+            
+            BigInteger token = tiMessage.getToken();
             logger_.info("Updating the token to [" + token + "]");
             StorageService.instance().updateToken(token);
             
@@ -65,19 +66,19 @@
             logger_.debug("Number of nodes in the header " + headers.size());
             Set<String> nodes = headers.keySet();
             
-            IPartitioner p = StorageService.getPartitioner();
             for ( String node : nodes )
             {            
                 logger_.debug("Processing node " + node);
                 byte[] bytes = headers.remove(node);
                 /* Send a message to this node to update its token to the one retreived. */
                 EndPoint target = new EndPoint(node, DatabaseDescriptor.getStoragePort());
-                token = p.getTokenFactory().fromByteArray(bytes);
+                token = new BigInteger(bytes);
                 
-                /* Reset the new Message */
+                /* Reset the new TokenInfoMessage */
+                tiMessage = new TokenUpdater.TokenInfoMessage(target, token );
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
-                Token.serializer().serialize(token, dos);
+                TokenInfoMessage.serializer().serialize(tiMessage, dos);
                 message.setMessageBody(new Object[]{bos.toByteArray()});
                 
                 logger_.debug("Sending a token update message to " + target + " to update it to " + token);

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java Fri Mar 27 06:45:19 2009
@@ -18,19 +18,26 @@
 
 package org.apache.cassandra.tools;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileInputStream;
+import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
 
 public class TokenUpdater
 {
@@ -46,17 +53,16 @@
         }
         
         String ipPort = args[0];
-        IPartitioner p = StorageService.getPartitioner();
-        Token token = p.getTokenFactory().fromString(args[1]);
+        String token = args[1];
         String file = args[2];
         
         String[] ipPortPair = ipPort.split(":");
         EndPoint target = new EndPoint(ipPortPair[0], Integer.valueOf(ipPortPair[1]));
-
+        TokenInfoMessage tiMessage = new TokenInfoMessage( target, new BigInteger(token) );
+        
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
-        Token.serializer().serialize(token, dos);
-
+        TokenInfoMessage.serializer().serialize(tiMessage, dos);
         /* Construct the token update message to be sent */
         Message tokenUpdateMessage = new Message( new EndPoint(FBUtilities.getHostName(), port_), "", StorageService.tokenVerbHandler_, new Object[]{bos.toByteArray()} );
         
@@ -67,8 +73,8 @@
         {
             String[] nodeTokenPair = line.split(" ");
             /* Add the node and the token pair into the header of this message. */
-            Token nodeToken = p.getTokenFactory().fromString(nodeTokenPair[1]);
-            tokenUpdateMessage.addHeader(nodeTokenPair[0], p.getTokenFactory().toByteArray(nodeToken));
+            BigInteger nodeToken = new BigInteger(nodeTokenPair[1]);
+            tokenUpdateMessage.addHeader(nodeTokenPair[0], nodeToken.toByteArray());
         }
         
         System.out.println("Sending a token update message to " + target);
@@ -76,5 +82,64 @@
         Thread.sleep(TokenUpdater.waitTime_);
         System.out.println("Done sending the update message");
     }
+    
+    public static class TokenInfoMessage implements Serializable
+    {
+        private static ICompactSerializer<TokenInfoMessage> serializer_;
+        private static AtomicInteger idGen_ = new AtomicInteger(0);
+        
+        static
+        {
+            serializer_ = new TokenInfoMessageSerializer();            
+        }
+        
+        static ICompactSerializer<TokenInfoMessage> serializer()
+        {
+            return serializer_;
+        }
 
+        private EndPoint target_;
+        private BigInteger token_;
+        
+        TokenInfoMessage(EndPoint target, BigInteger token)
+        {
+            target_ = target;
+            token_ = token;
+        }
+        
+        EndPoint getTarget()
+        {
+            return target_;
+        }
+        
+        BigInteger getToken()
+        {
+            return token_;
+        }
+    }
+    
+    public static class TokenInfoMessageSerializer implements ICompactSerializer<TokenInfoMessage>
+    {
+        public void serialize(TokenInfoMessage tiMessage, DataOutputStream dos) throws IOException
+        {
+            byte[] node = EndPoint.toBytes( tiMessage.getTarget() );
+            dos.writeInt(node.length);
+            dos.write(node);
+            
+            byte[] token = tiMessage.getToken().toByteArray();
+            dos.writeInt( token.length );
+            dos.write(token);
+        }
+        
+        public TokenInfoMessage deserialize(DataInputStream dis) throws IOException
+        {
+            byte[] target = new byte[dis.readInt()];
+            dis.readFully(target);
+            
+            byte[] token = new byte[dis.readInt()];
+            dis.readFully(token);
+            
+            return new TokenInfoMessage(EndPoint.fromBytes(target), new BigInteger(token));
+        }
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/utils/BasicUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/utils/BasicUtilities.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/utils/BasicUtilities.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/utils/BasicUtilities.java Fri Mar 27 06:45:19 2009
@@ -18,6 +18,10 @@
 
 package org.apache.cassandra.utils;
 
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor; 
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 
 /**

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/utils/FBUtilities.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/utils/FBUtilities.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/utils/FBUtilities.java Fri Mar 27 06:45:19 2009
@@ -18,28 +18,19 @@
 
 package org.apache.cassandra.utils;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.io.UnsupportedEncodingException;
-import java.math.BigInteger;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.zip.Deflater;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
 import java.security.MessageDigest;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.StringTokenizer;
-import java.util.zip.DataFormatException;
-import java.util.zip.Deflater;
-import java.util.zip.Inflater;
-
+import java.io.*;
+import java.net.*;
+import java.nio.channels.SocketChannel;
+import java.nio.ByteBuffer;
+import java.math.BigInteger;
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
  */

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java Fri Mar 27 06:45:19 2009
@@ -187,7 +187,7 @@
         }
         else
         { // already FULL or REMOVED, must probe
-            // compute the double token
+            // compute the double hash
             final int probe = 1 + (hash % (length - 2));
 
             // if the slot we landed on is FULL (but not removed), probe