You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2012/09/05 17:21:16 UTC

[3/3] git commit: Respect 5min flush moratorium after CL replay; patch by yukim reviewed by jbellis for CASSANDRA-4474

Respect 5min flush moratorium after CL replay; patch by yukim reviewed by jbellis for CASSANDRA-4474


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8eb2fed1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8eb2fed1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8eb2fed1

Branch: refs/heads/cassandra-1.1
Commit: 8eb2fed1ebcbb9a1dd6d64673e8b9aede577660e
Parents: 846b140
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Sep 5 10:07:07 2012 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Sep 5 10:07:07 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    6 ++
 .../db/compaction/AbstractCompactionStrategy.java  |   13 ----
 .../cassandra/service/AbstractCassandraDaemon.java |   44 +++++++++++++-
 .../org/apache/cassandra/utils/DefaultInteger.java |    5 ++
 4 files changed, 51 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8eb2fed1/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a39530a..64b2402 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1846,6 +1846,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         maxCompactionThreshold.set(0);
     }
 
+    public void enableAutoCompaction()
+    {
+        minCompactionThreshold.reset();
+        maxCompactionThreshold.reset();
+    }
+
     /*
      JMX getters and setters for the Default<T>s.
        - get/set minCompactionThreshold

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8eb2fed1/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 3f51b88..93c6298 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -47,19 +47,6 @@ public abstract class AbstractCompactionStrategy
         assert cfs != null;
         this.cfs = cfs;
         this.options = options;
-
-        // start compactions in five minutes (if no flushes have occurred by then to do so)
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-                if (CompactionManager.instance.getActiveCompactions() == 0)
-                {
-                    CompactionManager.instance.submitBackground(AbstractCompactionStrategy.this.cfs);
-                }
-            }
-        };
-        StorageService.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS);
     }
 
     public Map<String, String> getOptions()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8eb2fed1/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
index 9f19123..2a66e73 100644
--- a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -45,6 +45,7 @@ import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.Mx4jTool;
 
@@ -143,8 +144,8 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon
 
         // check all directories(data, commitlog, saved cache) for existence and permission
         Iterable<String> dirs = Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
-                                                 Arrays.asList(new String[] {DatabaseDescriptor.getCommitLogLocation(),
-                                                                             DatabaseDescriptor.getSavedCachesLocation()}));
+                                                 Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
+                                                               DatabaseDescriptor.getSavedCachesLocation()));
         for (String dataDir : dirs)
         {
             logger.debug("Checking directory {}", dataDir);
@@ -201,7 +202,14 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon
         {
             if (logger.isDebugEnabled())
                 logger.debug("opening keyspace " + table);
-            Table.open(table);
+            // disable auto compaction until commit log replay ends
+            for (ColumnFamilyStore cfs : Table.open(table).getColumnFamilyStores())
+            {
+                for (ColumnFamilyStore store : cfs.concatWithIndexes())
+                {
+                    store.disableAutoCompaction();
+                }
+            }
         }
 
         if (CacheService.instance.keyCache.size() > 0)
@@ -222,6 +230,34 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon
         // replay the log if necessary
         CommitLog.instance.recover();
 
+        // enable auto compaction
+        for (Table table : Table.all())
+        {
+            for (ColumnFamilyStore cfs : table.getColumnFamilyStores())
+            {
+                for (final ColumnFamilyStore store : cfs.concatWithIndexes())
+                {
+                    store.enableAutoCompaction();
+                }
+            }
+        }
+        // start compactions in five minutes (if no flushes have occurred by then to do so)
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                for (Table table : Table.all())
+                {
+                    for (ColumnFamilyStore cf : table.getColumnFamilyStores())
+                    {
+                        for (ColumnFamilyStore store : cf.concatWithIndexes())
+                            CompactionManager.instance.submitBackground(store);
+                    }
+                }
+            }
+        };
+        StorageService.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS);
+
         SystemTable.finishStartup();
 
         // start server internals

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8eb2fed1/src/java/org/apache/cassandra/utils/DefaultInteger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/DefaultInteger.java b/src/java/org/apache/cassandra/utils/DefaultInteger.java
index 05fd5a3..63aa417 100644
--- a/src/java/org/apache/cassandra/utils/DefaultInteger.java
+++ b/src/java/org/apache/cassandra/utils/DefaultInteger.java
@@ -40,6 +40,11 @@ public class DefaultInteger
         currentValue = i;
     }
 
+    public void reset()
+    {
+        currentValue = originalValue;
+    }
+
     public boolean isModified()
     {
         return originalValue != currentValue;