You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/11/20 00:01:49 UTC
cassandra git commit: Centralize shared executors
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 e3862bc3e -> 4397c3447
Centralize shared executors
patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
CASSANDRA-8055
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4397c344
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4397c344
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4397c344
Branch: refs/heads/cassandra-2.1
Commit: 4397c34476070ea15ee0d2b9c625887a8b08b622
Parents: e3862bc
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Thu Nov 20 01:42:03 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Nov 20 01:57:01 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/auth/Auth.java | 17 ++++----
.../cassandra/auth/PasswordAuthenticator.java | 18 ++++----
.../apache/cassandra/cache/AutoSavingCache.java | 10 ++---
.../concurrent/ScheduledExecutors.java | 43 ++++++++++++++++++++
.../apache/cassandra/cql3/QueryProcessor.java | 5 +--
.../apache/cassandra/db/BatchlogManager.java | 8 +++-
.../apache/cassandra/db/ColumnFamilyStore.java | 42 +++++++++++--------
.../cassandra/db/HintedHandOffManager.java | 7 ++--
.../db/commitlog/CommitLogArchiver.java | 2 +-
.../io/sstable/SSTableDeletingTask.java | 6 +--
.../cassandra/io/sstable/SSTableReader.java | 3 +-
.../org/apache/cassandra/io/util/FileUtils.java | 4 +-
.../locator/DynamicEndpointSnitch.java | 5 ++-
.../apache/cassandra/net/MessagingService.java | 4 +-
.../cassandra/service/CassandraDaemon.java | 3 +-
.../cassandra/service/LoadBroadcaster.java | 3 +-
.../cassandra/service/MigrationManager.java | 3 +-
.../cassandra/service/StorageService.java | 38 ++++-------------
.../apache/cassandra/utils/ResourceWatcher.java | 4 +-
.../org/apache/cassandra/cql3/CQLTester.java | 6 +--
.../org/apache/cassandra/db/KeyCacheTest.java | 5 +--
22 files changed, 139 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c00e671..41a5aaf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Centralize shared executors (CASSANDRA-8055)
* Fix filtering for CONTAINS (KEY) relations on frozen collection
clustering columns when the query is restricted to a single
partition (CASSANDRA-8203)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index 4f18111..ed7aa87 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
@@ -189,15 +190,13 @@ public class Auth implements AuthMBean
// the delay is here to give the node some time to see its peers - to reduce
// "Skipped default superuser setup: some nodes were not ready" log spam.
// It's the only reason for the delay.
- StorageService.tasks.schedule(new Runnable()
- {
- public void run()
- {
- setupDefaultSuperuser();
- }
- },
- SUPERUSER_SETUP_DELAY,
- TimeUnit.MILLISECONDS);
+ ScheduledExecutors.nonPeriodicTasks.schedule(new Runnable()
+ {
+ public void run()
+ {
+ setupDefaultSuperuser();
+ }
+ }, SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS);
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 1218ee2..9570770 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -30,6 +30,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.QueryOptions;
@@ -37,7 +38,6 @@ import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.mindrot.jbcrypt.BCrypt;
@@ -169,15 +169,13 @@ public class PasswordAuthenticator implements ISaslAwareAuthenticator
// the delay is here to give the node some time to see its peers - to reduce
// "skipped default user setup: some nodes were not ready" log spam.
// It's the only reason for the delay.
- StorageService.tasks.schedule(new Runnable()
- {
- public void run()
- {
- setupDefaultUser();
- }
- },
- Auth.SUPERUSER_SETUP_DELAY,
- TimeUnit.MILLISECONDS);
+ ScheduledExecutors.nonPeriodicTasks.schedule(new Runnable()
+ {
+ public void run()
+ {
+ setupDefaultUser();
+ }
+ }, Auth.SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS);
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index fca939a..2117eb8 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -27,6 +27,7 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
@@ -39,7 +40,6 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
@@ -121,10 +121,10 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
submitWrite(keysToSave);
}
};
- saveTask = StorageService.optionalTasks.scheduleWithFixedDelay(runnable,
- savePeriodInSeconds,
- savePeriodInSeconds,
- TimeUnit.SECONDS);
+ saveTask = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(runnable,
+ savePeriodInSeconds,
+ savePeriodInSeconds,
+ TimeUnit.SECONDS);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
new file mode 100644
index 0000000..5935669
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+/**
+ * Centralized location for shared executors
+ */
+public class ScheduledExecutors
+{
+ /**
+ * This pool is used for periodic short (sub-second) tasks.
+ */
+ public static final DebuggableScheduledThreadPoolExecutor scheduledTasks = new DebuggableScheduledThreadPoolExecutor("ScheduledTasks");
+
+ /**
+ * This executor is used for tasks that can have longer execution times, and are usually non-periodic.
+ */
+ public static final DebuggableScheduledThreadPoolExecutor nonPeriodicTasks = new DebuggableScheduledThreadPoolExecutor("NonPeriodicTasks");
+ static
+ {
+ nonPeriodicTasks.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ }
+
+ /**
+ * This executor is used for tasks that do not need to be waited for on shutdown/drain.
+ */
+ public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks");
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 9f71d71..45ef39c 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -22,7 +22,6 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
@@ -33,6 +32,7 @@ import org.github.jamm.MemoryMeter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.cql3.statements.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.composites.*;
@@ -89,7 +89,6 @@ public class QueryProcessor implements QueryHandler
public static final CQLMetrics metrics = new CQLMetrics();
private static final AtomicInteger lastMinuteEvictionsCount = new AtomicInteger(0);
- private static final ScheduledExecutorService evictionCheckTimer = Executors.newScheduledThreadPool(1);
static
{
@@ -118,7 +117,7 @@ public class QueryProcessor implements QueryHandler
})
.build();
- evictionCheckTimer.scheduleAtFixedRate(new Runnable()
+ ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(new Runnable()
{
public void run()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 279f876..20f134d 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -70,7 +70,7 @@ public class BatchlogManager implements BatchlogManagerMBean
private final AtomicLong totalBatchesReplayed = new AtomicLong();
// Single-thread executor service for scheduling and serializing log replay.
- public static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
+ private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
public void start()
{
@@ -95,6 +95,12 @@ public class BatchlogManager implements BatchlogManagerMBean
batchlogTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS);
}
+ public static void shutdown() throws InterruptedException
+ {
+ batchlogTasks.shutdown();
+ batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
+ }
+
public int countAllBatches()
{
String query = String.format("SELECT count(*) FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 0fa50bb..7e1dd18 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,15 +33,13 @@ import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.cassandra.io.FSWriteError;
+
import org.json.simple.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.*;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.*;
import org.apache.cassandra.config.*;
import org.apache.cassandra.config.CFMetaData.SpeculativeRetry;
import org.apache.cassandra.db.commitlog.CommitLog;
@@ -61,6 +59,7 @@ import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -86,18 +85,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("MemtableFlushWriter"),
"internal");
+
// post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed
- public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
- StageManager.KEEPALIVE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("MemtablePostFlush"),
- "internal");
- public static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1, StageManager.KEEPALIVE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("MemtableReclaimMemory"),
- "internal");
+ private static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
+ StageManager.KEEPALIVE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("MemtablePostFlush"),
+ "internal");
+
+ private static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1,
+ StageManager.KEEPALIVE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("MemtableReclaimMemory"),
+ "internal");
public final Keyspace keyspace;
public final String name;
@@ -134,6 +136,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public final ColumnFamilyMetrics metric;
public volatile long sampleLatencyNanos;
+ public static void shutdownPostFlushExecutor() throws InterruptedException
+ {
+ postFlushExecutor.shutdown();
+ postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
+ }
+
public void reload()
{
// metadata object has been mutated directly. make all the members jibe with new settings.
@@ -188,7 +196,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
};
- StorageService.scheduledTasks.schedule(runnable, period, TimeUnit.MILLISECONDS);
+ ScheduledExecutors.scheduledTasks.schedule(runnable, period, TimeUnit.MILLISECONDS);
}
}
@@ -310,7 +318,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
throw new RuntimeException(e);
}
logger.debug("retryPolicy for {} is {}", name, this.metadata.getSpeculativeRetry());
- StorageService.optionalTasks.scheduleWithFixedDelay(new Runnable()
+ ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable()
{
public void run()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 0e68a71..ad8546e 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
@@ -176,7 +177,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
metrics.log();
}
};
- StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
+ ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
}
private static void deleteHint(ByteBuffer tokenBytes, CellName columnName, long timestamp)
@@ -228,7 +229,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
}
}
};
- StorageService.optionalTasks.submit(runnable);
+ ScheduledExecutors.optionalTasks.submit(runnable);
}
//foobar
@@ -249,7 +250,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
}
}
};
- StorageService.optionalTasks.submit(runnable).get();
+ ScheduledExecutors.optionalTasks.submit(runnable).get();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 1b1a1e0..6cba603 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -52,7 +52,7 @@ public class CommitLogArchiver
}
public final Map<String, Future<?>> archivePending = new ConcurrentHashMap<String, Future<?>>();
- public final ExecutorService executor = new JMXEnabledThreadPoolExecutor("CommitLogArchiver");
+ private final ExecutorService executor = new JMXEnabledThreadPoolExecutor("CommitLogArchiver");
private final String archiveCommand;
private final String restoreCommand;
private final String restoreDirectories;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
index d95dff7..fb1cbb3 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
@@ -27,9 +27,9 @@ import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
public class SSTableDeletingTask implements Runnable
@@ -69,7 +69,7 @@ public class SSTableDeletingTask implements Runnable
public void schedule()
{
- StorageService.tasks.submit(this);
+ ScheduledExecutors.nonPeriodicTasks.submit(this);
}
public void run()
@@ -119,7 +119,7 @@ public class SSTableDeletingTask implements Runnable
}
};
- FBUtilities.waitOnFuture(StorageService.tasks.schedule(runnable, 0, TimeUnit.MILLISECONDS));
+ FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(runnable, 0, TimeUnit.MILLISECONDS));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 8f302f3..a3e3cf5 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -59,6 +59,7 @@ import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.cache.InstrumentingCache;
import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.Config;
@@ -635,7 +636,7 @@ public class SSTableReader extends SSTable
else
barrier = null;
- StorageService.tasks.execute(new Runnable()
+ ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
{
public void run()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 295679e..7d187ac 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -38,7 +38,7 @@ import sun.nio.ch.DirectBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.Config;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.BlacklistedDirectories;
import org.apache.cassandra.db.Keyspace;
@@ -326,7 +326,7 @@ public class FileUtils
deleteWithConfirm(new File(file));
}
};
- StorageService.tasks.execute(runnable);
+ ScheduledExecutors.nonPeriodicTasks.execute(runnable);
}
public static String stringifyFileSize(double value)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 49442c8..e4b714c 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -84,8 +85,8 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
reset();
}
};
- StorageService.scheduledTasks.scheduleWithFixedDelay(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
- StorageService.scheduledTasks.scheduleWithFixedDelay(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+ ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+ ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
registerMBean();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 05b449c..73bc9ff 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -37,6 +37,8 @@ import com.google.common.collect.Lists;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.concurrent.TracingAwareExecutorService;
@@ -329,7 +331,7 @@ public final class MessagingService implements MessagingServiceMBean
logDroppedMessages();
}
};
- StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+ ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, ?> timeoutReporter = new Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, Object>()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 5897a22..1c99348 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import com.addthis.metrics.reporter.config.ReporterConfig;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
@@ -329,7 +330,7 @@ public class CassandraDaemon
}
}
};
- StorageService.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS);
+ ScheduledExecutors.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS);
SystemKeyspace.finishStartup();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/service/LoadBroadcaster.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/LoadBroadcaster.java b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
index 4996e52..d12ffba 100644
--- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java
+++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.gms.*;
public class LoadBroadcaster implements IEndpointStateChangeSubscriber
@@ -91,7 +92,7 @@ public class LoadBroadcaster implements IEndpointStateChangeSubscriber
StorageService.instance.valueFactory.load(StorageService.instance.getLoad()));
}
};
- StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, 2 * Gossiper.intervalInMillis, BROADCAST_INTERVAL, TimeUnit.MILLISECONDS);
+ ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(runnable, 2 * Gossiper.intervalInMillis, BROADCAST_INTERVAL, TimeUnit.MILLISECONDS);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index bdae208..ce4dca4 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -32,6 +32,7 @@ import java.lang.management.RuntimeMXBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
@@ -126,7 +127,7 @@ public class MigrationManager
submitMigrationTask(endpoint);
}
};
- StorageService.optionalTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
+ ScheduledExecutors.optionalTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 29054f4..ae8c798 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -54,7 +54,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.Auth;
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
@@ -121,24 +121,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return 30 * 1000;
}
- /**
- * This pool is used for periodic short (sub-second) tasks.
- */
- public static final DebuggableScheduledThreadPoolExecutor scheduledTasks = new DebuggableScheduledThreadPoolExecutor("ScheduledTasks");
-
- /**
- * This pool is used by tasks that can have longer execution times, and usually are non periodic.
- */
- public static final DebuggableScheduledThreadPoolExecutor tasks = new DebuggableScheduledThreadPoolExecutor("NonPeriodicTasks");
- /**
- * tasks that do not need to be waited for on shutdown/drain
- */
- public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks");
- static
- {
- tasks.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- }
-
/* This abstraction maintains the token/endpoint metadata information */
private TokenMetadata tokenMetadata = new TokenMetadata();
@@ -597,7 +579,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (daemon != null)
shutdownClientServers();
- optionalTasks.shutdown();
+ ScheduledExecutors.optionalTasks.shutdown();
Gossiper.instance.stop();
// In-progress writes originating here could generate hints to be written, so shut down MessagingService
@@ -633,8 +615,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
CommitLog.instance.shutdownBlocking();
// wait for miscellaneous tasks like sstable and commitlog segment deletion
- tasks.shutdown();
- if (!tasks.awaitTermination(1, TimeUnit.MINUTES))
+ ScheduledExecutors.nonPeriodicTasks.shutdown();
+ if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown");
}
}, "StorageServiceShutdownHook");
@@ -3602,7 +3584,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
setMode(Mode.DRAINING, "starting drain process", true);
shutdownClientServers();
- optionalTasks.shutdown();
+ ScheduledExecutors.optionalTasks.shutdown();
Gossiper.instance.stop();
setMode(Mode.DRAINING, "shutting down MessageService", false);
@@ -3647,21 +3629,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
FBUtilities.waitOnFutures(flushes);
- BatchlogManager.batchlogTasks.shutdown();
- BatchlogManager.batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
+ BatchlogManager.shutdown();
// whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
// there are no segments to replay, so we force the recycling of any remaining (should be at most one)
CommitLog.instance.forceRecycleAllSegments();
- ColumnFamilyStore.postFlushExecutor.shutdown();
- ColumnFamilyStore.postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
+ ColumnFamilyStore.shutdownPostFlushExecutor();
CommitLog.instance.shutdownBlocking();
// wait for miscellaneous tasks like sstable and commitlog segment deletion
- tasks.shutdown();
- if (!tasks.awaitTermination(1, TimeUnit.MINUTES))
+ ScheduledExecutors.nonPeriodicTasks.shutdown();
+ if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown");
setMode(Mode.DRAINED, true);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/utils/ResourceWatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ResourceWatcher.java b/src/java/org/apache/cassandra/utils/ResourceWatcher.java
index 2dfab95..5e7cbdd 100644
--- a/src/java/org/apache/cassandra/utils/ResourceWatcher.java
+++ b/src/java/org/apache/cassandra/utils/ResourceWatcher.java
@@ -23,13 +23,13 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
public class ResourceWatcher
{
public static void watch(String resource, Runnable callback, int period)
{
- StorageService.scheduledTasks.scheduleWithFixedDelay(new WatchedResource(resource, callback), period, period, TimeUnit.MILLISECONDS);
+ ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(new WatchedResource(resource, callback), period, period, TimeUnit.MILLISECONDS);
}
public static class WatchedResource implements Runnable
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 470b701..dd22896 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Directories;
@@ -43,7 +44,6 @@ import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.service.StorageService;
/**
* Base class for CQL tests.
@@ -88,7 +88,7 @@ public abstract class CQLTester
currentTypes.clear();
// We want to clean up after the test, but dropping a table is rather long so just do that asynchronously
- StorageService.optionalTasks.execute(new Runnable()
+ ScheduledExecutors.optionalTasks.execute(new Runnable()
{
public void run()
{
@@ -105,7 +105,7 @@ public abstract class CQLTester
// mono-threaded, just push a task on the queue to find when it's empty. Not perfect but good enough.
final CountDownLatch latch = new CountDownLatch(1);
- StorageService.tasks.execute(new Runnable()
+ ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
{
public void run()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index c0560ab..1bc7caf 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db;
-import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -31,12 +30,12 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.cache.KeyCacheKey;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.junit.Assert.assertEquals;
@@ -165,7 +164,7 @@ public class KeyCacheTest extends SchemaLoader
reader.releaseReference();
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);;
- while (StorageService.tasks.getActiveCount() + StorageService.tasks.getQueue().size() > 0);
+ while (ScheduledExecutors.nonPeriodicTasks.getActiveCount() + ScheduledExecutors.nonPeriodicTasks.getQueue().size() > 0);
// after releasing the reference this should drop to 2
assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);