You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/12/05 21:48:17 UTC
[2/3] git commit: Allow overriding available processors with
-Dcassandra.available_processors Patch by brandonwilliams,
reviewed by iamaleksey for CASSANDRA-4790
Allow overriding available processors with
-Dcassandra.available_processors
Patch by brandonwilliams, reviewed by iamaleksey for CASSANDRA-4790
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d6d4151e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d6d4151e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d6d4151e
Branch: refs/heads/trunk
Commit: d6d4151ee24791aa35a983de1c39cc4f895d6a55
Parents: 54ab2d2
Author: Brandon Williams <br...@apache.org>
Authored: Wed Dec 5 13:18:00 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Dec 5 14:46:11 2012 -0600
----------------------------------------------------------------------
.../apache/cassandra/concurrent/StageManager.java | 9 +-
src/java/org/apache/cassandra/config/Config.java | 6 +-
.../cassandra/config/DatabaseDescriptor.java | 2 +-
.../PeriodicCommitLogExecutorService.java | 3 +-
.../db/compaction/ParallelCompactionIterable.java | 2 +-
.../cassandra/hadoop/ColumnFamilyRecordWriter.java | 3 +-
.../apache/cassandra/io/sstable/SSTableReader.java | 2 +-
.../org/apache/cassandra/service/StorageProxy.java | 4 +-
.../apache/cassandra/thrift/CassandraDaemon.java | 223 +++++++++++++++
.../org/apache/cassandra/utils/FBUtilities.java | 8 +
10 files changed, 251 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 7ca45f4..bf2e4c2 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.net.MessagingService;
import static org.apache.cassandra.config.DatabaseDescriptor.*;
+import org.apache.cassandra.utils.FBUtilities;
/**
@@ -41,21 +42,21 @@ public class StageManager
public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle
- public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * Runtime.getRuntime().availableProcessors();
+ public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * FBUtilities.getAvailableProcessors();
static
{
stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders()));
- stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, Runtime.getRuntime().availableProcessors()));
- stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, Runtime.getRuntime().availableProcessors()));
+ stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));
+ stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, FBUtilities.getAvailableProcessors()));
stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, getConcurrentReplicators(), MAX_REPLICATE_ON_WRITE_TASKS));
// the rest are all single-threaded
stages.put(Stage.GOSSIP, new JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
stages.put(Stage.MIGRATION, new JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
- stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, Runtime.getRuntime().availableProcessors()));
+ stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, FBUtilities.getAvailableProcessors()));
stages.put(Stage.TRACING, tracingExecutor());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index e9f190f..609633c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -21,12 +21,15 @@ import org.apache.cassandra.cache.SerializingCacheProvider;
import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.io.util.NativeAllocator;
+import org.apache.cassandra.utils.FBUtilities;
/**
* A class that contains configuration properties for the cassandra node it runs within.
*
* Properties declared as volatile can be mutated via JMX.
*/
+
+
public class Config
{
public String cluster_name = "Test Cluster";
@@ -101,8 +104,9 @@ public class Config
/* if the size of columns or super-columns are more than this, indexing will kick in */
public Integer column_index_size_in_kb = 64;
public Integer in_memory_compaction_limit_in_mb = 64;
- public Integer concurrent_compactors = Runtime.getRuntime().availableProcessors();
+ public Integer concurrent_compactors = FBUtilities.getAvailableProcessors();
public volatile Integer compaction_throughput_mb_per_sec = 16;
+ public Integer compaction_throughput_mb_per_sec = 16;
public Boolean multithreaded_compaction = false;
public Integer max_streaming_retries = 3;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index c624ba3..c57983a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -389,7 +389,7 @@ public class DatabaseDescriptor
}
if (conf.concurrent_compactors == null)
- conf.concurrent_compactors = Runtime.getRuntime().availableProcessors();
+ conf.concurrent_compactors = FBUtilities.getAvailableProcessors();
if (conf.concurrent_compactors <= 0)
throw new ConfigurationException("concurrent_compactors should be strictly greater than 0");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
index 39978ef..94f593e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.concurrent.*;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
@@ -32,7 +33,7 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
public PeriodicCommitLogExecutorService(final CommitLog commitLog)
{
- queue = new LinkedBlockingQueue<Runnable>(1024 * Runtime.getRuntime().availableProcessors());
+ queue = new LinkedBlockingQueue<Runnable>(1024 * FBUtilities.getAvailableProcessors());
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws Exception
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 56fce20..b19395c 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -135,7 +135,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
private final List<RowContainer> rows = new ArrayList<RowContainer>();
private int row = 0;
- private final ThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
+ private final ThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(FBUtilities.getAvailableProcessors(),
Integer.MAX_VALUE,
TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 3b66976..909c291 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.client.RingCache;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -101,7 +102,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
{
this.conf = conf;
this.ringCache = new RingCache(conf);
- this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * Runtime.getRuntime().availableProcessors());
+ this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
this.clients = new HashMap<Range,RangeClient>();
batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index c1c751c..45a1f7a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -225,7 +225,7 @@ public class SSTableReader extends SSTable
{
final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>();
- ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors());
+ ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
{
Runnable runnable = new Runnable()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0c3eae9..a3b95ff 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -79,7 +79,9 @@ public class StorageProxy implements StorageProxyMBean
public static final StorageProxy instance = new StorageProxy();
- private static volatile int maxHintsInProgress = 1024 * Runtime.getRuntime().availableProcessors();
+ private static volatile boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled();
+ private static volatile int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
+ private static volatile int maxHintsInProgress = 1024 * FBUtilities.getAvailableProcessors();
private static final AtomicInteger totalHintsInProgress = new AtomicInteger();
private static final Map<InetAddress, AtomicInteger> hintsInProgress = new MapMaker().concurrencyLevel(1).makeComputingMap(new Function<InetAddress, AtomicInteger>()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
new file mode 100644
index 0000000..572e3e0
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.thrift;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.service.AbstractCassandraDaemon;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * This class supports two methods for creating a Cassandra node daemon,
+ * invoking the class's main method, and using the jsvc wrapper from
+ * commons-daemon, (for more information on using this class with the
+ * jsvc wrapper, see the
+ * <a href="http://commons.apache.org/daemon/jsvc.html">Commons Daemon</a>
+ * documentation).
+ */
+
+public class CassandraDaemon extends org.apache.cassandra.service.AbstractCassandraDaemon
+{
+ protected static CassandraDaemon instance;
+
+ static
+ {
+ AbstractCassandraDaemon.initLog4j();
+ }
+
+ private static Logger logger = LoggerFactory.getLogger(CassandraDaemon.class);
+ private final static String SYNC = "sync";
+ private final static String ASYNC = "async";
+ private final static String HSHA = "hsha";
+ public final static List<String> rpc_server_types = Arrays.asList(SYNC, ASYNC, HSHA);
+ private ThriftServer server;
+
+ protected void startServer()
+ {
+ if (server == null)
+ {
+ server = new ThriftServer(listenAddr, listenPort);
+ server.start();
+ }
+ }
+
+ protected void stopServer()
+ {
+ if (server != null)
+ {
+ server.stopServer();
+ try
+ {
+ server.join();
+ }
+ catch (InterruptedException e)
+ {
+ logger.error("Interrupted while waiting thrift server to stop", e);
+ }
+ server = null;
+ }
+ }
+
+ public static void stop(String[] args)
+ {
+ instance.stopServer();
+ instance.deactivate();
+ }
+
+ public static void main(String[] args)
+ {
+ instance = new CassandraDaemon();
+ instance.activate();
+ }
+
+ /**
+ * Simple class to run the thrift connection accepting code in separate
+ * thread of control.
+ */
+ private static class ThriftServer extends Thread
+ {
+ private TServer serverEngine;
+
+ public ThriftServer(InetAddress listenAddr, int listenPort)
+ {
+ // now we start listening for clients
+ final CassandraServer cassandraServer = new CassandraServer();
+ Cassandra.Processor processor = new Cassandra.Processor(cassandraServer);
+
+ // Transport
+ logger.info(String.format("Binding thrift service to %s:%s", listenAddr, listenPort));
+
+ // Protocol factory
+ TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength());
+
+ // Transport factory
+ int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize();
+ TTransportFactory inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
+ TTransportFactory outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
+ logger.info("Using TFastFramedTransport with a max frame size of {} bytes.", tFramedTransportSize);
+
+ if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC))
+ {
+ TServerTransport serverTransport;
+ try
+ {
+ serverTransport = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort),
+ DatabaseDescriptor.getRpcKeepAlive(),
+ DatabaseDescriptor.getRpcSendBufferSize(),
+ DatabaseDescriptor.getRpcRecvBufferSize());
+ }
+ catch (TTransportException e)
+ {
+ throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
+ }
+ // ThreadPool Server and will be invocation per connection basis...
+ TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
+ .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
+ .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
+ .inputTransportFactory(inTransportFactory)
+ .outputTransportFactory(outTransportFactory)
+ .inputProtocolFactory(tProtocolFactory)
+ .outputProtocolFactory(tProtocolFactory)
+ .processor(processor);
+ ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState, serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
+ serverEngine = new CustomTThreadPoolServer(serverArgs, executorService);
+ logger.info(String.format("Using synchronous/threadpool thrift server on %s : %s", listenAddr, listenPort));
+ }
+ else
+ {
+ TNonblockingServerTransport serverTransport;
+ try
+ {
+ serverTransport = new TCustomNonblockingServerSocket(new InetSocketAddress(listenAddr, listenPort),
+ DatabaseDescriptor.getRpcKeepAlive(),
+ DatabaseDescriptor.getRpcSendBufferSize(),
+ DatabaseDescriptor.getRpcRecvBufferSize());
+ }
+ catch (TTransportException e)
+ {
+ throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
+ }
+
+ if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC))
+ {
+ // This is single threaded hence the invocation will be all
+ // in one thread.
+ TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
+ .outputTransportFactory(outTransportFactory)
+ .inputProtocolFactory(tProtocolFactory)
+ .outputProtocolFactory(tProtocolFactory)
+ .processor(processor);
+ logger.info(String.format("Using non-blocking/asynchronous thrift server on %s : %s", listenAddr, listenPort));
+ serverEngine = new CustomTNonBlockingServer(serverArgs);
+ }
+ else if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA))
+ {
+ // This is NIO selector service but the invocation will be Multi-Threaded with the Executor service.
+ ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
+ DatabaseDescriptor.getRpcMaxThreads(),
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(),
+ new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
+ TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
+ .outputTransportFactory(outTransportFactory)
+ .inputProtocolFactory(tProtocolFactory)
+ .outputProtocolFactory(tProtocolFactory)
+ .processor(processor);
+ logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", listenAddr, listenPort));
+ // Check for available processors in the system which will be equal to the IO Threads.
+ serverEngine = new CustomTHsHaServer(serverArgs, executorService, FBUtilities.getAvailableProcessors());
+ }
+ }
+ }
+
+ public void run()
+ {
+ logger.info("Listening for thrift clients...");
+ serverEngine.serve();
+ }
+
+ public void stopServer()
+ {
+ logger.info("Stop listening to thrift clients");
+ serverEngine.stop();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index bc910bd..82fab71 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -69,6 +69,14 @@ public class FBUtilities
private static volatile InetAddress localInetAddress;
private static volatile InetAddress broadcastInetAddress;
+ public static int getAvailableProcessors()
+ {
+ if (System.getProperty("cassandra.available_processors") != null)
+ return Integer.parseInt(System.getProperty("cassandra.available_processors"));
+ else
+ return Runtime.getRuntime().availableProcessors();
+ }
+
private static final ThreadLocal<MessageDigest> localMD5Digest = new ThreadLocal<MessageDigest>()
{
@Override