Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/11 16:26:17 UTC

svn commit: r773573 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ColumnFamilyStore.java db/DBManager.java db/FileStruct.java db/MinorCompactionManager.java io/Coordinate.java io/SSTable.java

Author: jbellis
Date: Mon May 11 14:26:16 2009
New Revision: 773573

URL: http://svn.apache.org/viewvc?rev=773573&view=rev
Log:
clean up compaction code and refactor to allow a user-specified threshold (minimum number of sstables to compact).  patch by jbellis
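
For illustration only (this sketch is not part of the committed diff below), a caller could drive a compaction through either of the refactored entry points; the ColumnFamilyStore instance is assumed to be obtained elsewhere:

    import java.io.IOException;

    // Hypothetical helper showing the two doCompaction entry points added/kept by this patch:
    // doCompaction() uses the default COMPACTION_THRESHOLD (4 sstables per bucket),
    // doCompaction(int) takes a user-specified minimum bucket size.
    class CompactionExample
    {
        static void compact(ColumnFamilyStore store, int threshold) throws IOException
        {
            if (threshold <= 0)
            {
                store.doCompaction();          // default threshold
            }
            else
            {
                store.doCompaction(threshold); // user-specified threshold
            }
        }
    }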

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileStruct.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Coordinate.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=773573&r1=773572&r2=773573&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon May 11 14:26:16 2009
@@ -55,6 +55,7 @@
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.cassandra.utils.TimedStatsDeque;
+import org.apache.commons.lang.StringUtils;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -62,9 +63,9 @@
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
-    private static int threshHold_ = 4;
-    private static final int bufSize_ = 128*1024*1024;
-    private static int compactionMemoryThreshold_ = 1 << 30;
+    private static int COMPACTION_THRESHOLD = 4; // compact this many sstables at a time
+    private static final int BUFSIZE = 128*1024*1024;
+    private static final int COMPACTION_MEMORY_THRESHOLD = 1 << 30;
     private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
 
     private final String table_;
@@ -717,8 +718,8 @@
         	lock_.writeLock().unlock();
         }
 
-        if ((ssTableSize >= threshHold_ && !isCompacting_.get())
-            || (isCompacting_.get() && ssTableSize % threshHold_ == 0))
+        if ((ssTableSize >= COMPACTION_THRESHOLD && !isCompacting_.get())
+            || (isCompacting_.get() && ssTableSize % COMPACTION_THRESHOLD == 0))
         {
             logger_.debug("Submitting for  compaction ...");
             MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
@@ -731,7 +732,7 @@
         PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
         if (files.size() > 1 || (ranges != null &&  files.size() > 0))
         {
-            int bufferSize = Math.min( (ColumnFamilyStore.compactionMemoryThreshold_ / files.size()), minBufferSize ) ;
+            int bufferSize = Math.min( (ColumnFamilyStore.COMPACTION_MEMORY_THRESHOLD / files.size()), minBufferSize ) ;
             FileStruct fs = null;
             for (String file : files)
             {
@@ -805,40 +806,47 @@
         return buckets.keySet();
     }
 
+    public void doCompaction() throws IOException
+    {
+        doCompaction(COMPACTION_THRESHOLD);
+    }
+
     /*
      * Break the files into buckets and then compact.
      */
-    void doCompaction() throws IOException
+    public void doCompaction(int threshold) throws IOException
     {
         isCompacting_.set(true);
         List<String> files = new ArrayList<String>(ssTables_);
         try
         {
-	        int count;
-	    	for(List<String> fileList : getCompactionBuckets(files, 50L*1024L*1024L))
+            int count;
+            for (List<String> fileList : getCompactionBuckets(files, 50L * 1024L * 1024L))
             {
-	    		Collections.sort( fileList , new FileNameComparator( FileNameComparator.Ascending));
-	    		if(fileList.size() >= threshHold_ )
-	    		{
-	    			files.clear();
-	    			count = 0;
-	    			for(String file : fileList)
-	    			{
-	    				files.add(file);
-	    				count++;
-	    				if( count == threshHold_ )
-	    					break;
-	    			}
-                    // For each bucket if it has crossed the threshhold do the compaction
-                    // In case of range  compaction merge the counting bloom filters also.
-                    if( count == threshHold_)
-                        doFileCompaction(files, bufSize_);
-	    		}
-	    	}
+                Collections.sort(fileList, new FileNameComparator(FileNameComparator.Ascending));
+                if (fileList.size() < threshold)
+                {
+                    continue;
+                }
+                // For each bucket if it has crossed the threshhold do the compaction
+                // In case of range  compaction merge the counting bloom filters also.
+                files.clear();
+                count = 0;
+                for (String file : fileList)
+                {
+                    files.add(file);
+                    count++;
+                    if (count == threshold)
+                    {
+                        doFileCompaction(files, BUFSIZE);
+                        break;
+                    }
+                }
+            }
         }
         finally
         {
-        	isCompacting_.set(false);
+            isCompacting_.set(false);
         }
     }
 
@@ -876,11 +884,11 @@
         	 {
         		 files = filesInternal;
         	 }
-        	 doFileCompaction(files, bufSize_);
+        	 doFileCompaction(files, BUFSIZE);
         }
-        catch ( Exception ex)
+        catch (IOException ex)
         {
-        	ex.printStackTrace();
+            logger_.error(ex);
         }
         finally
         {
@@ -932,10 +940,6 @@
         {
         	 result = doFileAntiCompaction(files, ranges, target, fileList, null);
         }
-        catch ( Exception ex)
-        {
-        	ex.printStackTrace();
-        }
         finally
         {
         	isCompacting_.set(false);
@@ -958,18 +962,17 @@
     {
         isCompacting_.set(true);
         List<String> files = new ArrayList<String>(ssTables_);
-        for(String file: files)
+        try
         {
-	        try
-	        {
-	        	doCleanup(file);
-	        }
-	        catch ( Exception ex)
-	        {
-	        	ex.printStackTrace();
-	        }
+            for(String file: files)
+            {
+                doCleanup(file);
+            }
+        }
+        finally
+        {
+        	isCompacting_.set(false);
         }
-    	isCompacting_.set(false);
     }
     /**
      * cleans up one particular file by removing keys that this node is not responsible for.
@@ -1048,7 +1051,7 @@
 	                    + expectedRangeFileSize + "   is greater than the safe limit of the disk space available.");
 	            return result;
 	        }
-	        PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.bufSize_);
+	        PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.BUFSIZE);
 	        if (pq.size() > 0)
 	        {
 	            mergedFileName = getTempFileName();
@@ -1234,18 +1237,11 @@
      * to get the latest data.
      *
      */
-    void  doFileCompaction(List<String> files,  int minBufferSize) throws IOException
+    private void doFileCompaction(List<String> files,  int minBufferSize) throws IOException
     {
-    	String newfile = null;
-        long startTime = System.currentTimeMillis();
-        long totalBytesRead = 0;
-        long totalBytesWritten = 0;
-        long totalkeysRead = 0;
-        long totalkeysWritten = 0;
-        // Calculate the expected compacted filesize
-        long expectedCompactedFileSize = getExpectedCompactedFileSize(files);
-        String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedCompactedFileSize);
+        String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(getExpectedCompactedFileSize(files));
         // If the compaction file path is null that means we have no space left for this compaction.
+        // try again w/o the largest one.
         if( compactionFileLocation == null )
         {
             String maxFile = getMaxSizeFile( files );
@@ -1253,7 +1249,15 @@
             doFileCompaction(files , minBufferSize);
             return;
         }
+
+        String newfile = null;
+        long startTime = System.currentTimeMillis();
+        long totalBytesRead = 0;
+        long totalBytesWritten = 0;
+        long totalkeysRead = 0;
+        long totalkeysWritten = 0;
         PriorityQueue<FileStruct> pq = initializePriorityQueue(files, null, minBufferSize);
+
         if (pq.size() > 0)
         {
             String mergedFileName = getTempFileName( files );
@@ -1331,7 +1335,7 @@
                         }
                         catch ( Exception ex)
                         {
-                            ex.printStackTrace();
+                            logger_.error("empty sstable file " + filestruct.getFileName(), ex);
                             filestruct.close();
                             continue;
                         }
@@ -1390,10 +1394,10 @@
                 }
                 if ( newfile != null )
                 {
-                    ssTables_.add(newfile);
                     logger_.debug("Inserting bloom filter for file " + newfile);
                     SSTable.storeBloomFilter(newfile, compactedBloomFilter);
-                    totalBytesWritten = (new File(newfile)).length();
+                    ssTables_.add(newfile);
+                    totalBytesWritten += (new File(newfile)).length();
                 }
             }
             finally
@@ -1405,11 +1409,9 @@
                 SSTable.delete(file);
             }
         }
-        logger_.debug("Total time taken for compaction  ..."
-                + (System.currentTimeMillis() - startTime));
-        logger_.debug("Total bytes Read for compaction  ..." + totalBytesRead);
-        logger_.debug("Total bytes written for compaction  ..."
-                + totalBytesWritten + "   Total keys read ..." + totalkeysRead);
+        String format = "Compacted [%s] to %s.  %d/%d bytes for %d/%d keys read/written.  Time: %dms.";
+        long dTime = System.currentTimeMillis() - startTime;
+        logger_.info(String.format(format, StringUtils.join(files, ", "), newfile, totalBytesRead, totalBytesWritten, totalkeysRead, totalkeysWritten, dTime));
     }
 
     public boolean isSuper()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java?rev=773573&r1=773572&r2=773573&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java Mon May 11 14:26:16 2009
@@ -154,9 +154,4 @@
         }
         return storageMetadata;
     }
-
-    public static void main(String[] args) throws Throwable
-    {
-        DBManager.instance().start();
-    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileStruct.java?rev=773573&r1=773572&r2=773573&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileStruct.java Mon May 11 14:26:16 2009
@@ -27,10 +27,13 @@
 import org.apache.cassandra.io.SSTable;
 import org.apache.cassandra.io.Coordinate;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.log4j.Logger;
 
 
 public class FileStruct implements Comparable<FileStruct>, Iterator<String>
 {
+    private static Logger logger = Logger.getLogger(FileStruct.class);
+
     private String key = null; // decorated!
     private boolean exhausted = false;
     private IFileReader reader;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=773573&r1=773572&r2=773573&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Mon May 11 14:26:16 2009
@@ -72,12 +72,12 @@
 
         FileCompactor(ColumnFamilyStore columnFamilyStore)
         {
-        	columnFamilyStore_ = columnFamilyStore;
+            columnFamilyStore_ = columnFamilyStore;
         }
 
         public void run()
         {
-                logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
+            logger_.debug("Started compaction ..." + columnFamilyStore_.columnFamily_);
             try
             {
                 columnFamilyStore_.doCompaction();
@@ -86,7 +86,7 @@
             {
                 throw new RuntimeException(e);
             }
-            logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+            logger_.debug("Finished compaction ..." + columnFamilyStore_.columnFamily_);
         }
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Coordinate.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Coordinate.java?rev=773573&r1=773572&r2=773573&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Coordinate.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Coordinate.java Mon May 11 14:26:16 2009
@@ -31,4 +31,12 @@
         start_ = start;
         end_ = end;
     }
+
+    public String toString()
+    {
+        return "Coordinate(" +
+               "start_=" + start_ +
+               ", end_=" + end_ +
+               ')';
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=773573&r1=773572&r2=773573&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Mon May 11 14:26:16 2009
@@ -221,17 +221,11 @@
         }
         
         File file = new File(dataFile);
-        if ( file.exists() )
+        assert file.exists();
+        /* delete the data file */
+        if (!file.delete())
         {
-            /* delete the data file */
-			if (file.delete())
-			{			    
-			    logger_.info("** Deleted " + file.getName() + " **");                
-			}
-			else
-			{			  
-			    logger_.error("Failed to delete " + file.getName());
-			}
+            logger_.error("Failed to delete " + file.getName());
         }
     }
 
@@ -600,9 +594,8 @@
      * @throws IOException
      */
     private void dumpBlockIndexes() throws IOException
-    {    	
-        long position = dataWriter_.getCurrentPosition();
-        firstBlockPosition_ = position;
+    {
+        firstBlockPosition_ = dataWriter_.getCurrentPosition();
     	for( SortedMap<String, BlockMetadata> block : blockIndexes_ )
     	{
     		dumpBlockIndex( block );
@@ -660,12 +653,16 @@
         afterAppend(decoratedKey, currentPosition, value.length );
     }
 
+    /*
+      TODO only the end_ part of the returned Coordinate is ever used.  Apparently this code works, but it's definitely due for some cleanup
+      since the code fooling about with start_ appears to be irrelevant.
+     */
     public static Coordinate getCoordinates(String decoratedKey, IFileReader dataReader, IPartitioner partitioner) throws IOException
     {
     	List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataReader.getFileName());
     	int size = (indexInfo == null) ? 0 : indexInfo.size();
     	long start = 0L;
-    	long end = dataReader.getEOF();
+    	long end;
         if ( size > 0 )
         {
             int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(decoratedKey, partitioner));