You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/01/29 10:27:09 UTC

cassandra git commit: Stop compactions before exiting offline tools

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 2bff135cd -> 3ad0d3a8f


Stop compactions before exiting offline tools

Patch by marcuse; reviewed by yukim for CASSANDRA-8623


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3ad0d3a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3ad0d3a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3ad0d3a8

Branch: refs/heads/cassandra-2.1
Commit: 3ad0d3a8f2ff4a2f5207b47620b52c192f4e5d82
Parents: 2bff135
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Jan 29 10:20:09 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu Jan 29 10:21:39 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 46 +++++++++++++++++++-
 .../cassandra/tools/StandaloneScrubber.java     |  4 +-
 .../cassandra/tools/StandaloneSplitter.java     |  4 +-
 .../cassandra/tools/StandaloneUpgrader.java     |  4 +-
 5 files changed, 55 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ad0d3a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d273350..fce4898 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Stop compactions before exiting offline tools (CASSANDRA-8623)
  * Update tools/stress/README.txt to match current behaviour (CASSANDRA-7933)
  * Fix schema from Thrift conversion with empty metadata (CASSANDRA-8695)
  * Safer Resource Management (CASSANDRA-7705)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ad0d3a8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index f59938f..68313a3 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -50,6 +50,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -178,9 +179,13 @@ public class CompactionManager implements CompactionManagerMBean
                      cfs.name,
                      cfs.getCompactionStrategy().getName());
         List<Future<?>> futures = new ArrayList<Future<?>>();
-
         // we must schedule it at least once, otherwise compaction will stop for a CF until next flush
         do {
+            if (executor.isShutdown())
+            {
+                logger.info("Executor has shut down, not submitting background task");
+                return Collections.emptyList();
+            }
             compactingCF.add(cfs);
             futures.add(executor.submit(new BackgroundCompactionTask(cfs)));
             // if we have room for more compactions, then fill up executor
@@ -197,6 +202,12 @@ public class CompactionManager implements CompactionManagerMBean
         return false;
     }
 
+    public void finishCompactionsAndShutdown(long timeout, TimeUnit unit) throws InterruptedException
+    {
+        executor.shutdown(); // from here on the executor.isShutdown() guards in the submit paths decline new work
+        executor.awaitTermination(timeout, unit); // waits at most timeout/unit; the result is discarded, so callers cannot tell whether the wait timed out (assuming ExecutorService semantics -- TODO confirm)
+    }
+
     // the actual sstables to compact are not determined until we run the BCT; that way, if new sstables
     // are created between task submission and execution, we execute against the most up-to-date information
     class BackgroundCompactionTask implements Runnable
@@ -256,6 +267,12 @@ public class CompactionManager implements CompactionManagerMBean
 
             for (final SSTableReader sstable : sstables)
             {
+                if (executor.isShutdown())
+                {
+                    logger.info("Executor has shut down, not submitting task");
+                    return AllSSTableOpStatus.ABORTED;
+                }
+
                 futures.add(executor.submit(new Callable<Object>()
                 {
                     @Override
@@ -394,6 +411,12 @@ public class CompactionManager implements CompactionManagerMBean
                 performAnticompaction(cfs, ranges, sstables, repairedAt);
             }
         };
+        if (executor.isShutdown())
+        {
+            logger.info("Compaction executor has shut down, not submitting anticompaction");
+            return Futures.immediateCancelledFuture();
+        }
+
         return executor.submit(runnable);
     }
 
@@ -489,6 +512,11 @@ public class CompactionManager implements CompactionManagerMBean
                     task.execute(metrics);
                 }
             };
+            if (executor.isShutdown())
+            {
+                logger.info("Compaction executor has shut down, not submitting task");
+                return Collections.emptyList();
+            }
             futures.add(executor.submit(runnable));
         }
         return futures;
@@ -554,6 +582,12 @@ public class CompactionManager implements CompactionManagerMBean
                 }
             }
         };
+        if (executor.isShutdown())
+        {
+            logger.info("Compaction executor has shut down, not submitting task");
+            return Futures.immediateCancelledFuture();
+        }
+
         return executor.submit(runnable);
     }
 
@@ -1090,6 +1124,11 @@ public class CompactionManager implements CompactionManagerMBean
                 }
             }
         };
+        if (executor.isShutdown())
+        {
+            logger.info("Compaction executor has shut down, not submitting index build");
+            return null;
+        }
 
         return executor.submit(runnable);
     }
@@ -1123,6 +1162,11 @@ public class CompactionManager implements CompactionManagerMBean
                 }
             }
         };
+        if (executor.isShutdown())
+        {
+            logger.info("Executor has shut down, not submitting background task");
+            return Futures.immediateCancelledFuture(); // must return here, otherwise control falls through to submit() on the shut-down executor
+        }
         return executor.submit(runnable);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ad0d3a8/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 63a3727..1bc2674 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.tools;
 
 import java.io.File;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
@@ -33,6 +34,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.cassandra.db.compaction.Scrubber;
@@ -131,7 +133,7 @@ public class StandaloneScrubber
 
             // Check (and repair) manifests
             checkManifest(cfs.getCompactionStrategy(), cfs, sstables);
-
+            CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES);
             SSTableDeletingTask.waitForDeletions();
             System.exit(0); // We need that to stop non daemonized threads
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ad0d3a8/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
index 5ed1543..242b1c0 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.tools;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.*;
 
@@ -28,6 +29,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.SSTableSplitter;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -108,7 +110,6 @@ public class StandaloneSplitter
             // Do not load sstables since they might be broken
             Keyspace keyspace = Keyspace.openWithoutSSTables(ksName);
             ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
-
             String snapshotName = "pre-split-" + System.currentTimeMillis();
 
             List<SSTableReader> sstables = new ArrayList<SSTableReader>();
@@ -159,6 +160,7 @@ public class StandaloneSplitter
                         e.printStackTrace(System.err);
                 }
             }
+            CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES);
             SSTableDeletingTask.waitForDeletions();
             System.exit(0); // We need that to stop non daemonized threads
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ad0d3a8/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
index 92b6445..8fa5b60 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.tools;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.*;
 
@@ -27,6 +28,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.Upgrader;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -107,7 +109,7 @@ public class StandaloneUpgrader
                         e.printStackTrace(System.err);
                 }
             }
-
+            CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES);
             SSTableDeletingTask.waitForDeletions();
             System.exit(0);
         }