You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 03:44:50 UTC

svn commit: r759000 - in /incubator/cassandra/trunk/src/org/apache/cassandra: db/ColumnFamilyStore.java db/FileStruct.java db/FileStructComparator.java io/Coordinate.java io/SSTable.java

Author: jbellis
Date: Fri Mar 27 02:44:49 2009
New Revision: 759000

URL: http://svn.apache.org/viewvc?rev=759000&view=rev
Log:
clean up FileStruct and make it iterable.  (this will be used by range queries.)

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=759000&r1=758999&r2=759000&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 02:44:49 2009
@@ -675,13 +675,9 @@
             {
             	try
             	{
-            		fs = new FileStruct();
-	                fs.bufIn_ = new DataInputBuffer();
-	                fs.bufOut_ = new DataOutputBuffer();
-	                fs.reader_ = SequenceFile.bufferedReader(file, bufferSize);                    
-	                fs.key_ = null;
-	                fs = getNextKey(fs);
-	                if(fs == null)
+            		fs = new FileStruct(SequenceFile.bufferedReader(file, bufferSize));
+	                fs.getNextKey();
+	                if(fs.isExhausted())
 	                	continue;
 	                pq.add(fs);
             	}
@@ -690,9 +686,9 @@
             		ex.printStackTrace();
             		try
             		{
-            			if(fs != null)
+            			if (fs != null)
             			{
-            				fs.reader_.close();
+            				fs.close();
             			}
             		}
             		catch(Exception e)
@@ -897,38 +893,6 @@
 
     }
 
-    /*
-     * Read the next key from the data file , this fn will skip teh block index
-     * and read teh next available key into the filestruct that is passed.
-     * If it cannot read or a end of file is reached it will return null.
-     */
-    FileStruct getNextKey(FileStruct filestruct) throws IOException
-    {
-        filestruct.bufOut_.reset();
-        if (filestruct.reader_.isEOF())
-        {
-            filestruct.reader_.close();
-            return null;
-        }
-        
-        long bytesread = filestruct.reader_.next(filestruct.bufOut_);
-        if (bytesread == -1)
-        {
-            filestruct.reader_.close();
-            return null;
-        }
-
-        filestruct.bufIn_.reset(filestruct.bufOut_.getData(), filestruct.bufOut_.getLength());
-        filestruct.key_ = filestruct.bufIn_.readUTF();
-        /* If the key we read is the Block Index Key then we are done reading the keys so exit */
-        if ( filestruct.key_.equals(SSTable.blockIndexKey_) )
-        {
-            filestruct.reader_.close();
-            return null;
-        }
-        return filestruct;
-    }
-
     void forceCleanup()
     {
     	MinorCompactionManager.instance().submitCleanup(ColumnFamilyStore.this);
@@ -1057,11 +1021,11 @@
 	                    fs = pq.poll();
 	                }
 	                if (fs != null
-	                        && (lastkey == null || lastkey.compareTo(fs.key_) == 0))
+	                        && (lastkey == null || lastkey.compareTo(fs.getKey()) == 0))
 	                {
 	                    // The keys are the same so we need to add this to the
 	                    // ldfs list
-	                    lastkey = fs.key_;
+	                    lastkey = fs.getKey();
 	                    lfs.add(fs);
 	                }
 	                else
@@ -1076,9 +1040,9 @@
 		                    	try
 		                    	{
 	                                /* read the length although we don't need it */
-	                                filestruct.bufIn_.readInt();
+	                                filestruct.getBufIn().readInt();
 	                                // Skip the Index
-                                    IndexHelper.skipBloomFilterAndIndex(filestruct.bufIn_);
+                                    IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
 	                                // We want to add only 2 and resolve them right there in order to save on memory footprint
 	                                if(columnFamilies.size() > 1)
 	                                {
@@ -1086,7 +1050,7 @@
                                         merge(columnFamilies);
 	                                }
 			                        // deserialize into column families
-			                        columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
+			                        columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
 		                    	}
 		                    	catch ( Exception ex)
 		                    	{
@@ -1108,13 +1072,13 @@
 	                    	try
 	                    	{
 		                        /* read the length although we don't need it */
-		                        int size = filestruct.bufIn_.readInt();
-		                        bufOut.write(filestruct.bufIn_, size);
+		                        int size = filestruct.getBufIn().readInt();
+		                        bufOut.write(filestruct.getBufIn(), size);
 	                    	}
 	                    	catch ( Exception ex)
 	                    	{
 	                    		logger_.warn(LogUtil.throwableToString(ex));
-	                            filestruct.reader_.close();
+	                            filestruct.close();
 	                            continue;
 	                    	}
 	                    }
@@ -1142,16 +1106,16 @@
 	                    {
 	                    	try
 	                    	{
-	                    		filestruct = getNextKey	( filestruct );
-	                    		if(filestruct == null)
+                                filestruct.getNextKey();
+	                    		if (filestruct.isExhausted())
 	                    		{
 	                    			continue;
 	                    		}
 	                    		/* keep on looping until we find a key in the range */
-	                            while ( !Range.isKeyInRanges(filestruct.key_, ranges) )
+	                            while ( !Range.isKeyInRanges(filestruct.getKey(), ranges) )
 	                            {
-		                    		filestruct = getNextKey	( filestruct );
-		                    		if(filestruct == null)
+                                    filestruct.getNextKey();
+                                    if (filestruct.isExhausted())
 		                    		{
 		                    			break;
 		                    		}
@@ -1163,7 +1127,7 @@
 	                                    //break;
 	        	                    //}
 	                            }
-	                            if ( filestruct != null)
+	                            if (!filestruct.isExhausted())
 	                            {
 	                            	pq.add(filestruct);
 	                            }
@@ -1175,7 +1139,7 @@
 	                    		// in any case we have read as far as possible from it
 	                    		// and it will be deleted after compaction.
                                 logger_.warn(LogUtil.throwableToString(ex));
-	                            filestruct.reader_.close();
+	                            filestruct.close();
                             }
 	                    }
 	                    lfs.clear();
@@ -1270,11 +1234,11 @@
 	                    fs = pq.poll();                        
 	                }
 	                if (fs != null
-	                        && (lastkey == null || lastkey.compareTo(fs.key_) == 0))
+	                        && (lastkey == null || lastkey.compareTo(fs.getKey()) == 0))
 	                {
 	                    // The keys are the same so we need to add this to the
 	                    // ldfs list
-	                    lastkey = fs.key_;
+	                    lastkey = fs.getKey();
 	                    lfs.add(fs);
 	                }
 	                else
@@ -1289,16 +1253,16 @@
 		                    	try
 		                    	{
 	                                /* read the length although we don't need it */
-	                                filestruct.bufIn_.readInt();
+	                                filestruct.getBufIn().readInt();
 	                                // Skip the Index
-                                    IndexHelper.skipBloomFilterAndIndex(filestruct.bufIn_);
+                                    IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
 	                                // We want to add only 2 and resolve them right there in order to save on memory footprint
 	                                if(columnFamilies.size() > 1)
 	                                {
 	    		                        merge(columnFamilies);
 	                                }
 			                        // deserialize into column families                                    
-			                        columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
+			                        columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
 		                    	}
 		                    	catch ( Exception ex)
 		                    	{
@@ -1320,13 +1284,13 @@
 	                    	try
 	                    	{
 		                        /* read the length although we don't need it */
-		                        int size = filestruct.bufIn_.readInt();
-		                        bufOut.write(filestruct.bufIn_, size);
+		                        int size = filestruct.getBufIn().readInt();
+		                        bufOut.write(filestruct.getBufIn(), size);
 	                    	}
 	                    	catch ( Exception ex)
 	                    	{
 	                    		ex.printStackTrace();
-	                            filestruct.reader_.close();
+	                            filestruct.close();
 	                            continue;
 	                    	}
 	                    }
@@ -1344,8 +1308,8 @@
 	                    {
 	                    	try
 	                    	{
-	                    		filestruct = getNextKey(filestruct);
-	                    		if(filestruct == null)
+                                filestruct.getNextKey();
+	                    		if (filestruct.isExhausted())
 	                    		{
 	                    			continue;
 	                    		}
@@ -1357,7 +1321,7 @@
 	                    		// Ignore the exception as it might be a corrupted file
 	                    		// in any case we have read as far as possible from it
 	                    		// and it will be deleted after compaction.
-	                            filestruct.reader_.close();
+	                            filestruct.close();
                             }
 	                    }
 	                    lfs.clear();

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java?rev=759000&r1=758999&r2=759000&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java Fri Mar 27 02:44:49 2009
@@ -19,92 +19,189 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.util.Iterator;
 
+import org.apache.cassandra.io.Coordinate;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.IFileReader;
 import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.io.SequenceFile;
-import org.apache.cassandra.service.StorageService;
 
 
-public class FileStruct implements Comparable<FileStruct>
+public class FileStruct implements Comparable<FileStruct>, Iterable<String>
 {
-    IFileReader reader_;
-    String key_;        
-    DataInputBuffer bufIn_;
-    DataOutputBuffer bufOut_;
     
-    public FileStruct()
+    private String key = null;
+    private boolean exhausted = false;
+    private IFileReader reader;
+    private DataInputBuffer bufIn;
+    private DataOutputBuffer bufOut;
+
+    public FileStruct(IFileReader reader)
     {
+        this.reader = reader;
+        bufIn = new DataInputBuffer();
+        bufOut = new DataOutputBuffer();
     }
-    
-    public FileStruct(String file, int bufSize) throws IOException
+
+    public String getFileName()
     {
-        bufIn_ = new DataInputBuffer();
-        bufOut_ = new DataOutputBuffer();
-        reader_ = SequenceFile.bufferedReader(file, bufSize);
-        long bytesRead = advance();
-        if ( bytesRead == -1L )
-            throw new IOException("Either the file is empty or EOF has been reached.");          
+        return reader.getFileName();
     }
-    
+
+    public void close() throws IOException
+    {
+        reader.close();
+    }
+
+    public boolean isExhausted()
+    {
+        return exhausted;
+    }
+
+    public DataInputBuffer getBufIn()
+    {
+        return bufIn;
+    }
+
     public String getKey()
     {
-        return key_;
+        return key;
     }
-    
-    public DataOutputBuffer getBuffer()
+
+    public int compareTo(FileStruct f)
     {
-        return bufOut_;
+        return key.compareTo(f.key);
     }
-    
-    public long advance() throws IOException
-    {        
-        long bytesRead = -1L;
-        bufOut_.reset();
-        /* advance and read the next key in the file. */           
-        if (reader_.isEOF())
+
+    // we don't use SequenceReader.seekTo, since that (sometimes) throws an exception
+    // if the key is not found.  unsure if this behavior is desired.
+    public void seekTo(String seekKey)
+    {
+        try
+        {
+            Coordinate range = SSTable.getCoordinates(seekKey, reader);
+            reader.seek(range.end_);
+            long position = reader.getPositionFromBlockIndex(seekKey);
+            if (position == -1)
+            {
+                reader.seek(range.start_);
+            }
+            else
+            {
+                reader.seek(position);
+            }
+
+            while (!exhausted)
+            {
+                getNextKey();
+                if (key.compareTo(seekKey) >= 0)
+                {
+                    break;
+                }
+            }
+        }
+        catch (IOException e)
         {
-            reader_.close();
-            return bytesRead;
+            throw new RuntimeException("corrupt sstable", e);
         }
-            
-        bytesRead = reader_.next(bufOut_);        
-        if (bytesRead == -1)
+    }
+
+    /*
+     * Read the next key from the data file, skipping block indexes.
+     * Caller must check isExhausted after each call to see if further
+     * reads are valid.
+     */
+    public void getNextKey()
+    {
+        if (exhausted)
         {
-            reader_.close();
-            return bytesRead;
+            throw new IndexOutOfBoundsException();
         }
 
-        bufIn_.reset(bufOut_.getData(), bufOut_.getLength());
-        key_ = bufIn_.readUTF();
-        /* If the key we read is the Block Index Key then omit and read the next key. */
-        if ( key_.equals(SSTable.blockIndexKey_) )
+        try
         {
-            bufOut_.reset();
-            bytesRead = reader_.next(bufOut_);
-            if (bytesRead == -1)
+            bufOut.reset();
+            if (reader.isEOF())
+            {
+                reader.close();
+                exhausted = true;
+                return;
+            }
+
+            long bytesread = reader.next(bufOut);
+            if (bytesread == -1)
             {
-                reader_.close();
-                return bytesRead;
+                reader.close();
+                exhausted = true;
+                return;
             }
-            bufIn_.reset(bufOut_.getData(), bufOut_.getLength());
-            key_ = bufIn_.readUTF();
+
+            bufIn.reset(bufOut.getData(), bufOut.getLength());
+            key = bufIn.readUTF();
+            /* If the key we read is the Block Index Key then omit and read the next key. */
+            if (key.equals(SSTable.blockIndexKey_))
+            {
+                bufOut.reset();
+                bytesread = reader.next(bufOut);
+                if (bytesread == -1)
+                {
+                    reader.close();
+                    exhausted = true;
+                    return;
+                }
+                bufIn.reset(bufOut.getData(), bufOut.getLength());
+                key = bufIn.readUTF();
+            }
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
         }
-        
-        return bytesRead;
     }
 
-    public int compareTo(FileStruct f)
+    public Iterator<String> iterator()
     {
-        return StorageService.getPartitioner().getDecoratedKeyComparator().compare(key_, f.key_);
+        return new FileStructIterator();
     }
-    
-    public void close() throws IOException
+
+    private class FileStructIterator implements Iterator<String>
     {
-        bufIn_.close();
-        bufOut_.close();
-        reader_.close();
+        String saved;
+
+        public FileStructIterator()
+        {
+            if (getKey() == null && !isExhausted())
+            {
+                forward();
+            }
+        }
+
+        private void forward()
+        {
+            getNextKey();
+            saved = isExhausted() ? null : getKey();
+        }
+
+        public boolean hasNext()
+        {
+            return saved != null;
+        }
+
+        public String next()
+        {
+            if (saved == null)
+            {
+                throw new IndexOutOfBoundsException();
+            }
+            String key = saved;
+            forward();
+            return key;
+        }
+
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
     }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java?rev=759000&r1=758999&r2=759000&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java Fri Mar 27 02:44:49 2009
@@ -6,13 +6,6 @@
 {
     public int compare(FileStruct f, FileStruct f2)
     {
-        return f.reader_.getFileName().compareTo(f2.reader_.getFileName());
-    }
-
-    public boolean equals(Object o)
-    {
-        if (!(o instanceof FileStructComparator))
-            return false;
-        return true;
+        return f.getFileName().compareTo(f2.getFileName());
     }
 }
\ No newline at end of file

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java?rev=759000&r1=758999&r2=759000&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java Fri Mar 27 02:44:49 2009
@@ -21,10 +21,10 @@
  * Section of a file that needs to be scanned
  * is represented by this class.
 */
-class Coordinate
+public class Coordinate
 {
-    long start_;
-    long end_;
+    public final long start_;
+    public final long end_;
     
     Coordinate(long start, long end)
     {

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java?rev=759000&r1=758999&r2=759000&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java Fri Mar 27 02:44:49 2009
@@ -653,9 +653,9 @@
         afterAppend(decoratedKey, currentPosition, value.length );
     }
 
-    private Coordinate getCoordinates(String decoratedKey, IFileReader dataReader) throws IOException
+    public static Coordinate getCoordinates(String decoratedKey, IFileReader dataReader) throws IOException
     {
-    	List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataFile_);
+    	List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataReader.getFileName());
     	int size = (indexInfo == null) ? 0 : indexInfo.size();
     	long start = 0L;
     	long end = dataReader.getEOF();