You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/12 01:48:20 UTC

svn commit: r773725 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ColumnFamilyStore.java src/java/org/apache/cassandra/db/MinorCompactionManager.java test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Author: jbellis
Date: Mon May 11 23:48:20 2009
New Revision: 773725

URL: http://svn.apache.org/viewvc?rev=773725&view=rev
Log:
Add a unit test to expose the bug that the system test is running into.  patch by jbellis; reviewed by Jun Rao for CASSANDRA-153

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=773725&r1=773724&r2=773725&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon May 11 23:48:20 2009
@@ -806,22 +806,23 @@
         return buckets.keySet();
     }
 
-    public void doCompaction() throws IOException
+    public int doCompaction() throws IOException
     {
-        doCompaction(COMPACTION_THRESHOLD);
+        return doCompaction(COMPACTION_THRESHOLD);
     }
 
     /*
      * Break the files into buckets and then compact.
      */
-    public void doCompaction(int threshold) throws IOException
+    public int doCompaction(int threshold) throws IOException
     {
         isCompacting_.set(true);
         List<String> files = new ArrayList<String>(ssTables_);
+        int filesCompacted = 0;
         try
         {
-            int count;
-            for (List<String> fileList : getCompactionBuckets(files, 50L * 1024L * 1024L))
+            Set<List<String>> buckets = getCompactionBuckets(files, 50L * 1024L * 1024L);
+            for (List<String> fileList : buckets)
             {
                 Collections.sort(fileList, new FileNameComparator(FileNameComparator.Ascending));
                 if (fileList.size() < threshold)
@@ -831,14 +832,14 @@
                 // For each bucket if it has crossed the threshold do the compaction
                 // In case of range  compaction merge the counting bloom filters also.
                 files.clear();
-                count = 0;
+                int count = 0;
                 for (String file : fileList)
                 {
                     files.add(file);
                     count++;
                     if (count == threshold)
                     {
-                        doFileCompaction(files, BUFSIZE);
+                        filesCompacted += doFileCompaction(files, BUFSIZE);
                         break;
                     }
                 }
@@ -848,6 +849,7 @@
         {
             isCompacting_.set(false);
         }
+        return filesCompacted;
     }
 
     void doMajorCompaction(long skip)
@@ -1237,7 +1239,7 @@
      * to get the latest data.
      *
      */
-    private void doFileCompaction(List<String> files,  int minBufferSize) throws IOException
+    private int doFileCompaction(List<String> files,  int minBufferSize) throws IOException
     {
         String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(getExpectedCompactedFileSize(files));
         // If the compaction file path is null that means we have no space left for this compaction.
@@ -1246,8 +1248,7 @@
         {
             String maxFile = getMaxSizeFile( files );
             files.remove( maxFile );
-            doFileCompaction(files , minBufferSize);
-            return;
+            return doFileCompaction(files , minBufferSize);
         }
 
         String newfile = null;
@@ -1412,6 +1413,7 @@
         String format = "Compacted [%s] to %s.  %d/%d bytes for %d/%d keys read/written.  Time: %dms.";
         long dTime = System.currentTimeMillis() - startTime;
         logger_.info(String.format(format, StringUtils.join(files, ", "), newfile, totalBytesRead, totalBytesWritten, totalkeysRead, totalkeysWritten, dTime));
+        return files.size();
     }
 
     public boolean isSuper()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=773725&r1=773724&r2=773725&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Mon May 11 23:48:20 2009
@@ -66,7 +66,7 @@
         return instance_;
     }
 
-    class FileCompactor implements Runnable
+    class FileCompactor implements Callable<Integer>
     {
         private ColumnFamilyStore columnFamilyStore_;
 
@@ -75,18 +75,21 @@
             columnFamilyStore_ = columnFamilyStore;
         }
 
-        public void run()
+        public Integer call()
         {
             logger_.debug("Started compaction ..." + columnFamilyStore_.columnFamily_);
             try
             {
-                columnFamilyStore_.doCompaction();
+                return columnFamilyStore_.doCompaction();
             }
             catch (IOException e)
             {
                 throw new RuntimeException(e);
             }
-            logger_.debug("Finished compaction ..." + columnFamilyStore_.columnFamily_);
+            finally
+            {
+                logger_.debug("Finished compaction ..." + columnFamilyStore_.columnFamily_);
+            }
         }
     }
 
@@ -164,13 +167,20 @@
     	compactor_.shutdownNow();
     }
 
-    public void submitPeriodicCompaction(ColumnFamilyStore columnFamilyStore)
-    {        
-    	compactor_.scheduleWithFixedDelay(new FileCompactor(columnFamilyStore), MinorCompactionManager.intervalInMins_,
+    public void submitPeriodicCompaction(final ColumnFamilyStore columnFamilyStore)
+    {
+        Runnable runnable = new Runnable() // scheduleWithFixedDelay only accepts a Runnable, so the Callable has to be wrapped.
+        {
+            public void run()
+            {
+                new FileCompactor(columnFamilyStore).call();
+            }
+        };
+    	compactor_.scheduleWithFixedDelay(runnable, MinorCompactionManager.intervalInMins_,
     			MinorCompactionManager.intervalInMins_, TimeUnit.MINUTES);       
     }
 
-    public Future submit(ColumnFamilyStore columnFamilyStore)
+    public Future<Integer> submit(ColumnFamilyStore columnFamilyStore)
     {
         return compactor_.submit(new FileCompactor(columnFamilyStore));
     }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=773725&r1=773724&r2=773725&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Mon May 11 23:48:20 2009
@@ -3,14 +3,7 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedSet;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -393,23 +386,56 @@
     }
 
     @Test
-    public void testCompaction() throws IOException, ExecutionException, InterruptedException
+    public void testOneCompaction() throws IOException, ExecutionException, InterruptedException
     {
         Table table = Table.open("Table1");
         ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
 
-        for (int j = 0; j < 5; j++) {
-            for (int i = 0; i < 10; i++) {
-                long epoch = System.currentTimeMillis()  /  1000;
-                String key = String.format("%s.%s.%s",  epoch,  1,  i);
+        Set<String> inserted = new HashSet<String>();
+        for (int j = 0; j < 2; j++) {
+            String key = "0";
+            RowMutation rm = new RowMutation("Table1", key);
+            rm.add("Standard1:0", new byte[0], j);
+            rm.apply();
+            inserted.add(key);
+            store.forceBlockingFlush();
+            assertEquals(table.getKeyRange("", "", 10000).size(), inserted.size());
+        }
+        store.doCompaction(2);
+        assertEquals(table.getKeyRange("", "", 10000).size(), inserted.size());
+    }
+
+    @Test
+    public void testCompactions() throws IOException, ExecutionException, InterruptedException
+    {
+        // this test does enough rows to force multiple block indexes to be used
+        Table table = Table.open("Table1");
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+
+        final int ROWS_PER_SSTABLE = 10;
+        Set<String> inserted = new HashSet<String>();
+        for (int j = 0; j < (SSTable.indexInterval() * 3) / ROWS_PER_SSTABLE; j++) {
+            for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
+                String key = String.valueOf(i % 2);
                 RowMutation rm = new RowMutation("Table1", key);
-                rm.add("Standard1:A", new byte[0], epoch);
+                rm.add("Standard1:" + (i / 2), new byte[0], j * ROWS_PER_SSTABLE + i);
                 rm.apply();
+                inserted.add(key);
             }
             store.forceBlockingFlush();
+            assertEquals(table.getKeyRange("", "", 10000).size(), inserted.size());
+        }
+        while (true)
+        {
+            Future<Integer> ft = MinorCompactionManager.instance().submit(store);
+            if (ft.get() == 0)
+                break;
+        }
+        if (store.getSSTableFilenames().size() > 1)
+        {
+            store.doCompaction(store.getSSTableFilenames().size());
         }
-        Future ft = MinorCompactionManager.instance().submit(store);
-        ft.get();
+        assertEquals(table.getKeyRange("", "", 10000).size(), inserted.size());
     }
     
     @Test