You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/01/29 10:27:09 UTC
cassandra git commit: Stop compactions before exiting offline tools
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 2bff135cd -> 3ad0d3a8f
Stop compactions before exiting offline tools
Patch by marcuse; reviewed by yukim for CASSANDRA-8623
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3ad0d3a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3ad0d3a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3ad0d3a8
Branch: refs/heads/cassandra-2.1
Commit: 3ad0d3a8f2ff4a2f5207b47620b52c192f4e5d82
Parents: 2bff135
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Jan 29 10:20:09 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu Jan 29 10:21:39 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 46 +++++++++++++++++++-
.../cassandra/tools/StandaloneScrubber.java | 4 +-
.../cassandra/tools/StandaloneSplitter.java | 4 +-
.../cassandra/tools/StandaloneUpgrader.java | 4 +-
5 files changed, 55 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ad0d3a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d273350..fce4898 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Stop compactions before exiting offline tools (CASSANDRA-8623)
* Update tools/stress/README.txt to match current behaviour (CASSANDRA-7933)
* Fix schema from Thrift conversion with empty metadata (CASSANDRA-8695)
* Safer Resource Management (CASSANDRA-7705)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ad0d3a8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index f59938f..68313a3 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -50,6 +50,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -178,9 +179,13 @@ public class CompactionManager implements CompactionManagerMBean
cfs.name,
cfs.getCompactionStrategy().getName());
List<Future<?>> futures = new ArrayList<Future<?>>();
-
// we must schedule it at least once, otherwise compaction will stop for a CF until next flush
do {
+ if (executor.isShutdown())
+ {
+ logger.info("Executor has shut down, not submitting background task");
+ return Collections.emptyList();
+ }
compactingCF.add(cfs);
futures.add(executor.submit(new BackgroundCompactionTask(cfs)));
// if we have room for more compactions, then fill up executor
@@ -197,6 +202,12 @@ public class CompactionManager implements CompactionManagerMBean
return false;
}
+ public void finishCompactionsAndShutdown(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ executor.shutdown();
+ executor.awaitTermination(timeout, unit);
+ }
+
// the actual sstables to compact are not determined until we run the BCT; that way, if new sstables
// are created between task submission and execution, we execute against the most up-to-date information
class BackgroundCompactionTask implements Runnable
@@ -256,6 +267,12 @@ public class CompactionManager implements CompactionManagerMBean
for (final SSTableReader sstable : sstables)
{
+ if (executor.isShutdown())
+ {
+ logger.info("Executor has shut down, not submitting task");
+ return AllSSTableOpStatus.ABORTED;
+ }
+
futures.add(executor.submit(new Callable<Object>()
{
@Override
@@ -394,6 +411,12 @@ public class CompactionManager implements CompactionManagerMBean
performAnticompaction(cfs, ranges, sstables, repairedAt);
}
};
+ if (executor.isShutdown())
+ {
+ logger.info("Compaction executor has shut down, not submitting anticompaction");
+ return Futures.immediateCancelledFuture();
+ }
+
return executor.submit(runnable);
}
@@ -489,6 +512,11 @@ public class CompactionManager implements CompactionManagerMBean
task.execute(metrics);
}
};
+ if (executor.isShutdown())
+ {
+ logger.info("Compaction executor has shut down, not submitting task");
+ return Collections.emptyList();
+ }
futures.add(executor.submit(runnable));
}
return futures;
@@ -554,6 +582,12 @@ public class CompactionManager implements CompactionManagerMBean
}
}
};
+ if (executor.isShutdown())
+ {
+ logger.info("Compaction executor has shut down, not submitting task");
+ return Futures.immediateCancelledFuture();
+ }
+
return executor.submit(runnable);
}
@@ -1090,6 +1124,11 @@ public class CompactionManager implements CompactionManagerMBean
}
}
};
+ if (executor.isShutdown())
+ {
+ logger.info("Compaction executor has shut down, not submitting index build");
+ return null;
+ }
return executor.submit(runnable);
}
@@ -1123,6 +1162,11 @@ public class CompactionManager implements CompactionManagerMBean
}
}
};
+ if (executor.isShutdown())
+ {
+ logger.info("Executor has shut down, not submitting background task");
+ Futures.immediateCancelledFuture();
+ }
return executor.submit(runnable);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ad0d3a8/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 63a3727..1bc2674 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.tools;
import java.io.File;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
@@ -33,6 +34,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
import org.apache.cassandra.db.compaction.LeveledManifest;
import org.apache.cassandra.db.compaction.Scrubber;
@@ -131,7 +133,7 @@ public class StandaloneScrubber
// Check (and repair) manifests
checkManifest(cfs.getCompactionStrategy(), cfs, sstables);
-
+ CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES);
SSTableDeletingTask.waitForDeletions();
System.exit(0); // We need that to stop non daemonized threads
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ad0d3a8/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
index 5ed1543..242b1c0 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.tools;
import java.io.File;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.*;
@@ -28,6 +29,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.SSTableSplitter;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -108,7 +110,6 @@ public class StandaloneSplitter
// Do not load sstables since they might be broken
Keyspace keyspace = Keyspace.openWithoutSSTables(ksName);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
-
String snapshotName = "pre-split-" + System.currentTimeMillis();
List<SSTableReader> sstables = new ArrayList<SSTableReader>();
@@ -159,6 +160,7 @@ public class StandaloneSplitter
e.printStackTrace(System.err);
}
}
+ CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES);
SSTableDeletingTask.waitForDeletions();
System.exit(0); // We need that to stop non daemonized threads
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ad0d3a8/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
index 92b6445..8fa5b60 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.tools;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.*;
@@ -27,6 +28,7 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.Upgrader;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -107,7 +109,7 @@ public class StandaloneUpgrader
e.printStackTrace(System.err);
}
}
-
+ CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES);
SSTableDeletingTask.waitForDeletions();
System.exit(0);
}