You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2012/09/05 17:21:16 UTC
[2/3] git commit: Respect 5min flush moratorium after CL replay; patch by yukim reviewed by jbellis for CASSANDRA-4474
Respect 5min flush moratorium after CL replay; patch by yukim reviewed by jbellis for CASSANDRA-4474
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8eb2fed1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8eb2fed1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8eb2fed1
Branch: refs/heads/trunk
Commit: 8eb2fed1ebcbb9a1dd6d64673e8b9aede577660e
Parents: 846b140
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Sep 5 10:07:07 2012 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Sep 5 10:07:07 2012 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/db/ColumnFamilyStore.java | 6 ++
.../db/compaction/AbstractCompactionStrategy.java | 13 ----
.../cassandra/service/AbstractCassandraDaemon.java | 44 +++++++++++++-
.../org/apache/cassandra/utils/DefaultInteger.java | 5 ++
4 files changed, 51 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8eb2fed1/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a39530a..64b2402 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1846,6 +1846,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
maxCompactionThreshold.set(0);
}
+ public void enableAutoCompaction()
+ {
+ minCompactionThreshold.reset();
+ maxCompactionThreshold.reset();
+ }
+
/*
JMX getters and setters for the Default<T>s.
- get/set minCompactionThreshold
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8eb2fed1/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 3f51b88..93c6298 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -47,19 +47,6 @@ public abstract class AbstractCompactionStrategy
assert cfs != null;
this.cfs = cfs;
this.options = options;
-
- // start compactions in five minutes (if no flushes have occurred by then to do so)
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- if (CompactionManager.instance.getActiveCompactions() == 0)
- {
- CompactionManager.instance.submitBackground(AbstractCompactionStrategy.this.cfs);
- }
- }
- };
- StorageService.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS);
}
public Map<String, String> getOptions()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8eb2fed1/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
index 9f19123..2a66e73 100644
--- a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -45,6 +45,7 @@ import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.Mx4jTool;
@@ -143,8 +144,8 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon
// check all directories(data, commitlog, saved cache) for existence and permission
Iterable<String> dirs = Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
- Arrays.asList(new String[] {DatabaseDescriptor.getCommitLogLocation(),
- DatabaseDescriptor.getSavedCachesLocation()}));
+ Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
+ DatabaseDescriptor.getSavedCachesLocation()));
for (String dataDir : dirs)
{
logger.debug("Checking directory {}", dataDir);
@@ -201,7 +202,14 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon
{
if (logger.isDebugEnabled())
logger.debug("opening keyspace " + table);
- Table.open(table);
+ // disable auto compaction until commit log replay ends
+ for (ColumnFamilyStore cfs : Table.open(table).getColumnFamilyStores())
+ {
+ for (ColumnFamilyStore store : cfs.concatWithIndexes())
+ {
+ store.disableAutoCompaction();
+ }
+ }
}
if (CacheService.instance.keyCache.size() > 0)
@@ -222,6 +230,34 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon
// replay the log if necessary
CommitLog.instance.recover();
+ // enable auto compaction
+ for (Table table : Table.all())
+ {
+ for (ColumnFamilyStore cfs : table.getColumnFamilyStores())
+ {
+ for (final ColumnFamilyStore store : cfs.concatWithIndexes())
+ {
+ store.enableAutoCompaction();
+ }
+ }
+ }
+ // start compactions in five minutes (if no flushes have occurred by then to do so)
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ for (Table table : Table.all())
+ {
+ for (ColumnFamilyStore cf : table.getColumnFamilyStores())
+ {
+ for (ColumnFamilyStore store : cf.concatWithIndexes())
+ CompactionManager.instance.submitBackground(store);
+ }
+ }
+ }
+ };
+ StorageService.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS);
+
SystemTable.finishStartup();
// start server internals
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8eb2fed1/src/java/org/apache/cassandra/utils/DefaultInteger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/DefaultInteger.java b/src/java/org/apache/cassandra/utils/DefaultInteger.java
index 05fd5a3..63aa417 100644
--- a/src/java/org/apache/cassandra/utils/DefaultInteger.java
+++ b/src/java/org/apache/cassandra/utils/DefaultInteger.java
@@ -40,6 +40,11 @@ public class DefaultInteger
currentValue = i;
}
+ public void reset()
+ {
+ currentValue = originalValue;
+ }
+
public boolean isModified()
{
return originalValue != currentValue;