You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/13 05:16:59 UTC

svn commit: r890022 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/tools/ test/unit/org/apache/cassandra/db/ test/unit/org/apache/cassandra/service/

Author: jbellis
Date: Sun Dec 13 04:16:59 2009
New Revision: 890022

URL: http://svn.apache.org/viewvc?rev=890022&view=rev
Log:
clean up CompactionManager.  patch by jbellis

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sun Dec 13 04:16:59 2009
@@ -245,7 +245,7 @@
         ssTables_.onStart(sstables);
 
         // submit initial check-for-compaction request
-        CompactionManager.instance().submit(ColumnFamilyStore.this);
+        CompactionManager.instance.submitMinor(ColumnFamilyStore.this);
 
         // schedule hinted handoff
         if (table_.equals(Table.SYSTEM_TABLE) && columnFamily_.equals(HintedHandOffManager.HINTS_CF))
@@ -298,7 +298,7 @@
     List<SSTableReader> forceAntiCompaction(Collection<Range> ranges, InetAddress target)
     {
         assert ranges != null;
-        Future<List<SSTableReader>> futurePtr = CompactionManager.instance().submitAnti(ColumnFamilyStore.this,
+        Future<List<SSTableReader>> futurePtr = CompactionManager.instance.submitAnti(ColumnFamilyStore.this,
                                                                                         ranges, target);
 
         List<SSTableReader> result;
@@ -591,7 +591,7 @@
     public void addSSTable(SSTableReader sstable)
     {
         ssTables_.add(sstable);
-        CompactionManager.instance().submit(this);
+        CompactionManager.instance.submitMinor(this);
     }
 
     /*
@@ -738,7 +738,7 @@
 
     void forceCleanup()
     {
-        CompactionManager.instance().submitCleanup(ColumnFamilyStore.this);
+        CompactionManager.instance.submitCleanup(ColumnFamilyStore.this);
     }
 
     /**
@@ -939,7 +939,7 @@
         ssTables_.add(ssTable);
         ssTables_.markCompacted(sstables);
         gcAfterRpcTimeout();
-        CompactionManager.instance().submit(ColumnFamilyStore.this);
+        CompactionManager.instance.submitMinor(ColumnFamilyStore.this);
 
         String format = "Compacted to %s.  %d/%d bytes for %d keys.  Time: %dms.";
         long dTime = System.currentTimeMillis() - startTime;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Sun Dec 13 04:16:59 2009
@@ -25,10 +25,7 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
+import javax.management.*;
 
 import org.apache.log4j.Logger;
 
@@ -41,154 +38,25 @@
 {
     public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.db:type=CompactionManager";
     private static final Logger logger = Logger.getLogger(CompactionManager.class);
-    private static volatile CompactionManager instance;
+    public static final CompactionManager instance;
 
     private int minimumCompactionThreshold = 4; // compact this many sstables min at a time
     private int maximumCompactionThreshold = 32; // compact this many sstables max at a time
 
-    public static CompactionManager instance()
+    static
     {
-        if (instance == null)
+        instance = new CompactionManager();
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
         {
-            synchronized (CompactionManager.class)
-            {
-                try
-                {
-                    if (instance == null)
-                    {
-                        instance = new CompactionManager();
-                        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-                        mbs.registerMBean(instance, new ObjectName(MBEAN_OBJECT_NAME));
-                    }
-                }
-                catch (Exception e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-        return instance;
-    }
-
-    static abstract class Compactor<T> implements Callable<T>
-    {
-        protected final ColumnFamilyStore cfstore;
-        public Compactor(ColumnFamilyStore columnFamilyStore)
-        {
-            cfstore = columnFamilyStore;
-        }
-
-        abstract T compact() throws IOException;
-        
-        public T call()
-        {
-        	T results;
-            if (logger.isDebugEnabled())
-                logger.debug("Starting " + this + ".");
-            try
-            {
-                results = compact();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            if (logger.isDebugEnabled())
-                logger.debug("Finished " + this + ".");
-            return results;
-        }
-
-        @Override
-        public String toString()
-        {
-            StringBuilder buff = new StringBuilder();
-            buff.append("<").append(getClass().getSimpleName());
-            buff.append(" for ").append(cfstore).append(">");
-            return buff.toString();
-        }
-    }
-
-    static class AntiCompactor extends Compactor<List<SSTableReader>>
-    {
-        private final Collection<Range> ranges;
-        private final InetAddress target;
-        AntiCompactor(ColumnFamilyStore cfstore, Collection<Range> ranges, InetAddress target)
-        {
-            super(cfstore);
-            this.ranges = ranges;
-            this.target = target;
-        }
-
-        public List<SSTableReader> compact() throws IOException
-        {
-            return cfstore.doAntiCompaction(ranges, target);
-        }
-    }
-
-    static class OnDemandCompactor extends Compactor<Object>
-    {
-        private final long skip;
-        OnDemandCompactor(ColumnFamilyStore cfstore, long skip)
-        {
-            super(cfstore);
-            this.skip = skip;
-        }
-
-        public Object compact() throws IOException
-        {
-            cfstore.doMajorCompaction(skip);
-            return this;
-        }
-    }
-
-    static class CleanupCompactor extends Compactor<Object>
-    {
-        CleanupCompactor(ColumnFamilyStore cfstore)
-        {
-            super(cfstore);
+            mbs.registerMBean(instance, new ObjectName(MBEAN_OBJECT_NAME));
         }
-
-        public Object compact() throws IOException
+        catch (Exception e)
         {
-            cfstore.doCleanupCompaction();
-            return this;
+            throw new RuntimeException(e);
         }
     }
-    
-    static class MinorCompactor extends Compactor<Integer>
-    {
-        private final int minimum;
-        private final int maximum;
-        MinorCompactor(ColumnFamilyStore cfstore, int minimumThreshold, int maximumThreshold)
-        {
-            super(cfstore);
-            minimum = minimumThreshold;
-            maximum = maximumThreshold;
-        }
 
-        public Integer compact() throws IOException
-        {
-            return cfstore.doCompaction(minimum, maximum);
-        }
-    }
-
-    static class ReadonlyCompactor extends Compactor<Object>
-    {
-        private final InetAddress initiator;
-        ReadonlyCompactor(ColumnFamilyStore cfstore, InetAddress initiator)
-        {
-            super(cfstore);
-            this.initiator = initiator;
-        }
-
-        public Object compact() throws IOException
-        {
-            cfstore.doReadonlyCompaction(initiator);
-            return this;
-        }
-    }
-
-    
     private ExecutorService compactor_ = new DebuggableThreadPoolExecutor("COMPACTION-POOL");
 
     /**
@@ -196,34 +64,72 @@
      * It's okay to over-call (within reason) since the compactions are single-threaded,
      * and if a call is unnecessary, it will just be no-oped in the bucketing phase.
      */
-    public Future<Integer> submit(final ColumnFamilyStore columnFamilyStore)
+    public Future<Integer> submitMinor(final ColumnFamilyStore columnFamilyStore)
     {
-        return submit(columnFamilyStore, minimumCompactionThreshold, maximumCompactionThreshold);
+        return submitMinor(columnFamilyStore, minimumCompactionThreshold, maximumCompactionThreshold);
     }
 
-    Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int minThreshold, final int maxThreshold)
+    Future<Integer> submitMinor(final ColumnFamilyStore cfStore, final int minThreshold, final int maxThreshold)
     {
-        return compactor_.submit(new MinorCompactor(columnFamilyStore, minThreshold, maxThreshold));
+        Callable<Integer> callable = new Callable<Integer>()
+        {
+            public Integer call() throws IOException
+            {
+                return cfStore.doCompaction(minThreshold, maxThreshold);
+            }
+        };
+        return compactor_.submit(callable);
     }
 
-    public Future submitCleanup(ColumnFamilyStore columnFamilyStore)
+    public Future<Object> submitCleanup(final ColumnFamilyStore cfStore)
     {
-        return compactor_.submit(new CleanupCompactor(columnFamilyStore));
+        Callable<Object> runnable = new Callable<Object>()
+        {
+            public Object call() throws IOException
+            {
+                cfStore.doCleanupCompaction();
+                return this;
+            }
+        };
+        return compactor_.submit(runnable);
     }
 
-    public Future<List<SSTableReader>> submitAnti(ColumnFamilyStore columnFamilyStore, Collection<Range> ranges, InetAddress target)
+    public Future<List<SSTableReader>> submitAnti(final ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress target)
     {
-        return compactor_.submit(new AntiCompactor(columnFamilyStore, ranges, target));
+        Callable<List<SSTableReader>> callable = new Callable<List<SSTableReader>>()
+        {
+            public List<SSTableReader> call() throws IOException
+            {
+                return cfStore.doAntiCompaction(ranges, target);
+            }
+        };
+        return compactor_.submit(callable);
     }
 
-    public Future submitMajor(ColumnFamilyStore columnFamilyStore, long skip)
+    public Future submitMajor(final ColumnFamilyStore cfStore, final long skip)
     {
-        return compactor_.submit(new OnDemandCompactor(columnFamilyStore, skip));
+        Callable<Object> callable = new Callable<Object>()
+        {
+            public Object call() throws IOException
+            {
+                cfStore.doMajorCompaction(skip);
+                return this;
+            }
+        };
+        return compactor_.submit(callable);
     }
 
-    public Future submitReadonly(ColumnFamilyStore columnFamilyStore, InetAddress initiator)
+    public Future submitReadonly(final ColumnFamilyStore cfStore, final InetAddress initiator)
     {
-        return compactor_.submit(new ReadonlyCompactor(columnFamilyStore, initiator));
+        Callable<Object> callable = new Callable<Object>()
+        {
+            public Object call() throws IOException
+            {
+                cfStore.doReadonlyCompaction(initiator);
+                return this;
+            }
+        };
+        return compactor_.submit(callable);
     }
 
     /**

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Sun Dec 13 04:16:59 2009
@@ -340,7 +340,7 @@
         {
             ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
             if ( cfStore != null )
-                CompactionManager.instance().submitMajor(cfStore, 0);
+                CompactionManager.instance.submitMajor(cfStore, 0);
         }
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Sun Dec 13 04:16:59 2009
@@ -636,7 +636,7 @@
             try
             {
                 List<Range> ranges = new ArrayList<Range>(differences);
-                List<SSTableReader> sstables = CompactionManager.instance().submitAnti(cfstore, ranges, remote).get();
+                List<SSTableReader> sstables = CompactionManager.instance.submitAnti(cfstore, ranges, remote).get();
                 Streaming.transferSSTables(remote, sstables, cf.table);
             }
             catch(Exception e)
@@ -723,7 +723,7 @@
                 // trigger readonly-compaction
                 logger.debug("Queueing readonly compaction for request from " + message.getFrom() + " for " + request);
                 Table table = Table.open(request.table);
-                CompactionManager.instance().submitReadonly(table.getColumnFamilyStore(request.cf), message.getFrom());
+                CompactionManager.instance.submitReadonly(table.getColumnFamilyStore(request.cf), message.getFrom());
             }
             catch (IOException e)
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Sun Dec 13 04:16:59 2009
@@ -639,7 +639,7 @@
                 System.exit(1);
             }
             int minthreshold = Integer.parseInt(arguments[1]);
-            int maxthreshold = CompactionManager.instance().getMaximumCompactionThreshold();
+            int maxthreshold = CompactionManager.instance.getMaximumCompactionThreshold();
             if (arguments.length > 2)
             {
                 maxthreshold = Integer.parseInt(arguments[2]);

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Sun Dec 13 04:16:59 2009
@@ -31,7 +31,6 @@
 
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.filter.IdentityQueryFilter;
 import org.apache.cassandra.utils.FBUtilities;
@@ -65,7 +64,7 @@
         }
         while (true)
         {
-            Future<Integer> ft = CompactionManager.instance().submit(store);
+            Future<Integer> ft = CompactionManager.instance.submitMinor(store);
             if (ft.get() == 0)
                 break;
         }
@@ -79,7 +78,7 @@
     @Test
     public void testCompactionPurge() throws IOException, ExecutionException, InterruptedException
     {
-        CompactionManager.instance().disableCompactions();
+        CompactionManager.instance.disableCompactions();
 
         Table table = Table.open(TABLE1);
         String cfName = "Standard1";
@@ -132,7 +131,7 @@
     @Test
     public void testCompactionPurgeOneFile() throws IOException, ExecutionException, InterruptedException
     {
-        CompactionManager.instance().disableCompactions();
+        CompactionManager.instance.disableCompactions();
 
         Table table = Table.open(TABLE1);
         String cfName = "Standard2";

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java Sun Dec 13 04:16:59 2009
@@ -46,7 +46,7 @@
             store.forceBlockingFlush();
             assertEquals(inserted.size(), table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size());
         }
-        Future<Integer> ft = CompactionManager.instance().submit(store, 2, 32);
+        Future<Integer> ft = CompactionManager.instance.submitMinor(store, 2, 32);
         ft.get();
         assertEquals(1, store.getSSTables().size());
         assertEquals(table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size(), inserted.size());

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java Sun Dec 13 04:16:59 2009
@@ -25,7 +25,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.Set;
 import java.util.HashSet;
-import java.util.Arrays;
 import java.util.Collections;
 
 import org.junit.Test;
@@ -38,7 +37,7 @@
     @Test
     public void testWithFlush() throws IOException, ExecutionException, InterruptedException
     {
-        CompactionManager.instance().disableCompactions();
+        CompactionManager.instance.disableCompactions();
 
         for (int i = 0; i < 100; i++)
         {

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java Sun Dec 13 04:16:59 2009
@@ -59,7 +59,7 @@
         store.forceBlockingFlush();
         validateRemoveTwoSources();
 
-        Future<Integer> ft = CompactionManager.instance().submit(store, 2, 32);
+        Future<Integer> ft = CompactionManager.instance.submitMinor(store, 2, 32);
         ft.get();
         assertEquals(1, store.getSSTables().size());
         validateRemoveCompacted();
@@ -144,7 +144,7 @@
         store.forceBlockingFlush();
         validateRemoveWithNewData();
 
-        Future<Integer> ft = CompactionManager.instance().submit(store, 2, 32);
+        Future<Integer> ft = CompactionManager.instance.submitMinor(store, 2, 32);
         ft.get();
         assertEquals(1, store.getSSTables().size());
         validateRemoveWithNewData();

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Sun Dec 13 04:16:59 2009
@@ -18,7 +18,6 @@
 */
 package org.apache.cassandra.service;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.*;
@@ -37,13 +36,11 @@
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.CompactionIterator.CompactedRow;
 import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.SSTableReader;
 import static org.apache.cassandra.service.AntiEntropyService.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 
 import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.io.SSTableUtils;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -162,7 +159,7 @@
         
         // force a major compaction, and wait for it to finish
         MerkleTree old = aes.getCachedTree(tablename, cfname, LOCAL);
-        CompactionManager.instance().submitMajor(store, 0).get(5000, TimeUnit.MILLISECONDS);
+        CompactionManager.instance.submitMajor(store, 0).get(5000, TimeUnit.MILLISECONDS);
 
         // check that a tree was created and cached
         flushAES().get(5000, TimeUnit.MILLISECONDS);