You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2016/08/18 01:45:32 UTC
[5/6] cassandra git commit: Let DatabaseDescriptor not implicitly
startup services
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 6f71817..f9d0498 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -35,30 +35,31 @@ import com.google.common.primitives.Longs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.auth.*;
+import org.apache.cassandra.auth.AuthConfig;
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.auth.IAuthorizer;
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.auth.IRoleManager;
import org.apache.cassandra.config.Config.CommitLogSync;
import org.apache.cassandra.config.Config.RequestSchedulerId;
-import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
-import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.util.DiskOptimizationStrategy;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy;
import org.apache.cassandra.io.util.SsdDiskOptimizationStrategy;
-import org.apache.cassandra.locator.*;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
+import org.apache.cassandra.locator.EndpointSnitchInfo;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.SeedProvider;
import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.scheduler.NoScheduler;
import org.apache.cassandra.security.EncryptionContext;
-import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.thrift.ThriftServer;
+import org.apache.cassandra.service.CacheService.CacheType;
+import org.apache.cassandra.thrift.ThriftServer.ThriftServerType;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.memory.*;
+
import org.apache.commons.lang3.StringUtils;
public class DatabaseDescriptor
@@ -71,6 +72,8 @@ public class DatabaseDescriptor
*/
private static final int MAX_NUM_TOKENS = 1536;
+ private static Config conf;
+
private static IEndpointSnitch snitch;
private static InetAddress listenAddress; // leave null so we can fall through to getLocalHost
private static InetAddress broadcastAddress;
@@ -85,12 +88,8 @@ public class DatabaseDescriptor
private static Config.DiskAccessMode indexAccessMode;
- private static Config conf;
-
- private static SSTableFormat.Type sstable_format = SSTableFormat.Type.BIG;
-
- private static IAuthenticator authenticator = new AllowAllAuthenticator();
- private static IAuthorizer authorizer = new AllowAllAuthorizer();
+ private static IAuthenticator authenticator;
+ private static IAuthorizer authorizer;
// Don't initialize the role manager until applying config. The options supported by CassandraRoleManager
// depend on the configured IAuthenticator, so defer creating it until that's been set.
private static IRoleManager roleManager;
@@ -113,36 +112,63 @@ public class DatabaseDescriptor
private static DiskOptimizationStrategy diskOptimizationStrategy;
- public static void forceStaticInitialization() {}
- static
+ private static boolean clientInitialized;
+ private static boolean toolInitialized;
+ private static boolean daemonInitialized;
+
+ public static void daemonInitialization() throws ConfigurationException
{
- // In client mode, we use a default configuration. Note that the fields of this class will be
- // left unconfigured however (the partitioner or localDC will be null for instance) so this
- // should be used with care.
- try
- {
- if (Config.isClientMode())
- {
- conf = new Config();
- }
- else
- {
- applyConfig(loadConfig());
- }
- switch (conf.disk_optimization_strategy)
- {
- case ssd:
- diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance);
- break;
- case spinning:
- diskOptimizationStrategy = new SpinningDiskOptimizationStrategy();
- break;
- }
- }
- catch (Exception e)
- {
- throw new ExceptionInInitializerError(e);
- }
+ assert !toolInitialized;
+ assert !clientInitialized;
+
+ // Some unit tests require this :(
+ if (daemonInitialized)
+ return;
+ daemonInitialized = true;
+
+ setConfig(loadConfig());
+ applyAll();
+ AuthConfig.applyAuthz();
+ }
+
+ public static void toolInitialization()
+ {
+ assert !daemonInitialized;
+ assert !clientInitialized;
+
+ if (toolInitialized)
+ return;
+ toolInitialized = true;
+
+ Config.setToolsMode(true);
+
+ setConfig(loadConfig());
+
+ applySimpleConfig();
+
+ applyPartitioner();
+
+ applySnitch();
+
+ applyEncryptionContext();
+ }
+
+ public static void clientInitialization()
+ {
+ assert !daemonInitialized;
+ assert !toolInitialized;
+
+ if (clientInitialized)
+ return;
+ clientInitialized = true;
+
+ Config.setClientMode(true);
+ conf = new Config();
+ }
+
+ public static Config getRawConfig()
+ {
+ return conf;
}
@VisibleForTesting
@@ -193,105 +219,34 @@ public class DatabaseDescriptor
}
}
- @VisibleForTesting
- public static void applyAddressConfig(Config config) throws ConfigurationException
+ private static void setConfig(Config config)
{
- listenAddress = null;
- rpcAddress = null;
- broadcastAddress = null;
- broadcastRpcAddress = null;
+ conf = config;
+ }
- /* Local IP, hostname or interface to bind services to */
- if (config.listen_address != null && config.listen_interface != null)
- {
- throw new ConfigurationException("Set listen_address OR listen_interface, not both", false);
- }
- else if (config.listen_address != null)
- {
- try
- {
- listenAddress = InetAddress.getByName(config.listen_address);
- }
- catch (UnknownHostException e)
- {
- throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false);
- }
+ private static void applyAll() throws ConfigurationException
+ {
+ applySimpleConfig();
- if (listenAddress.isAnyLocalAddress())
- throw new ConfigurationException("listen_address cannot be a wildcard address (" + config.listen_address + ")!", false);
- }
- else if (config.listen_interface != null)
- {
- listenAddress = getNetworkInterfaceAddress(config.listen_interface, "listen_interface", config.listen_interface_prefer_ipv6);
- }
+ applyPartitioner();
- /* Gossip Address to broadcast */
- if (config.broadcast_address != null)
- {
- try
- {
- broadcastAddress = InetAddress.getByName(config.broadcast_address);
- }
- catch (UnknownHostException e)
- {
- throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false);
- }
+ applyAddressConfig();
- if (broadcastAddress.isAnyLocalAddress())
- throw new ConfigurationException("broadcast_address cannot be a wildcard address (" + config.broadcast_address + ")!", false);
- }
+ applyThriftHSHA();
- /* Local IP, hostname or interface to bind RPC server to */
- if (config.rpc_address != null && config.rpc_interface != null)
- {
- throw new ConfigurationException("Set rpc_address OR rpc_interface, not both", false);
- }
- else if (config.rpc_address != null)
- {
- try
- {
- rpcAddress = InetAddress.getByName(config.rpc_address);
- }
- catch (UnknownHostException e)
- {
- throw new ConfigurationException("Unknown host in rpc_address " + config.rpc_address, false);
- }
- }
- else if (config.rpc_interface != null)
- {
- rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, "rpc_interface", config.rpc_interface_prefer_ipv6);
- }
- else
- {
- rpcAddress = FBUtilities.getLocalAddress();
- }
+ applySnitch();
- /* RPC address to broadcast */
- if (config.broadcast_rpc_address != null)
- {
- try
- {
- broadcastRpcAddress = InetAddress.getByName(config.broadcast_rpc_address);
- }
- catch (UnknownHostException e)
- {
- throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false);
- }
+ applyRequestScheduler();
- if (broadcastRpcAddress.isAnyLocalAddress())
- throw new ConfigurationException("broadcast_rpc_address cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false);
- }
- else
- {
- if (rpcAddress.isAnyLocalAddress())
- throw new ConfigurationException("If rpc_address is set to a wildcard address (" + config.rpc_address + "), then " +
- "you must set broadcast_rpc_address to a value other than " + config.rpc_address, false);
- }
+ applyInitialTokens();
+
+ applySeedProvider();
+
+ applyEncryptionContext();
}
- public static void applyConfig(Config config) throws ConfigurationException
+ private static void applySimpleConfig()
{
- conf = config;
if (conf.commitlog_sync == null)
{
@@ -342,67 +297,6 @@ public class DatabaseDescriptor
logger.info("DiskAccessMode is {}, indexAccessMode is {}", conf.disk_access_mode, indexAccessMode);
}
- /* Authentication, authorization and role management backend, implementing IAuthenticator, IAuthorizer & IRoleMapper*/
- if (conf.authenticator != null)
- authenticator = FBUtilities.newAuthenticator(conf.authenticator);
-
- // the configuration options regarding credentials caching are only guaranteed to
- // work with PasswordAuthenticator, so log a message if some other authenticator
- // is in use and non-default values are detected
- if (!(authenticator instanceof PasswordAuthenticator)
- && (conf.credentials_update_interval_in_ms != -1
- || conf.credentials_validity_in_ms != 2000
- || conf.credentials_cache_max_entries != 1000))
- {
- logger.info("Configuration options credentials_update_interval_in_ms, credentials_validity_in_ms and " +
- "credentials_cache_max_entries may not be applicable for the configured authenticator ({})",
- authenticator.getClass().getName());
- }
-
- if (conf.authorizer != null)
- authorizer = FBUtilities.newAuthorizer(conf.authorizer);
-
- if (!authenticator.requireAuthentication() && authorizer.requireAuthorization())
- throw new ConfigurationException(conf.authenticator + " can't be used with " + conf.authorizer, false);
-
- if (conf.role_manager != null)
- roleManager = FBUtilities.newRoleManager(conf.role_manager);
- else
- roleManager = new CassandraRoleManager();
-
- if (authenticator instanceof PasswordAuthenticator && !(roleManager instanceof CassandraRoleManager))
- throw new ConfigurationException("CassandraRoleManager must be used with PasswordAuthenticator", false);
-
- if (conf.internode_authenticator != null)
- internodeAuthenticator = FBUtilities.construct(conf.internode_authenticator, "internode_authenticator");
- else
- internodeAuthenticator = new AllowAllInternodeAuthenticator();
-
- authenticator.validateConfiguration();
- authorizer.validateConfiguration();
- roleManager.validateConfiguration();
- internodeAuthenticator.validateConfiguration();
-
- /* Hashing strategy */
- if (conf.partitioner == null)
- {
- throw new ConfigurationException("Missing directive: partitioner", false);
- }
- try
- {
- partitioner = FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", conf.partitioner));
- }
- catch (Exception e)
- {
- throw new ConfigurationException("Invalid partitioner class " + conf.partitioner, false);
- }
- paritionerName = partitioner.getClass().getCanonicalName();
-
- if (config.gc_log_threshold_in_ms < 0)
- {
- throw new ConfigurationException("gc_log_threshold_in_ms must be a positive integer");
- }
-
if (conf.gc_warn_threshold_in_ms < 0)
{
throw new ConfigurationException("gc_warn_threshold_in_ms must be a positive integer");
@@ -454,83 +348,12 @@ public class DatabaseDescriptor
else
logger.info("Global memtable off-heap threshold is enabled at {}MB", conf.memtable_offheap_space_in_mb);
- applyAddressConfig(config);
-
if (conf.thrift_framed_transport_size_in_mb <= 0)
throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive, but was " + conf.thrift_framed_transport_size_in_mb, false);
if (conf.native_transport_max_frame_size_in_mb <= 0)
throw new ConfigurationException("native_transport_max_frame_size_in_mb must be positive, but was " + conf.native_transport_max_frame_size_in_mb, false);
- // fail early instead of OOMing (see CASSANDRA-8116)
- if (ThriftServer.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads == Integer.MAX_VALUE)
- throw new ConfigurationException("The hsha rpc_server_type is not compatible with an rpc_max_threads " +
- "setting of 'unlimited'. Please see the comments in cassandra.yaml " +
- "for rpc_server_type and rpc_max_threads.",
- false);
- if (ThriftServer.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024))
- logger.warn("rpc_max_threads setting of {} may be too high for the hsha server and cause unnecessary thread contention, reducing performance", conf.rpc_max_threads);
-
- /* end point snitch */
- if (conf.endpoint_snitch == null)
- {
- throw new ConfigurationException("Missing endpoint_snitch directive", false);
- }
- snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch);
- EndpointSnitchInfo.create();
-
- localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
- localComparator = new Comparator<InetAddress>()
- {
- public int compare(InetAddress endpoint1, InetAddress endpoint2)
- {
- boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
- boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
- if (local1 && !local2)
- return -1;
- if (local2 && !local1)
- return 1;
- return 0;
- }
- };
-
- /* Request Scheduler setup */
- requestSchedulerOptions = conf.request_scheduler_options;
- if (conf.request_scheduler != null)
- {
- try
- {
- if (requestSchedulerOptions == null)
- {
- requestSchedulerOptions = new RequestSchedulerOptions();
- }
- Class<?> cls = Class.forName(conf.request_scheduler);
- requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
- }
- catch (ClassNotFoundException e)
- {
- throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler, false);
- }
- catch (Exception e)
- {
- throw new ConfigurationException("Unable to instantiate request scheduler", e);
- }
- }
- else
- {
- requestScheduler = new NoScheduler();
- }
-
- if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
- {
- requestSchedulerId = conf.request_scheduler_id;
- }
- else
- {
- // Default to Keyspace
- requestSchedulerId = RequestSchedulerId.keyspace;
- }
-
// if data dirs, commitlog dir, or saved caches dir are set in cassandra.yaml, use that. Otherwise,
// use -Dcassandra.storagedir (set in cassandra-env.sh) as the parent dir for data/, commitlog/, and saved_caches/
if (conf.commitlog_directory == null)
@@ -700,97 +523,242 @@ public class DatabaseDescriptor
else if (conf.num_tokens > MAX_NUM_TOKENS)
throw new ConfigurationException(String.format("A maximum number of %d tokens per node is supported", MAX_NUM_TOKENS), false);
- if (conf.initial_token != null)
+ try
{
- Collection<String> tokens = tokensFromString(conf.initial_token);
- if (tokens.size() != conf.num_tokens)
- throw new ConfigurationException("The number of initial tokens (by initial_token) specified is different from num_tokens value", false);
+ // if prepared_statements_cache_size_mb option was set to "auto" then size of the cache should be "max(1/256 of Heap (in MB), 10MB)"
+ preparedStatementsCacheSizeInMB = (conf.prepared_statements_cache_size_mb == null)
+ ? Math.max(10, (int) (Runtime.getRuntime().maxMemory() / 1024 / 1024 / 256))
+ : conf.prepared_statements_cache_size_mb;
- for (String token : tokens)
- partitioner.getTokenFactory().validate(token);
+ if (preparedStatementsCacheSizeInMB <= 0)
+ throw new NumberFormatException(); // to escape duplicating error message
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException("prepared_statements_cache_size_mb option was set incorrectly to '"
+ + conf.prepared_statements_cache_size_mb + "', supported values are <integer> >= 0.", false);
+ }
+
+ try
+ {
+ // if thrift_prepared_statements_cache_size_mb option was set to "auto" then size of the cache should be "max(1/256 of Heap (in MB), 10MB)"
+ thriftPreparedStatementsCacheSizeInMB = (conf.thrift_prepared_statements_cache_size_mb == null)
+ ? Math.max(10, (int) (Runtime.getRuntime().maxMemory() / 1024 / 1024 / 256))
+ : conf.thrift_prepared_statements_cache_size_mb;
+
+ if (thriftPreparedStatementsCacheSizeInMB <= 0)
+ throw new NumberFormatException(); // to escape duplicating error message
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException("thrift_prepared_statements_cache_size_mb option was set incorrectly to '"
+ + conf.thrift_prepared_statements_cache_size_mb + "', supported values are <integer> >= 0.", false);
}
+ try
+ {
+ // if key_cache_size_in_mb option was set to "auto" then size of the cache should be "min(5% of Heap (in MB), 100MB)
+ keyCacheSizeInMB = (conf.key_cache_size_in_mb == null)
+ ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100)
+ : conf.key_cache_size_in_mb;
+
+ if (keyCacheSizeInMB < 0)
+ throw new NumberFormatException(); // to escape duplicating error message
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '"
+ + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+ }
try
{
- // if prepared_statements_cache_size_mb option was set to "auto" then size of the cache should be "max(1/256 of Heap (in MB), 10MB)"
- preparedStatementsCacheSizeInMB = (conf.prepared_statements_cache_size_mb == null)
- ? Math.max(10, (int) (Runtime.getRuntime().maxMemory() / 1024 / 1024 / 256))
- : conf.prepared_statements_cache_size_mb;
+ // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB)
+ counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
+ ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
+ : conf.counter_cache_size_in_mb;
+
+ if (counterCacheSizeInMB < 0)
+ throw new NumberFormatException(); // to escape duplicating error message
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
+ + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+ }
+
+ // if set to empty/"auto" then use 5% of Heap size
+ indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
+ ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
+ : conf.index_summary_capacity_in_mb;
+
+ if (indexSummaryCapacityInMB < 0)
+ throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
+ + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);
+
+ if(conf.encryption_options != null)
+ {
+ logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
+ //operate under the assumption that server_encryption_options is not set in yaml rather than both
+ conf.server_encryption_options = conf.encryption_options;
+ }
+
+ if (conf.user_defined_function_fail_timeout < 0)
+ throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
+ if (conf.user_defined_function_warn_timeout < 0)
+ throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
+
+ if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
+ throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
+
+ if (conf.max_mutation_size_in_kb == null)
+ conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
+ else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
+ throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
+
+ // native transport encryption options
+ if (conf.native_transport_port_ssl != null
+ && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue()
+ && !conf.client_encryption_options.enabled)
+ {
+ throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
+ }
+
+ if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0)
+ throw new ConfigurationException("max_value_size_in_mb must be positive", false);
+
+ switch (conf.disk_optimization_strategy)
+ {
+ case ssd:
+ diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance);
+ break;
+ case spinning:
+ diskOptimizationStrategy = new SpinningDiskOptimizationStrategy();
+ break;
+ }
+ }
+
+ public static void applyAddressConfig() throws ConfigurationException
+ {
+ applyAddressConfig(conf);
+ }
+
+ public static void applyAddressConfig(Config config) throws ConfigurationException
+ {
+ listenAddress = null;
+ rpcAddress = null;
+ broadcastAddress = null;
+ broadcastRpcAddress = null;
+
+ /* Local IP, hostname or interface to bind services to */
+ if (config.listen_address != null && config.listen_interface != null)
+ {
+ throw new ConfigurationException("Set listen_address OR listen_interface, not both", false);
+ }
+ else if (config.listen_address != null)
+ {
+ try
+ {
+ listenAddress = InetAddress.getByName(config.listen_address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false);
+ }
- if (preparedStatementsCacheSizeInMB <= 0)
- throw new NumberFormatException(); // to escape duplicating error message
+ if (listenAddress.isAnyLocalAddress())
+ throw new ConfigurationException("listen_address cannot be a wildcard address (" + config.listen_address + ")!", false);
}
- catch (NumberFormatException e)
+ else if (config.listen_interface != null)
{
- throw new ConfigurationException("prepared_statements_cache_size_mb option was set incorrectly to '"
- + conf.prepared_statements_cache_size_mb + "', supported values are <integer> >= 0.", false);
+ listenAddress = getNetworkInterfaceAddress(config.listen_interface, "listen_interface", config.listen_interface_prefer_ipv6);
}
- try
+ /* Gossip Address to broadcast */
+ if (config.broadcast_address != null)
{
- // if thrift_prepared_statements_cache_size_mb option was set to "auto" then size of the cache should be "max(1/256 of Heap (in MB), 10MB)"
- thriftPreparedStatementsCacheSizeInMB = (conf.thrift_prepared_statements_cache_size_mb == null)
- ? Math.max(10, (int) (Runtime.getRuntime().maxMemory() / 1024 / 1024 / 256))
- : conf.thrift_prepared_statements_cache_size_mb;
+ try
+ {
+ broadcastAddress = InetAddress.getByName(config.broadcast_address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false);
+ }
- if (thriftPreparedStatementsCacheSizeInMB <= 0)
- throw new NumberFormatException(); // to escape duplicating error message
+ if (broadcastAddress.isAnyLocalAddress())
+ throw new ConfigurationException("broadcast_address cannot be a wildcard address (" + config.broadcast_address + ")!", false);
}
- catch (NumberFormatException e)
+
+ /* Local IP, hostname or interface to bind RPC server to */
+ if (config.rpc_address != null && config.rpc_interface != null)
{
- throw new ConfigurationException("thrift_prepared_statements_cache_size_mb option was set incorrectly to '"
- + conf.thrift_prepared_statements_cache_size_mb + "', supported values are <integer> >= 0.", false);
+ throw new ConfigurationException("Set rpc_address OR rpc_interface, not both", false);
}
-
- try
+ else if (config.rpc_address != null)
{
- // if key_cache_size_in_mb option was set to "auto" then size of the cache should be "min(5% of Heap (in MB), 100MB)
- keyCacheSizeInMB = (conf.key_cache_size_in_mb == null)
- ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100)
- : conf.key_cache_size_in_mb;
-
- if (keyCacheSizeInMB < 0)
- throw new NumberFormatException(); // to escape duplicating error message
+ try
+ {
+ rpcAddress = InetAddress.getByName(config.rpc_address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new ConfigurationException("Unknown host in rpc_address " + config.rpc_address, false);
+ }
}
- catch (NumberFormatException e)
+ else if (config.rpc_interface != null)
{
- throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '"
- + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+ rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, "rpc_interface", config.rpc_interface_prefer_ipv6);
+ }
+ else
+ {
+ rpcAddress = FBUtilities.getLocalAddress();
}
- try
+ /* RPC address to broadcast */
+ if (config.broadcast_rpc_address != null)
{
- // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB)
- counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
- ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
- : conf.counter_cache_size_in_mb;
+ try
+ {
+ broadcastRpcAddress = InetAddress.getByName(config.broadcast_rpc_address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false);
+ }
- if (counterCacheSizeInMB < 0)
- throw new NumberFormatException(); // to escape duplicating error message
+ if (broadcastRpcAddress.isAnyLocalAddress())
+ throw new ConfigurationException("broadcast_rpc_address cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false);
}
- catch (NumberFormatException e)
+ else
{
- throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
- + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+ if (rpcAddress.isAnyLocalAddress())
+ throw new ConfigurationException("If rpc_address is set to a wildcard address (" + config.rpc_address + "), then " +
+ "you must set broadcast_rpc_address to a value other than " + config.rpc_address, false);
}
+ }
- // if set to empty/"auto" then use 5% of Heap size
- indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
- ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
- : conf.index_summary_capacity_in_mb;
-
- if (indexSummaryCapacityInMB < 0)
- throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
- + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);
+ public static void applyThriftHSHA()
+ {
+ // fail early instead of OOMing (see CASSANDRA-8116)
+ if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads == Integer.MAX_VALUE)
+ throw new ConfigurationException("The hsha rpc_server_type is not compatible with an rpc_max_threads " +
+ "setting of 'unlimited'. Please see the comments in cassandra.yaml " +
+ "for rpc_server_type and rpc_max_threads.",
+ false);
+ if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024))
+ logger.warn("rpc_max_threads setting of {} may be too high for the hsha server and cause unnecessary thread contention, reducing performance", conf.rpc_max_threads);
+ }
- if(conf.encryption_options != null)
- {
- logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
- //operate under the assumption that server_encryption_options is not set in yaml rather than both
- conf.server_encryption_options = conf.encryption_options;
- }
+ public static void applyEncryptionContext()
+ {
+ // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption,
+ // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read)
+ encryptionContext = new EncryptionContext(conf.transparent_data_encryption_options);
+ }
+ public static void applySeedProvider()
+ {
// load the seeds for node contact points
if (conf.seed_provider == null)
{
@@ -808,34 +776,107 @@ public class DatabaseDescriptor
}
if (seedProvider.getSeeds().size() == 0)
throw new ConfigurationException("The seed provider lists no seeds.", false);
+ }
- if (conf.user_defined_function_fail_timeout < 0)
- throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
- if (conf.user_defined_function_warn_timeout < 0)
- throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
+ public static void applyInitialTokens()
+ {
+ if (conf.initial_token != null)
+ {
+ Collection<String> tokens = tokensFromString(conf.initial_token);
+ if (tokens.size() != conf.num_tokens)
+ throw new ConfigurationException("The number of initial tokens (by initial_token) specified is different from num_tokens value", false);
- if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
- throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
+ for (String token : tokens)
+ partitioner.getTokenFactory().validate(token);
+ }
+ }
- // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption,
- // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read)
- encryptionContext = new EncryptionContext(config.transparent_data_encryption_options);
+ // Maybe safe for clients + tools
+ public static void applyRequestScheduler()
+ {
+ /* Request Scheduler setup */
+ requestSchedulerOptions = conf.request_scheduler_options;
+ if (conf.request_scheduler != null)
+ {
+ try
+ {
+ if (requestSchedulerOptions == null)
+ {
+ requestSchedulerOptions = new RequestSchedulerOptions();
+ }
+ Class<?> cls = Class.forName(conf.request_scheduler);
+ requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler, false);
+ }
+ catch (Exception e)
+ {
+ throw new ConfigurationException("Unable to instantiate request scheduler", e);
+ }
+ }
+ else
+ {
+ requestScheduler = new NoScheduler();
+ }
- if (conf.max_mutation_size_in_kb == null)
- conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
- else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
- throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
+ if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
+ {
+ requestSchedulerId = conf.request_scheduler_id;
+ }
+ else
+ {
+ // Default to Keyspace
+ requestSchedulerId = RequestSchedulerId.keyspace;
+ }
+ }
- // native transport encryption options
- if (conf.native_transport_port_ssl != null
- && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue()
- && !conf.client_encryption_options.enabled)
+ // definitely not safe for tools + clients - implicitly instantiates StorageService
+ public static void applySnitch()
+ {
+ /* end point snitch */
+ if (conf.endpoint_snitch == null)
{
- throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
+ throw new ConfigurationException("Missing endpoint_snitch directive", false);
}
+ snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch);
+ EndpointSnitchInfo.create();
- if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0)
- throw new ConfigurationException("max_value_size_in_mb must be positive", false);
+ localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
+ localComparator = new Comparator<InetAddress>()
+ {
+ public int compare(InetAddress endpoint1, InetAddress endpoint2)
+ {
+ boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
+ boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
+ if (local1 && !local2)
+ return -1;
+ if (local2 && !local1)
+ return 1;
+ return 0;
+ }
+ };
+ }
+
+ // definitely not safe for tools + clients - implicitly instantiates schema
+ public static void applyPartitioner()
+ {
+ /* Hashing strategy */
+ if (conf.partitioner == null)
+ {
+ throw new ConfigurationException("Missing directive: partitioner", false);
+ }
+ try
+ {
+ partitioner = FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", conf.partitioner));
+ }
+ catch (Exception e)
+ {
+ throw new ConfigurationException("Invalid partitioner class " + conf.partitioner, false);
+ }
+
+ paritionerName = partitioner.getClass().getCanonicalName();
}
private static FileStore guessFileStore(String dir) throws IOException
@@ -870,16 +911,31 @@ public class DatabaseDescriptor
return authenticator;
}
+ public static void setAuthenticator(IAuthenticator authenticator)
+ {
+ DatabaseDescriptor.authenticator = authenticator;
+ }
+
public static IAuthorizer getAuthorizer()
{
return authorizer;
}
+ public static void setAuthorizer(IAuthorizer authorizer)
+ {
+ DatabaseDescriptor.authorizer = authorizer;
+ }
+
public static IRoleManager getRoleManager()
{
return roleManager;
}
+ public static void setRoleManager(IRoleManager roleManager)
+ {
+ DatabaseDescriptor.roleManager = roleManager;
+ }
+
public static int getPermissionsValidity()
{
return conf.permissions_validity_in_ms;
@@ -1183,16 +1239,6 @@ public class DatabaseDescriptor
}
}
- public static boolean isReplacing()
- {
- if (System.getProperty("cassandra.replace_address_first_boot", null) != null && SystemKeyspace.bootstrapComplete())
- {
- logger.info("Replace address on first boot requested; this node is already bootstrapped");
- return false;
- }
- return getReplaceAddress() != null;
- }
-
public static String getClusterName()
{
return conf.cluster_name;
@@ -1293,34 +1339,6 @@ public class DatabaseDescriptor
return conf.cross_node_timeout;
}
- // not part of the Verb enum so we can change timeouts easily via JMX
- public static long getTimeout(MessagingService.Verb verb)
- {
- switch (verb)
- {
- case READ:
- return getReadRpcTimeout();
- case RANGE_SLICE:
- case PAGED_RANGE:
- return getRangeRpcTimeout();
- case TRUNCATE:
- return getTruncateRpcTimeout();
- case READ_REPAIR:
- case MUTATION:
- case PAXOS_COMMIT:
- case PAXOS_PREPARE:
- case PAXOS_PROPOSE:
- case HINT:
- case BATCH_STORE:
- case BATCH_REMOVE:
- return getWriteRpcTimeout();
- case COUNTER_MUTATION:
- return getCounterWriteRpcTimeout();
- default:
- return getRpcTimeout();
- }
- }
-
public static long getSlowQueryTimeout()
{
return conf.slow_query_log_timeout_in_ms;
@@ -1530,6 +1548,11 @@ public class DatabaseDescriptor
return internodeAuthenticator;
}
+ public static void setInternodeAuthenticator(IInternodeAuthenticator internodeAuthenticator)
+ {
+ DatabaseDescriptor.internodeAuthenticator = internodeAuthenticator;
+ }
+
public static void setBroadcastAddress(InetAddress broadcastAdd)
{
broadcastAddress = broadcastAdd;
@@ -1793,7 +1816,7 @@ public class DatabaseDescriptor
return new File(conf.hints_directory);
}
- public static File getSerializedCachePath(CacheService.CacheType cacheType, String version, String extension)
+ public static File getSerializedCachePath(CacheType cacheType, String version, String extension)
{
String name = cacheType.toString()
+ (version == null ? "" : "-" + version + "." + extension);
@@ -1828,12 +1851,12 @@ public class DatabaseDescriptor
conf.dynamic_snitch_badness_threshold = dynamicBadnessThreshold;
}
- public static ServerEncryptionOptions getServerEncryptionOptions()
+ public static EncryptionOptions.ServerEncryptionOptions getServerEncryptionOptions()
{
return conf.server_encryption_options;
}
- public static ClientEncryptionOptions getClientEncryptionOptions()
+ public static EncryptionOptions.ClientEncryptionOptions getClientEncryptionOptions()
{
return conf.client_encryption_options;
}
@@ -2060,33 +2083,24 @@ public class DatabaseDescriptor
return conf.inter_dc_tcp_nodelay;
}
+ public static long getMemtableHeapSpaceInMb()
+ {
+ return conf.memtable_heap_space_in_mb;
+ }
+
+ public static long getMemtableOffheapSpaceInMb()
+ {
+ return conf.memtable_offheap_space_in_mb;
+ }
- public static SSTableFormat.Type getSSTableFormat()
+ public static Config.MemtableAllocationType getMemtableAllocationType()
{
- return sstable_format;
+ return conf.memtable_allocation_type;
}
- public static MemtablePool getMemtableAllocatorPool()
+ public static Float getMemtableCleanupThreshold()
{
- long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20;
- long offHeapLimit = ((long) conf.memtable_offheap_space_in_mb) << 20;
- switch (conf.memtable_allocation_type)
- {
- case unslabbed_heap_buffers:
- return new HeapPool(heapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
- case heap_buffers:
- return new SlabPool(heapLimit, 0, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
- case offheap_buffers:
- if (!FileUtils.isCleanerAvailable)
- {
- throw new IllegalStateException("Could not free direct byte buffer: offheap_buffers is not a safe memtable_allocation_type without this ability, please adjust your config. This feature is only guaranteed to work on an Oracle JVM. Refusing to start.");
- }
- return new SlabPool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
- case offheap_objects:
- return new NativePool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
- default:
- throw new AssertionError();
- }
+ return conf.memtable_cleanup_threshold;
}
public static int getIndexSummaryResizeIntervalInMinutes()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index dd42779..eed316b 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -17,18 +17,14 @@
*/
package org.apache.cassandra.config;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.auth.AuthKeyspace;
import org.apache.cassandra.cql3.functions.*;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
@@ -40,10 +36,8 @@ import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.locator.LocalStrategy;
-import org.apache.cassandra.repair.SystemDistributedKeyspace;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.tracing.TraceKeyspace;
import org.apache.cassandra.utils.ConcurrentBiMap;
import org.apache.cassandra.utils.Pair;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -54,22 +48,6 @@ public class Schema
public static final Schema instance = new Schema();
- /* system keyspace names (the ones with LocalStrategy replication strategy) */
- public static final Set<String> SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(SystemKeyspace.NAME, SchemaKeyspace.NAME);
-
- /* replicate system keyspace names (the ones with a "true" replication strategy) */
- public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(TraceKeyspace.NAME,
- AuthKeyspace.NAME,
- SystemDistributedKeyspace.NAME);
-
- /**
- * longest permissible KS or CF name. Our main concern is that filename not be more than 255 characters;
- * the filename will contain both the KS and CF names. Since non-schema-name components only take up
- * ~64 characters, we could allow longer names than this, but on Windows, the entire path should be not greater than
- * 255 characters, so a lower limit here helps avoid problems. See CASSANDRA-4110.
- */
- public static final int NAME_LENGTH = 48;
-
/* metadata map for faster keyspace lookup */
private final Map<String, KeyspaceMetadata> keyspaces = new NonBlockingHashMap<>();
@@ -81,22 +59,6 @@ public class Schema
private volatile UUID version;
- // 59adb24e-f3cd-3e02-97f0-5b395827453f
- public static final UUID emptyVersion;
-
-
- static
- {
- try
- {
- emptyVersion = UUID.nameUUIDFromBytes(MessageDigest.getInstance("MD5").digest());
- }
- catch (NoSuchAlgorithmException e)
- {
- throw new AssertionError();
- }
- }
-
/**
* Initialize empty schema object and load the hardcoded system tables
*/
@@ -110,14 +72,6 @@ public class Schema
}
/**
- * @return whether or not the keyspace is a really system one (w/ LocalStrategy, unmodifiable, hardcoded)
- */
- public static boolean isSystemKeyspace(String keyspaceName)
- {
- return SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase());
- }
-
- /**
* load keyspace (keyspace) definitions, but do not initialize the keyspace instances.
* Schema version may be updated as the result.
*/
@@ -341,7 +295,7 @@ public class Schema
private Set<String> getNonSystemKeyspacesSet()
{
- return Sets.difference(keyspaces.keySet(), SYSTEM_KEYSPACE_NAMES);
+ return Sets.difference(keyspaces.keySet(), SchemaConstants.SYSTEM_KEYSPACE_NAMES);
}
/**
@@ -370,7 +324,7 @@ public class Schema
*/
public List<String> getUserKeyspaces()
{
- return ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), REPLICATED_SYSTEM_KEYSPACE_NAMES));
+ return ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/config/SchemaConstants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/SchemaConstants.java b/src/java/org/apache/cassandra/config/SchemaConstants.java
new file mode 100644
index 0000000..2416d6b
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/SchemaConstants.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableSet;
+
+public final class SchemaConstants
+{
+ public static final String SYSTEM_KEYSPACE_NAME = "system";
+ public static final String SCHEMA_KEYSPACE_NAME = "system_schema";
+
+ public static final String TRACE_KEYSPACE_NAME = "system_traces";
+ public static final String AUTH_KEYSPACE_NAME = "system_auth";
+ public static final String DISTRIBUTED_KEYSPACE_NAME = "system_distributed";
+
+ /* system keyspace names (the ones with LocalStrategy replication strategy) */
+ public static final Set<String> SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(SYSTEM_KEYSPACE_NAME, SCHEMA_KEYSPACE_NAME);
+
+ /* replicate system keyspace names (the ones with a "true" replication strategy) */
+ public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(TRACE_KEYSPACE_NAME,
+ AUTH_KEYSPACE_NAME,
+ DISTRIBUTED_KEYSPACE_NAME);
+ /**
+ * longest permissible KS or CF name. Our main concern is that filename not be more than 255 characters;
+ * the filename will contain both the KS and CF names. Since non-schema-name components only take up
+ * ~64 characters, we could allow longer names than this, but on Windows, the entire path should be not greater than
+ * 255 characters, so a lower limit here helps avoid problems. See CASSANDRA-4110.
+ */
+ public static final int NAME_LENGTH = 48;
+
+ // 59adb24e-f3cd-3e02-97f0-5b395827453f
+ public static final UUID emptyVersion;
+
+ static
+ {
+ try
+ {
+ emptyVersion = UUID.nameUUIDFromBytes(MessageDigest.getInstance("MD5").digest());
+ }
+ catch (NoSuchAlgorithmException e)
+ {
+ throw new AssertionError();
+ }
+ }
+
+ /**
+ * @return whether or not the keyspace is a really system one (w/ LocalStrategy, unmodifiable, hardcoded)
+ */
+ public static boolean isSystemKeyspace(String keyspaceName)
+ {
+ return SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index bd5638a..ca5e41a 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -91,11 +91,13 @@ public class YamlConfigurationLoader implements ConfigurationLoader
return url;
}
- private static final URL storageConfigURL = getStorageConfigURL();
+ private static URL storageConfigURL;
@Override
public Config loadConfig() throws ConfigurationException
{
+ if (storageConfigURL == null)
+ storageConfigURL = getStorageConfigURL();
return loadConfig(storageConfigURL);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 47462e4..4baa38c 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -35,8 +35,10 @@ import org.slf4j.LoggerFactory;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import org.antlr.runtime.*;
import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.cql3.functions.FunctionName;
import org.apache.cassandra.cql3.statements.*;
@@ -134,7 +136,7 @@ public class QueryProcessor implements QueryHandler
InternalStateInstance()
{
ClientState state = ClientState.forInternalCalls();
- state.setKeyspace(SystemKeyspace.NAME);
+ state.setKeyspace(SchemaConstants.SYSTEM_KEYSPACE_NAME);
this.queryState = new QueryState(state);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
index d732efa..aa980e9 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.cql3.functions;
import com.google.common.base.Objects;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.config.SchemaConstants;
public final class FunctionName
{
@@ -28,7 +28,7 @@ public final class FunctionName
public static FunctionName nativeFunction(String name)
{
- return new FunctionName(SystemKeyspace.NAME, name);
+ return new FunctionName(SchemaConstants.SYSTEM_KEYSPACE_NAME, name);
}
public FunctionName(String keyspace, String name)
@@ -67,8 +67,8 @@ public final class FunctionName
public final boolean equalsNativeFunction(FunctionName nativeFunction)
{
- assert nativeFunction.keyspace.equals(SystemKeyspace.NAME);
- if (this.hasKeyspace() && !this.keyspace.equals(SystemKeyspace.NAME))
+ assert nativeFunction.keyspace.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+ if (this.hasKeyspace() && !this.keyspace.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME))
return false;
return Objects.equal(this.name, nativeFunction.name);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
index 5642b0d..591e54a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -55,7 +56,7 @@ public class AlterKeyspaceStatement extends SchemaAlteringStatement
KeyspaceMetadata ksm = Schema.instance.getKSMetaData(name);
if (ksm == null)
throw new InvalidRequestException("Unknown keyspace " + name);
- if (Schema.isSystemKeyspace(ksm.name))
+ if (SchemaConstants.isSystemKeyspace(ksm.name))
throw new InvalidRequestException("Cannot alter system keyspace");
attrs.validate();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
index f88c04f..33d2ce4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
@@ -21,7 +21,7 @@ import java.util.regex.Pattern;
import org.apache.cassandra.auth.*;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -79,8 +79,8 @@ public class CreateKeyspaceStatement extends SchemaAlteringStatement
// keyspace name
if (!PATTERN_WORD_CHARS.matcher(name).matches())
throw new InvalidRequestException(String.format("\"%s\" is not a valid keyspace name", name));
- if (name.length() > Schema.NAME_LENGTH)
- throw new InvalidRequestException(String.format("Keyspace names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, name));
+ if (name.length() > SchemaConstants.NAME_LENGTH)
+ throw new InvalidRequestException(String.format("Keyspace names shouldn't be more than %s characters long (got \"%s\")", SchemaConstants.NAME_LENGTH, name));
attrs.validate();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 08c3a4c..90f0cdb 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -206,8 +206,8 @@ public class CreateTableStatement extends SchemaAlteringStatement
// Column family name
if (!PATTERN_WORD_CHARS.matcher(columnFamily()).matches())
throw new InvalidRequestException(String.format("\"%s\" is not a valid table name (must be alphanumeric character or underscore only: [a-zA-Z_0-9]+)", columnFamily()));
- if (columnFamily().length() > Schema.NAME_LENGTH)
- throw new InvalidRequestException(String.format("Table names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, columnFamily()));
+ if (columnFamily().length() > SchemaConstants.NAME_LENGTH)
+ throw new InvalidRequestException(String.format("Table names shouldn't be more than %s characters long (got \"%s\")", SchemaConstants.NAME_LENGTH, columnFamily()));
for (Multiset.Entry<ColumnIdentifier> entry : definedNames.entrySet())
if (entry.getCount() > 1)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
index 58f8e9c..b8f2f92 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
@@ -21,6 +21,7 @@ import java.util.*;
import org.apache.cassandra.auth.*;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -31,7 +32,7 @@ import org.apache.cassandra.transport.messages.ResultMessage;
public class ListPermissionsStatement extends AuthorizationStatement
{
- private static final String KS = AuthKeyspace.NAME;
+ private static final String KS = SchemaConstants.AUTH_KEYSPACE_NAME;
private static final String CF = "permissions"; // virtual cf to use for now.
private static final List<ColumnSpecification> metadata;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
index 477aedc..3fee57a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
import org.apache.cassandra.auth.*;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.db.marshal.MapType;
@@ -37,7 +38,7 @@ import org.apache.cassandra.transport.messages.ResultMessage;
public class ListRolesStatement extends AuthorizationStatement
{
// pseudo-virtual cf as the actual datasource is dependent on the IRoleManager impl
- private static final String KS = AuthKeyspace.NAME;
+ private static final String KS = SchemaConstants.AUTH_KEYSPACE_NAME;
private static final String CF = AuthKeyspace.ROLES;
private static final MapType optionsType = MapType.getInstance(UTF8Type.instance, UTF8Type.instance, false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
index 7251980..0101363 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.cassandra.auth.*;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.cql3.ResultSet;
@@ -33,7 +34,7 @@ import org.apache.cassandra.transport.messages.ResultMessage;
public class ListUsersStatement extends ListRolesStatement
{
// pseudo-virtual cf as the actual datasource is dependent on the IRoleManager impl
- private static final String KS = AuthKeyspace.NAME;
+ private static final String KS = SchemaConstants.AUTH_KEYSPACE_NAME;
private static final String CF = "users";
private static final List<ColumnSpecification> metadata =
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java b/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
index b22e400..3ae6bd8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
@@ -21,8 +21,8 @@ import java.util.Set;
import org.apache.cassandra.auth.*;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.RoleName;
-import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.exceptions.UnauthorizedException;
@@ -54,7 +54,7 @@ public abstract class PermissionsManagementStatement extends AuthorizationStatem
// altering permissions on builtin functions is not supported
if (resource instanceof FunctionResource
- && SystemKeyspace.NAME.equals(((FunctionResource)resource).getKeyspace()))
+ && SchemaConstants.SYSTEM_KEYSPACE_NAME.equals(((FunctionResource)resource).getKeyspace()))
{
throw new InvalidRequestException("Altering permissions on builtin functions is not supported");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 06dff0f..14d6440 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -817,7 +817,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public String getSSTablePath(File directory)
{
- return getSSTablePath(directory, DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), DatabaseDescriptor.getSSTableFormat());
+ return getSSTablePath(directory, SSTableFormat.Type.current().info.getLatestVersion(), SSTableFormat.Type.current());
}
public String getSSTablePath(File directory, SSTableFormat.Type format)
@@ -1775,7 +1775,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
writeSnapshotManifest(filesJSONArr, snapshotName);
- if (!Schema.SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName) && !Schema.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName))
+ if (!SchemaConstants.SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName) && !SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName))
writeSnapshotSchema(snapshotName);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 0d78245..741058f 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -100,7 +100,7 @@ public class Keyspace
public static Keyspace open(String keyspaceName)
{
- assert initialized || Schema.isSystemKeyspace(keyspaceName);
+ assert initialized || SchemaConstants.isSystemKeyspace(keyspaceName);
return open(keyspaceName, Schema.instance, true);
}
@@ -685,7 +685,7 @@ public class Keyspace
public static Iterable<Keyspace> system()
{
- return Iterables.transform(Schema.SYSTEM_KEYSPACE_NAMES, keyspaceTransformer);
+ return Iterables.transform(SchemaConstants.SYSTEM_KEYSPACE_NAMES, keyspaceTransformer);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 9e7e9b6..4fdf28c 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.*;
+import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.utils.AbstractIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@@ -1025,7 +1026,7 @@ public abstract class LegacyLayout
// then simply ignore the cell is fine. But also not that we ignore if it's the
// system keyspace because for those table we actually remove columns without registering
// them in the dropped columns
- assert metadata.ksName.equals(SystemKeyspace.NAME) || metadata.getDroppedColumnDefinition(e.columnName) != null : e.getMessage();
+ assert metadata.ksName.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME) || metadata.getDroppedColumnDefinition(e.columnName) != null : e.getMessage();
}
}
}
@@ -1105,7 +1106,7 @@ public abstract class LegacyLayout
// then simply ignore the cell is fine. But also not that we ignore if it's the
// system keyspace because for those table we actually remove columns without registering
// them in the dropped columns
- if (metadata.ksName.equals(SystemKeyspace.NAME) || metadata.getDroppedColumnDefinition(e.columnName) != null)
+ if (metadata.ksName.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME) || metadata.getDroppedColumnDefinition(e.columnName) != null)
return computeNext();
else
throw new IOError(e);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e9cca4a..f8b258c 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.IntervalSet;
@@ -48,20 +49,48 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapPool;
import org.apache.cassandra.utils.memory.MemtableAllocator;
import org.apache.cassandra.utils.memory.MemtablePool;
+import org.apache.cassandra.utils.memory.NativePool;
+import org.apache.cassandra.utils.memory.SlabPool;
public class Memtable implements Comparable<Memtable>
{
private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
- public static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool();
+ public static final MemtablePool MEMORY_POOL = createMemtableAllocatorPool();
+
+ private static MemtablePool createMemtableAllocatorPool()
+ {
+ long heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMb() << 20;
+ long offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMb() << 20;
+ switch (DatabaseDescriptor.getMemtableAllocationType())
+ {
+ case unslabbed_heap_buffers:
+ return new HeapPool(heapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
+ case heap_buffers:
+ return new SlabPool(heapLimit, 0, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
+ case offheap_buffers:
+ if (!FileUtils.isCleanerAvailable)
+ {
+ throw new IllegalStateException("Could not free direct byte buffer: offheap_buffers is not a safe memtable_allocation_type without this ability, please adjust your config. This feature is only guaranteed to work on an Oracle JVM. Refusing to start.");
+ }
+ return new SlabPool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
+ case offheap_objects:
+ return new NativePool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
+ default:
+ throw new AssertionError();
+ }
+ }
+
private static final int ROW_OVERHEAD_HEAP_SIZE = estimateRowOverhead(Integer.parseInt(System.getProperty("cassandra.memtable_row_overhead_computation_step", "100000")));
private final MemtableAllocator allocator;
@@ -416,7 +445,7 @@ public class Memtable implements Comparable<Memtable>
+ liveDataSize.get()) // data
* 1.2); // bloom filter and row index overhead
- this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
+ this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SchemaConstants.SYSTEM_KEYSPACE_NAME);
if (flushLocation == null)
writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getWriteableLocationAsFile(estimatedSize)), columnsCollector.get(), statsCollector.get());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 5f01733..63ea89d 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -452,7 +452,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
- private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName);
+ private final boolean respectTombstoneThresholds = !SchemaConstants.isSystemKeyspace(ReadCommand.this.metadata().ksName);
private int liveRows = 0;
private int tombstones = 0;