You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/13 05:16:59 UTC
svn commit: r890022 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/tools/ test/unit/org/apache/cassandra/db/
test/unit/org/apache/cassandra/service/
Author: jbellis
Date: Sun Dec 13 04:16:59 2009
New Revision: 890022
URL: http://svn.apache.org/viewvc?rev=890022&view=rev
Log:
clean up CompactionManager. patch by jbellis
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sun Dec 13 04:16:59 2009
@@ -245,7 +245,7 @@
ssTables_.onStart(sstables);
// submit initial check-for-compaction request
- CompactionManager.instance().submit(ColumnFamilyStore.this);
+ CompactionManager.instance.submitMinor(ColumnFamilyStore.this);
// schedule hinted handoff
if (table_.equals(Table.SYSTEM_TABLE) && columnFamily_.equals(HintedHandOffManager.HINTS_CF))
@@ -298,7 +298,7 @@
List<SSTableReader> forceAntiCompaction(Collection<Range> ranges, InetAddress target)
{
assert ranges != null;
- Future<List<SSTableReader>> futurePtr = CompactionManager.instance().submitAnti(ColumnFamilyStore.this,
+ Future<List<SSTableReader>> futurePtr = CompactionManager.instance.submitAnti(ColumnFamilyStore.this,
ranges, target);
List<SSTableReader> result;
@@ -591,7 +591,7 @@
public void addSSTable(SSTableReader sstable)
{
ssTables_.add(sstable);
- CompactionManager.instance().submit(this);
+ CompactionManager.instance.submitMinor(this);
}
/*
@@ -738,7 +738,7 @@
void forceCleanup()
{
- CompactionManager.instance().submitCleanup(ColumnFamilyStore.this);
+ CompactionManager.instance.submitCleanup(ColumnFamilyStore.this);
}
/**
@@ -939,7 +939,7 @@
ssTables_.add(ssTable);
ssTables_.markCompacted(sstables);
gcAfterRpcTimeout();
- CompactionManager.instance().submit(ColumnFamilyStore.this);
+ CompactionManager.instance.submitMinor(ColumnFamilyStore.this);
String format = "Compacted to %s. %d/%d bytes for %d keys. Time: %dms.";
long dTime = System.currentTimeMillis() - startTime;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Sun Dec 13 04:16:59 2009
@@ -25,10 +25,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
+import javax.management.*;
import org.apache.log4j.Logger;
@@ -41,154 +38,25 @@
{
public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.db:type=CompactionManager";
private static final Logger logger = Logger.getLogger(CompactionManager.class);
- private static volatile CompactionManager instance;
+ public static final CompactionManager instance;
private int minimumCompactionThreshold = 4; // compact this many sstables min at a time
private int maximumCompactionThreshold = 32; // compact this many sstables max at a time
- public static CompactionManager instance()
+ static
{
- if (instance == null)
+ instance = new CompactionManager();
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
{
- synchronized (CompactionManager.class)
- {
- try
- {
- if (instance == null)
- {
- instance = new CompactionManager();
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(instance, new ObjectName(MBEAN_OBJECT_NAME));
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
- }
- return instance;
- }
-
- static abstract class Compactor<T> implements Callable<T>
- {
- protected final ColumnFamilyStore cfstore;
- public Compactor(ColumnFamilyStore columnFamilyStore)
- {
- cfstore = columnFamilyStore;
- }
-
- abstract T compact() throws IOException;
-
- public T call()
- {
- T results;
- if (logger.isDebugEnabled())
- logger.debug("Starting " + this + ".");
- try
- {
- results = compact();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- if (logger.isDebugEnabled())
- logger.debug("Finished " + this + ".");
- return results;
- }
-
- @Override
- public String toString()
- {
- StringBuilder buff = new StringBuilder();
- buff.append("<").append(getClass().getSimpleName());
- buff.append(" for ").append(cfstore).append(">");
- return buff.toString();
- }
- }
-
- static class AntiCompactor extends Compactor<List<SSTableReader>>
- {
- private final Collection<Range> ranges;
- private final InetAddress target;
- AntiCompactor(ColumnFamilyStore cfstore, Collection<Range> ranges, InetAddress target)
- {
- super(cfstore);
- this.ranges = ranges;
- this.target = target;
- }
-
- public List<SSTableReader> compact() throws IOException
- {
- return cfstore.doAntiCompaction(ranges, target);
- }
- }
-
- static class OnDemandCompactor extends Compactor<Object>
- {
- private final long skip;
- OnDemandCompactor(ColumnFamilyStore cfstore, long skip)
- {
- super(cfstore);
- this.skip = skip;
- }
-
- public Object compact() throws IOException
- {
- cfstore.doMajorCompaction(skip);
- return this;
- }
- }
-
- static class CleanupCompactor extends Compactor<Object>
- {
- CleanupCompactor(ColumnFamilyStore cfstore)
- {
- super(cfstore);
+ mbs.registerMBean(instance, new ObjectName(MBEAN_OBJECT_NAME));
}
-
- public Object compact() throws IOException
+ catch (Exception e)
{
- cfstore.doCleanupCompaction();
- return this;
+ throw new RuntimeException(e);
}
}
-
- static class MinorCompactor extends Compactor<Integer>
- {
- private final int minimum;
- private final int maximum;
- MinorCompactor(ColumnFamilyStore cfstore, int minimumThreshold, int maximumThreshold)
- {
- super(cfstore);
- minimum = minimumThreshold;
- maximum = maximumThreshold;
- }
- public Integer compact() throws IOException
- {
- return cfstore.doCompaction(minimum, maximum);
- }
- }
-
- static class ReadonlyCompactor extends Compactor<Object>
- {
- private final InetAddress initiator;
- ReadonlyCompactor(ColumnFamilyStore cfstore, InetAddress initiator)
- {
- super(cfstore);
- this.initiator = initiator;
- }
-
- public Object compact() throws IOException
- {
- cfstore.doReadonlyCompaction(initiator);
- return this;
- }
- }
-
-
private ExecutorService compactor_ = new DebuggableThreadPoolExecutor("COMPACTION-POOL");
/**
@@ -196,34 +64,72 @@
* It's okay to over-call (within reason) since the compactions are single-threaded,
* and if a call is unnecessary, it will just be no-oped in the bucketing phase.
*/
- public Future<Integer> submit(final ColumnFamilyStore columnFamilyStore)
+ public Future<Integer> submitMinor(final ColumnFamilyStore columnFamilyStore)
{
- return submit(columnFamilyStore, minimumCompactionThreshold, maximumCompactionThreshold);
+ return submitMinor(columnFamilyStore, minimumCompactionThreshold, maximumCompactionThreshold);
}
- Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int minThreshold, final int maxThreshold)
+ Future<Integer> submitMinor(final ColumnFamilyStore cfStore, final int minThreshold, final int maxThreshold)
{
- return compactor_.submit(new MinorCompactor(columnFamilyStore, minThreshold, maxThreshold));
+ Callable<Integer> callable = new Callable<Integer>()
+ {
+ public Integer call() throws IOException
+ {
+ return cfStore.doCompaction(minThreshold, maxThreshold);
+ }
+ };
+ return compactor_.submit(callable);
}
- public Future submitCleanup(ColumnFamilyStore columnFamilyStore)
+ public Future<Object> submitCleanup(final ColumnFamilyStore cfStore)
{
- return compactor_.submit(new CleanupCompactor(columnFamilyStore));
+ Callable<Object> runnable = new Callable<Object>()
+ {
+ public Object call() throws IOException
+ {
+ cfStore.doCleanupCompaction();
+ return this;
+ }
+ };
+ return compactor_.submit(runnable);
}
- public Future<List<SSTableReader>> submitAnti(ColumnFamilyStore columnFamilyStore, Collection<Range> ranges, InetAddress target)
+ public Future<List<SSTableReader>> submitAnti(final ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress target)
{
- return compactor_.submit(new AntiCompactor(columnFamilyStore, ranges, target));
+ Callable<List<SSTableReader>> callable = new Callable<List<SSTableReader>>()
+ {
+ public List<SSTableReader> call() throws IOException
+ {
+ return cfStore.doAntiCompaction(ranges, target);
+ }
+ };
+ return compactor_.submit(callable);
}
- public Future submitMajor(ColumnFamilyStore columnFamilyStore, long skip)
+ public Future submitMajor(final ColumnFamilyStore cfStore, final long skip)
{
- return compactor_.submit(new OnDemandCompactor(columnFamilyStore, skip));
+ Callable<Object> callable = new Callable<Object>()
+ {
+ public Object call() throws IOException
+ {
+ cfStore.doMajorCompaction(skip);
+ return this;
+ }
+ };
+ return compactor_.submit(callable);
}
- public Future submitReadonly(ColumnFamilyStore columnFamilyStore, InetAddress initiator)
+ public Future submitReadonly(final ColumnFamilyStore cfStore, final InetAddress initiator)
{
- return compactor_.submit(new ReadonlyCompactor(columnFamilyStore, initiator));
+ Callable<Object> callable = new Callable<Object>()
+ {
+ public Object call() throws IOException
+ {
+ cfStore.doReadonlyCompaction(initiator);
+ return this;
+ }
+ };
+ return compactor_.submit(callable);
}
/**
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Sun Dec 13 04:16:59 2009
@@ -340,7 +340,7 @@
{
ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
if ( cfStore != null )
- CompactionManager.instance().submitMajor(cfStore, 0);
+ CompactionManager.instance.submitMajor(cfStore, 0);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Sun Dec 13 04:16:59 2009
@@ -636,7 +636,7 @@
try
{
List<Range> ranges = new ArrayList<Range>(differences);
- List<SSTableReader> sstables = CompactionManager.instance().submitAnti(cfstore, ranges, remote).get();
+ List<SSTableReader> sstables = CompactionManager.instance.submitAnti(cfstore, ranges, remote).get();
Streaming.transferSSTables(remote, sstables, cf.table);
}
catch(Exception e)
@@ -723,7 +723,7 @@
// trigger readonly-compaction
logger.debug("Queueing readonly compaction for request from " + message.getFrom() + " for " + request);
Table table = Table.open(request.table);
- CompactionManager.instance().submitReadonly(table.getColumnFamilyStore(request.cf), message.getFrom());
+ CompactionManager.instance.submitReadonly(table.getColumnFamilyStore(request.cf), message.getFrom());
}
catch (IOException e)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Sun Dec 13 04:16:59 2009
@@ -639,7 +639,7 @@
System.exit(1);
}
int minthreshold = Integer.parseInt(arguments[1]);
- int maxthreshold = CompactionManager.instance().getMaximumCompactionThreshold();
+ int maxthreshold = CompactionManager.instance.getMaximumCompactionThreshold();
if (arguments.length > 2)
{
maxthreshold = Integer.parseInt(arguments[2]);
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Sun Dec 13 04:16:59 2009
@@ -31,7 +31,6 @@
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.IdentityQueryFilter;
import org.apache.cassandra.utils.FBUtilities;
@@ -65,7 +64,7 @@
}
while (true)
{
- Future<Integer> ft = CompactionManager.instance().submit(store);
+ Future<Integer> ft = CompactionManager.instance.submitMinor(store);
if (ft.get() == 0)
break;
}
@@ -79,7 +78,7 @@
@Test
public void testCompactionPurge() throws IOException, ExecutionException, InterruptedException
{
- CompactionManager.instance().disableCompactions();
+ CompactionManager.instance.disableCompactions();
Table table = Table.open(TABLE1);
String cfName = "Standard1";
@@ -132,7 +131,7 @@
@Test
public void testCompactionPurgeOneFile() throws IOException, ExecutionException, InterruptedException
{
- CompactionManager.instance().disableCompactions();
+ CompactionManager.instance.disableCompactions();
Table table = Table.open(TABLE1);
String cfName = "Standard2";
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java Sun Dec 13 04:16:59 2009
@@ -46,7 +46,7 @@
store.forceBlockingFlush();
assertEquals(inserted.size(), table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size());
}
- Future<Integer> ft = CompactionManager.instance().submit(store, 2, 32);
+ Future<Integer> ft = CompactionManager.instance.submitMinor(store, 2, 32);
ft.get();
assertEquals(1, store.getSSTables().size());
assertEquals(table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size(), inserted.size());
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java Sun Dec 13 04:16:59 2009
@@ -25,7 +25,6 @@
import java.util.concurrent.ExecutionException;
import java.util.Set;
import java.util.HashSet;
-import java.util.Arrays;
import java.util.Collections;
import org.junit.Test;
@@ -38,7 +37,7 @@
@Test
public void testWithFlush() throws IOException, ExecutionException, InterruptedException
{
- CompactionManager.instance().disableCompactions();
+ CompactionManager.instance.disableCompactions();
for (int i = 0; i < 100; i++)
{
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java Sun Dec 13 04:16:59 2009
@@ -59,7 +59,7 @@
store.forceBlockingFlush();
validateRemoveTwoSources();
- Future<Integer> ft = CompactionManager.instance().submit(store, 2, 32);
+ Future<Integer> ft = CompactionManager.instance.submitMinor(store, 2, 32);
ft.get();
assertEquals(1, store.getSSTables().size());
validateRemoveCompacted();
@@ -144,7 +144,7 @@
store.forceBlockingFlush();
validateRemoveWithNewData();
- Future<Integer> ft = CompactionManager.instance().submit(store, 2, 32);
+ Future<Integer> ft = CompactionManager.instance.submitMinor(store, 2, 32);
ft.get();
assertEquals(1, store.getSSTables().size());
validateRemoveWithNewData();
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=890022&r1=890021&r2=890022&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Sun Dec 13 04:16:59 2009
@@ -18,7 +18,6 @@
*/
package org.apache.cassandra.service;
-import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.*;
@@ -37,13 +36,11 @@
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.CompactionIterator.CompactedRow;
import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.SSTableReader;
import static org.apache.cassandra.service.AntiEntropyService.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.io.SSTableUtils;
import org.junit.Before;
import org.junit.Test;
@@ -162,7 +159,7 @@
// force a major compaction, and wait for it to finish
MerkleTree old = aes.getCachedTree(tablename, cfname, LOCAL);
- CompactionManager.instance().submitMajor(store, 0).get(5000, TimeUnit.MILLISECONDS);
+ CompactionManager.instance.submitMajor(store, 0).get(5000, TimeUnit.MILLISECONDS);
// check that a tree was created and cached
flushAES().get(5000, TimeUnit.MILLISECONDS);