You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2016/08/18 01:45:32 UTC

[5/6] cassandra git commit: Let DatabaseDescriptor not implicitly startup services

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 6f71817..f9d0498 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -35,30 +35,31 @@ import com.google.common.primitives.Longs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.auth.*;
+import org.apache.cassandra.auth.AuthConfig;
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.auth.IAuthorizer;
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.auth.IRoleManager;
 import org.apache.cassandra.config.Config.CommitLogSync;
 import org.apache.cassandra.config.Config.RequestSchedulerId;
-import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
-import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.DiskOptimizationStrategy;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy;
 import org.apache.cassandra.io.util.SsdDiskOptimizationStrategy;
-import org.apache.cassandra.locator.*;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
+import org.apache.cassandra.locator.EndpointSnitchInfo;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.SeedProvider;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.scheduler.NoScheduler;
 import org.apache.cassandra.security.EncryptionContext;
-import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.thrift.ThriftServer;
+import org.apache.cassandra.service.CacheService.CacheType;
+import org.apache.cassandra.thrift.ThriftServer.ThriftServerType;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.memory.*;
+
 import org.apache.commons.lang3.StringUtils;
 
 public class DatabaseDescriptor
@@ -71,6 +72,8 @@ public class DatabaseDescriptor
      */
     private static final int MAX_NUM_TOKENS = 1536;
 
+    private static Config conf;
+
     private static IEndpointSnitch snitch;
     private static InetAddress listenAddress; // leave null so we can fall through to getLocalHost
     private static InetAddress broadcastAddress;
@@ -85,12 +88,8 @@ public class DatabaseDescriptor
 
     private static Config.DiskAccessMode indexAccessMode;
 
-    private static Config conf;
-
-    private static SSTableFormat.Type sstable_format = SSTableFormat.Type.BIG;
-
-    private static IAuthenticator authenticator = new AllowAllAuthenticator();
-    private static IAuthorizer authorizer = new AllowAllAuthorizer();
+    private static IAuthenticator authenticator;
+    private static IAuthorizer authorizer;
     // Don't initialize the role manager until applying config. The options supported by CassandraRoleManager
     // depend on the configured IAuthenticator, so defer creating it until that's been set.
     private static IRoleManager roleManager;
@@ -113,36 +112,63 @@ public class DatabaseDescriptor
 
     private static DiskOptimizationStrategy diskOptimizationStrategy;
 
-    public static void forceStaticInitialization() {}
-    static
+    private static boolean clientInitialized;
+    private static boolean toolInitialized;
+    private static boolean daemonInitialized;
+
+    public static void daemonInitialization() throws ConfigurationException
     {
-        // In client mode, we use a default configuration. Note that the fields of this class will be
-        // left unconfigured however (the partitioner or localDC will be null for instance) so this
-        // should be used with care.
-        try
-        {
-            if (Config.isClientMode())
-            {
-                conf = new Config();
-            }
-            else
-            {
-                applyConfig(loadConfig());
-            }
-            switch (conf.disk_optimization_strategy)
-            {
-                case ssd:
-                    diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance);
-                    break;
-                case spinning:
-                    diskOptimizationStrategy = new SpinningDiskOptimizationStrategy();
-                    break;
-            }
-        }
-        catch (Exception e)
-        {
-            throw new ExceptionInInitializerError(e);
-        }
+        assert !toolInitialized;
+        assert !clientInitialized;
+
+        // Some unit tests call this more than once, so a repeated call must be a no-op :(
+        if (daemonInitialized)
+            return;
+        daemonInitialized = true;
+
+        setConfig(loadConfig());
+        applyAll();
+        AuthConfig.applyAuthz();
+    }
+
+    public static void toolInitialization()
+    {
+        assert !daemonInitialized;
+        assert !clientInitialized;
+
+        if (toolInitialized)
+            return;
+        toolInitialized = true;
+
+        Config.setToolsMode(true);
+
+        setConfig(loadConfig());
+
+        applySimpleConfig();
+
+        applyPartitioner();
+
+        applySnitch();
+
+        applyEncryptionContext();
+    }
+
+    public static void clientInitialization()
+    {
+        assert !daemonInitialized;
+        assert !toolInitialized;
+
+        if (clientInitialized)
+            return;
+        clientInitialized = true;
+
+        Config.setClientMode(true);
+        conf = new Config();
+    }
+
+    public static Config getRawConfig()
+    {
+        return conf;
     }
 
     @VisibleForTesting
@@ -193,105 +219,34 @@ public class DatabaseDescriptor
         }
     }
 
-    @VisibleForTesting
-    public static void applyAddressConfig(Config config) throws ConfigurationException
+    private static void setConfig(Config config)
     {
-        listenAddress = null;
-        rpcAddress = null;
-        broadcastAddress = null;
-        broadcastRpcAddress = null;
+        conf = config;
+    }
 
-        /* Local IP, hostname or interface to bind services to */
-        if (config.listen_address != null && config.listen_interface != null)
-        {
-            throw new ConfigurationException("Set listen_address OR listen_interface, not both", false);
-        }
-        else if (config.listen_address != null)
-        {
-            try
-            {
-                listenAddress = InetAddress.getByName(config.listen_address);
-            }
-            catch (UnknownHostException e)
-            {
-                throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false);
-            }
+    private static void applyAll() throws ConfigurationException
+    {
+        applySimpleConfig();
 
-            if (listenAddress.isAnyLocalAddress())
-                throw new ConfigurationException("listen_address cannot be a wildcard address (" + config.listen_address + ")!", false);
-        }
-        else if (config.listen_interface != null)
-        {
-            listenAddress = getNetworkInterfaceAddress(config.listen_interface, "listen_interface", config.listen_interface_prefer_ipv6);
-        }
+        applyPartitioner();
 
-        /* Gossip Address to broadcast */
-        if (config.broadcast_address != null)
-        {
-            try
-            {
-                broadcastAddress = InetAddress.getByName(config.broadcast_address);
-            }
-            catch (UnknownHostException e)
-            {
-                throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false);
-            }
+        applyAddressConfig();
 
-            if (broadcastAddress.isAnyLocalAddress())
-                throw new ConfigurationException("broadcast_address cannot be a wildcard address (" + config.broadcast_address + ")!", false);
-        }
+        applyThriftHSHA();
 
-        /* Local IP, hostname or interface to bind RPC server to */
-        if (config.rpc_address != null && config.rpc_interface != null)
-        {
-            throw new ConfigurationException("Set rpc_address OR rpc_interface, not both", false);
-        }
-        else if (config.rpc_address != null)
-        {
-            try
-            {
-                rpcAddress = InetAddress.getByName(config.rpc_address);
-            }
-            catch (UnknownHostException e)
-            {
-                throw new ConfigurationException("Unknown host in rpc_address " + config.rpc_address, false);
-            }
-        }
-        else if (config.rpc_interface != null)
-        {
-            rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, "rpc_interface", config.rpc_interface_prefer_ipv6);
-        }
-        else
-        {
-            rpcAddress = FBUtilities.getLocalAddress();
-        }
+        applySnitch();
 
-        /* RPC address to broadcast */
-        if (config.broadcast_rpc_address != null)
-        {
-            try
-            {
-                broadcastRpcAddress = InetAddress.getByName(config.broadcast_rpc_address);
-            }
-            catch (UnknownHostException e)
-            {
-                throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false);
-            }
+        applyRequestScheduler();
 
-            if (broadcastRpcAddress.isAnyLocalAddress())
-                throw new ConfigurationException("broadcast_rpc_address cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false);
-        }
-        else
-        {
-            if (rpcAddress.isAnyLocalAddress())
-                throw new ConfigurationException("If rpc_address is set to a wildcard address (" + config.rpc_address + "), then " +
-                                                 "you must set broadcast_rpc_address to a value other than " + config.rpc_address, false);
-        }
+        applyInitialTokens();
+
+        applySeedProvider();
+
+        applyEncryptionContext();
     }
 
-    public static void applyConfig(Config config) throws ConfigurationException
+    private static void applySimpleConfig()
     {
-        conf = config;
 
         if (conf.commitlog_sync == null)
         {
@@ -342,67 +297,6 @@ public class DatabaseDescriptor
             logger.info("DiskAccessMode is {}, indexAccessMode is {}", conf.disk_access_mode, indexAccessMode);
         }
 
-        /* Authentication, authorization and role management backend, implementing IAuthenticator, IAuthorizer & IRoleMapper*/
-        if (conf.authenticator != null)
-            authenticator = FBUtilities.newAuthenticator(conf.authenticator);
-
-        // the configuration options regarding credentials caching are only guaranteed to
-        // work with PasswordAuthenticator, so log a message if some other authenticator
-        // is in use and non-default values are detected
-        if (!(authenticator instanceof PasswordAuthenticator)
-            && (conf.credentials_update_interval_in_ms != -1
-                || conf.credentials_validity_in_ms != 2000
-                || conf.credentials_cache_max_entries != 1000))
-        {
-            logger.info("Configuration options credentials_update_interval_in_ms, credentials_validity_in_ms and " +
-                        "credentials_cache_max_entries may not be applicable for the configured authenticator ({})",
-                        authenticator.getClass().getName());
-        }
-
-        if (conf.authorizer != null)
-            authorizer = FBUtilities.newAuthorizer(conf.authorizer);
-
-        if (!authenticator.requireAuthentication() && authorizer.requireAuthorization())
-            throw new ConfigurationException(conf.authenticator + " can't be used with " +  conf.authorizer, false);
-
-        if (conf.role_manager != null)
-            roleManager = FBUtilities.newRoleManager(conf.role_manager);
-        else
-            roleManager = new CassandraRoleManager();
-
-        if (authenticator instanceof PasswordAuthenticator && !(roleManager instanceof CassandraRoleManager))
-            throw new ConfigurationException("CassandraRoleManager must be used with PasswordAuthenticator", false);
-
-        if (conf.internode_authenticator != null)
-            internodeAuthenticator = FBUtilities.construct(conf.internode_authenticator, "internode_authenticator");
-        else
-            internodeAuthenticator = new AllowAllInternodeAuthenticator();
-
-        authenticator.validateConfiguration();
-        authorizer.validateConfiguration();
-        roleManager.validateConfiguration();
-        internodeAuthenticator.validateConfiguration();
-
-        /* Hashing strategy */
-        if (conf.partitioner == null)
-        {
-            throw new ConfigurationException("Missing directive: partitioner", false);
-        }
-        try
-        {
-            partitioner = FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", conf.partitioner));
-        }
-        catch (Exception e)
-        {
-            throw new ConfigurationException("Invalid partitioner class " + conf.partitioner, false);
-        }
-        paritionerName = partitioner.getClass().getCanonicalName();
-
-        if (config.gc_log_threshold_in_ms < 0)
-        {
-            throw new ConfigurationException("gc_log_threshold_in_ms must be a positive integer");
-        }
-
         if (conf.gc_warn_threshold_in_ms < 0)
         {
             throw new ConfigurationException("gc_warn_threshold_in_ms must be a positive integer");
@@ -454,83 +348,12 @@ public class DatabaseDescriptor
         else
             logger.info("Global memtable off-heap threshold is enabled at {}MB", conf.memtable_offheap_space_in_mb);
 
-        applyAddressConfig(config);
-
         if (conf.thrift_framed_transport_size_in_mb <= 0)
             throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive, but was " + conf.thrift_framed_transport_size_in_mb, false);
 
         if (conf.native_transport_max_frame_size_in_mb <= 0)
             throw new ConfigurationException("native_transport_max_frame_size_in_mb must be positive, but was " + conf.native_transport_max_frame_size_in_mb, false);
 
-        // fail early instead of OOMing (see CASSANDRA-8116)
-        if (ThriftServer.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads == Integer.MAX_VALUE)
-            throw new ConfigurationException("The hsha rpc_server_type is not compatible with an rpc_max_threads " +
-                                             "setting of 'unlimited'.  Please see the comments in cassandra.yaml " +
-                                             "for rpc_server_type and rpc_max_threads.",
-                                             false);
-        if (ThriftServer.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024))
-            logger.warn("rpc_max_threads setting of {} may be too high for the hsha server and cause unnecessary thread contention, reducing performance", conf.rpc_max_threads);
-
-        /* end point snitch */
-        if (conf.endpoint_snitch == null)
-        {
-            throw new ConfigurationException("Missing endpoint_snitch directive", false);
-        }
-        snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch);
-        EndpointSnitchInfo.create();
-
-        localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
-        localComparator = new Comparator<InetAddress>()
-        {
-            public int compare(InetAddress endpoint1, InetAddress endpoint2)
-            {
-                boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
-                boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
-                if (local1 && !local2)
-                    return -1;
-                if (local2 && !local1)
-                    return 1;
-                return 0;
-            }
-        };
-
-        /* Request Scheduler setup */
-        requestSchedulerOptions = conf.request_scheduler_options;
-        if (conf.request_scheduler != null)
-        {
-            try
-            {
-                if (requestSchedulerOptions == null)
-                {
-                    requestSchedulerOptions = new RequestSchedulerOptions();
-                }
-                Class<?> cls = Class.forName(conf.request_scheduler);
-                requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
-            }
-            catch (ClassNotFoundException e)
-            {
-                throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler, false);
-            }
-            catch (Exception e)
-            {
-                throw new ConfigurationException("Unable to instantiate request scheduler", e);
-            }
-        }
-        else
-        {
-            requestScheduler = new NoScheduler();
-        }
-
-        if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
-        {
-            requestSchedulerId = conf.request_scheduler_id;
-        }
-        else
-        {
-            // Default to Keyspace
-            requestSchedulerId = RequestSchedulerId.keyspace;
-        }
-
         // if data dirs, commitlog dir, or saved caches dir are set in cassandra.yaml, use that.  Otherwise,
         // use -Dcassandra.storagedir (set in cassandra-env.sh) as the parent dir for data/, commitlog/, and saved_caches/
         if (conf.commitlog_directory == null)
@@ -700,97 +523,242 @@ public class DatabaseDescriptor
         else if (conf.num_tokens > MAX_NUM_TOKENS)
             throw new ConfigurationException(String.format("A maximum number of %d tokens per node is supported", MAX_NUM_TOKENS), false);
 
-        if (conf.initial_token != null)
+        try
         {
-            Collection<String> tokens = tokensFromString(conf.initial_token);
-            if (tokens.size() != conf.num_tokens)
-                throw new ConfigurationException("The number of initial tokens (by initial_token) specified is different from num_tokens value", false);
+            // if prepared_statements_cache_size_mb option was set to "auto" then size of the cache should be "max(1/256 of Heap (in MB), 10MB)"
+            preparedStatementsCacheSizeInMB = (conf.prepared_statements_cache_size_mb == null)
+                                              ? Math.max(10, (int) (Runtime.getRuntime().maxMemory() / 1024 / 1024 / 256))
+                                              : conf.prepared_statements_cache_size_mb;
 
-            for (String token : tokens)
-                partitioner.getTokenFactory().validate(token);
+            if (preparedStatementsCacheSizeInMB <= 0)
+                throw new NumberFormatException(); // to escape duplicating error message
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException("prepared_statements_cache_size_mb option was set incorrectly to '"
+                                             + conf.prepared_statements_cache_size_mb + "', supported values are <integer> >= 0.", false);
+        }
+
+        try
+        {
+            // if thrift_prepared_statements_cache_size_mb option was set to "auto" then size of the cache should be "max(1/256 of Heap (in MB), 10MB)"
+            thriftPreparedStatementsCacheSizeInMB = (conf.thrift_prepared_statements_cache_size_mb == null)
+                                                    ? Math.max(10, (int) (Runtime.getRuntime().maxMemory() / 1024 / 1024 / 256))
+                                                    : conf.thrift_prepared_statements_cache_size_mb;
+
+            if (thriftPreparedStatementsCacheSizeInMB <= 0)
+                throw new NumberFormatException(); // to escape duplicating error message
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException("thrift_prepared_statements_cache_size_mb option was set incorrectly to '"
+                                             + conf.thrift_prepared_statements_cache_size_mb + "', supported values are <integer> >= 0.", false);
         }
 
+        try
+        {
+            // if key_cache_size_in_mb option was set to "auto" then size of the cache should be "min(5% of Heap (in MB), 100MB)"
+            keyCacheSizeInMB = (conf.key_cache_size_in_mb == null)
+                               ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100)
+                               : conf.key_cache_size_in_mb;
+
+            if (keyCacheSizeInMB < 0)
+                throw new NumberFormatException(); // to escape duplicating error message
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '"
+                                             + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+        }
 
         try
         {
-            // if prepared_statements_cache_size_mb option was set to "auto" then size of the cache should be "max(1/256 of Heap (in MB), 10MB)"
-            preparedStatementsCacheSizeInMB = (conf.prepared_statements_cache_size_mb == null)
-                                              ? Math.max(10, (int) (Runtime.getRuntime().maxMemory() / 1024 / 1024 / 256))
-                                              : conf.prepared_statements_cache_size_mb;
+            // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB)"
+            counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
+                                   ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
+                                   : conf.counter_cache_size_in_mb;
+
+            if (counterCacheSizeInMB < 0)
+                throw new NumberFormatException(); // to escape duplicating error message
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
+                                             + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+        }
+
+        // if set to empty/"auto" then use 5% of Heap size
+        indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
+                                   ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
+                                   : conf.index_summary_capacity_in_mb;
+
+        if (indexSummaryCapacityInMB < 0)
+            throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
+                                             + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);
+
+        if(conf.encryption_options != null)
+        {
+            logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
+            //operate under the assumption that server_encryption_options is not set in yaml rather than both
+            conf.server_encryption_options = conf.encryption_options;
+        }
+
+        if (conf.user_defined_function_fail_timeout < 0)
+            throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
+        if (conf.user_defined_function_warn_timeout < 0)
+            throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
+
+        if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
+            throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
+
+        if (conf.max_mutation_size_in_kb == null)
+            conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
+        else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
+            throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
+
+        // native transport encryption options
+        if (conf.native_transport_port_ssl != null
+            && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue()
+            && !conf.client_encryption_options.enabled)
+        {
+            throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
+        }
+
+        if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0)
+            throw new ConfigurationException("max_value_size_in_mb must be positive", false);
+
+        switch (conf.disk_optimization_strategy)
+        {
+            case ssd:
+                diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance);
+                break;
+            case spinning:
+                diskOptimizationStrategy = new SpinningDiskOptimizationStrategy();
+                break;
+        }
+    }
+
+    public static void applyAddressConfig() throws ConfigurationException
+    {
+        applyAddressConfig(conf);
+    }
+
+    public static void applyAddressConfig(Config config) throws ConfigurationException
+    {
+        listenAddress = null;
+        rpcAddress = null;
+        broadcastAddress = null;
+        broadcastRpcAddress = null;
+
+        /* Local IP, hostname or interface to bind services to */
+        if (config.listen_address != null && config.listen_interface != null)
+        {
+            throw new ConfigurationException("Set listen_address OR listen_interface, not both", false);
+        }
+        else if (config.listen_address != null)
+        {
+            try
+            {
+                listenAddress = InetAddress.getByName(config.listen_address);
+            }
+            catch (UnknownHostException e)
+            {
+                throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false);
+            }
 
-            if (preparedStatementsCacheSizeInMB <= 0)
-                throw new NumberFormatException(); // to escape duplicating error message
+            if (listenAddress.isAnyLocalAddress())
+                throw new ConfigurationException("listen_address cannot be a wildcard address (" + config.listen_address + ")!", false);
         }
-        catch (NumberFormatException e)
+        else if (config.listen_interface != null)
         {
-            throw new ConfigurationException("prepared_statements_cache_size_mb option was set incorrectly to '"
-                                             + conf.prepared_statements_cache_size_mb + "', supported values are <integer> >= 0.", false);
+            listenAddress = getNetworkInterfaceAddress(config.listen_interface, "listen_interface", config.listen_interface_prefer_ipv6);
         }
 
-        try
+        /* Gossip Address to broadcast */
+        if (config.broadcast_address != null)
         {
-            // if thrift_prepared_statements_cache_size_mb option was set to "auto" then size of the cache should be "max(1/256 of Heap (in MB), 10MB)"
-            thriftPreparedStatementsCacheSizeInMB = (conf.thrift_prepared_statements_cache_size_mb == null)
-                                                    ? Math.max(10, (int) (Runtime.getRuntime().maxMemory() / 1024 / 1024 / 256))
-                                                    : conf.thrift_prepared_statements_cache_size_mb;
+            try
+            {
+                broadcastAddress = InetAddress.getByName(config.broadcast_address);
+            }
+            catch (UnknownHostException e)
+            {
+                throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false);
+            }
 
-            if (thriftPreparedStatementsCacheSizeInMB <= 0)
-                throw new NumberFormatException(); // to escape duplicating error message
+            if (broadcastAddress.isAnyLocalAddress())
+                throw new ConfigurationException("broadcast_address cannot be a wildcard address (" + config.broadcast_address + ")!", false);
         }
-        catch (NumberFormatException e)
+
+        /* Local IP, hostname or interface to bind RPC server to */
+        if (config.rpc_address != null && config.rpc_interface != null)
         {
-            throw new ConfigurationException("thrift_prepared_statements_cache_size_mb option was set incorrectly to '"
-                                             + conf.thrift_prepared_statements_cache_size_mb + "', supported values are <integer> >= 0.", false);
+            throw new ConfigurationException("Set rpc_address OR rpc_interface, not both", false);
         }
-
-        try
+        else if (config.rpc_address != null)
         {
-            // if key_cache_size_in_mb option was set to "auto" then size of the cache should be "min(5% of Heap (in MB), 100MB)
-            keyCacheSizeInMB = (conf.key_cache_size_in_mb == null)
-                ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100)
-                : conf.key_cache_size_in_mb;
-
-            if (keyCacheSizeInMB < 0)
-                throw new NumberFormatException(); // to escape duplicating error message
+            try
+            {
+                rpcAddress = InetAddress.getByName(config.rpc_address);
+            }
+            catch (UnknownHostException e)
+            {
+                throw new ConfigurationException("Unknown host in rpc_address " + config.rpc_address, false);
+            }
         }
-        catch (NumberFormatException e)
+        else if (config.rpc_interface != null)
         {
-            throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '"
-                    + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+            rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, "rpc_interface", config.rpc_interface_prefer_ipv6);
+        }
+        else
+        {
+            rpcAddress = FBUtilities.getLocalAddress();
         }
 
-        try
+        /* RPC address to broadcast */
+        if (config.broadcast_rpc_address != null)
         {
-            // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB)
-            counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
-                    ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
-                    : conf.counter_cache_size_in_mb;
+            try
+            {
+                broadcastRpcAddress = InetAddress.getByName(config.broadcast_rpc_address);
+            }
+            catch (UnknownHostException e)
+            {
+                throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false);
+            }
 
-            if (counterCacheSizeInMB < 0)
-                throw new NumberFormatException(); // to escape duplicating error message
+            if (broadcastRpcAddress.isAnyLocalAddress())
+                throw new ConfigurationException("broadcast_rpc_address cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false);
         }
-        catch (NumberFormatException e)
+        else
         {
-            throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
-                    + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+            if (rpcAddress.isAnyLocalAddress())
+                throw new ConfigurationException("If rpc_address is set to a wildcard address (" + config.rpc_address + "), then " +
+                                                 "you must set broadcast_rpc_address to a value other than " + config.rpc_address, false);
         }
+    }
 
-        // if set to empty/"auto" then use 5% of Heap size
-        indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
-            ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
-            : conf.index_summary_capacity_in_mb;
-
-        if (indexSummaryCapacityInMB < 0)
-            throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
-                    + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);
+    public static void applyThriftHSHA()
+    {
+        // fail early instead of OOMing (see CASSANDRA-8116)
+        if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads == Integer.MAX_VALUE)
+            throw new ConfigurationException("The hsha rpc_server_type is not compatible with an rpc_max_threads " +
+                                             "setting of 'unlimited'.  Please see the comments in cassandra.yaml " +
+                                             "for rpc_server_type and rpc_max_threads.",
+                                             false);
+        if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024))
+            logger.warn("rpc_max_threads setting of {} may be too high for the hsha server and cause unnecessary thread contention, reducing performance", conf.rpc_max_threads);
+    }
 
-        if(conf.encryption_options != null)
-        {
-            logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
-            //operate under the assumption that server_encryption_options is not set in yaml rather than both
-            conf.server_encryption_options = conf.encryption_options;
-        }
+    public static void applyEncryptionContext()
+    {
+        // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption,
+        // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read)
+        encryptionContext = new EncryptionContext(conf.transparent_data_encryption_options);
+    }
 
+    public static void applySeedProvider()
+    {
         // load the seeds for node contact points
         if (conf.seed_provider == null)
         {
@@ -808,34 +776,107 @@ public class DatabaseDescriptor
         }
         if (seedProvider.getSeeds().size() == 0)
             throw new ConfigurationException("The seed provider lists no seeds.", false);
+    }
 
-        if (conf.user_defined_function_fail_timeout < 0)
-            throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
-        if (conf.user_defined_function_warn_timeout < 0)
-            throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
+    public static void applyInitialTokens()
+    {
+        if (conf.initial_token != null)
+        {
+            Collection<String> tokens = tokensFromString(conf.initial_token);
+            if (tokens.size() != conf.num_tokens)
+                throw new ConfigurationException("The number of initial tokens (by initial_token) specified is different from num_tokens value", false);
 
-        if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
-            throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
+            for (String token : tokens)
+                partitioner.getTokenFactory().validate(token);
+        }
+    }
 
-        // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption,
-        // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read)
-        encryptionContext = new EncryptionContext(config.transparent_data_encryption_options);
+    // Maybe safe for clients + tools
+    public static void applyRequestScheduler()
+    {
+        /* Request Scheduler setup */
+        requestSchedulerOptions = conf.request_scheduler_options;
+        if (conf.request_scheduler != null)
+        {
+            try
+            {
+                if (requestSchedulerOptions == null)
+                {
+                    requestSchedulerOptions = new RequestSchedulerOptions();
+                }
+                Class<?> cls = Class.forName(conf.request_scheduler);
+                requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
+            }
+            catch (ClassNotFoundException e)
+            {
+                throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler, false);
+            }
+            catch (Exception e)
+            {
+                throw new ConfigurationException("Unable to instantiate request scheduler", e);
+            }
+        }
+        else
+        {
+            requestScheduler = new NoScheduler();
+        }
 
-        if (conf.max_mutation_size_in_kb == null)
-            conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
-        else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
-            throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
+        if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
+        {
+            requestSchedulerId = conf.request_scheduler_id;
+        }
+        else
+        {
+            // Default to Keyspace
+            requestSchedulerId = RequestSchedulerId.keyspace;
+        }
+    }
 
-        // native transport encryption options
-        if (conf.native_transport_port_ssl != null
-            && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue()
-            && !conf.client_encryption_options.enabled)
+    // definitely not safe for tools + clients - implicitly instantiates StorageService
+    public static void applySnitch()
+    {
+        /* end point snitch */
+        if (conf.endpoint_snitch == null)
         {
-            throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
+            throw new ConfigurationException("Missing endpoint_snitch directive", false);
         }
+        snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch);
+        EndpointSnitchInfo.create();
 
-        if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0)
-            throw new ConfigurationException("max_value_size_in_mb must be positive", false);
+        localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
+        localComparator = new Comparator<InetAddress>()
+        {
+            public int compare(InetAddress endpoint1, InetAddress endpoint2)
+            {
+                boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
+                boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
+                if (local1 && !local2)
+                    return -1;
+                if (local2 && !local1)
+                    return 1;
+                return 0;
+            }
+        };
+    }
+
+    // definitely not safe for tools + clients - implicitly instantiates schema
+    public static void applyPartitioner()
+    {
+        /* Hashing strategy */
+        if (conf.partitioner == null)
+        {
+            throw new ConfigurationException("Missing directive: partitioner", false);
+        }
+        try
+        {
+            partitioner = FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", conf.partitioner));
+        }
+        catch (Exception e)
+        {
+            throw new ConfigurationException("Invalid partitioner class " + conf.partitioner, false);
+        }
+
+        paritionerName = partitioner.getClass().getCanonicalName();
     }
 
     private static FileStore guessFileStore(String dir) throws IOException
@@ -870,16 +911,31 @@ public class DatabaseDescriptor
         return authenticator;
     }
 
+    public static void setAuthenticator(IAuthenticator authenticator)
+    {
+        DatabaseDescriptor.authenticator = authenticator;
+    }
+
     public static IAuthorizer getAuthorizer()
     {
         return authorizer;
     }
 
+    public static void setAuthorizer(IAuthorizer authorizer)
+    {
+        DatabaseDescriptor.authorizer = authorizer;
+    }
+
     public static IRoleManager getRoleManager()
     {
         return roleManager;
     }
 
+    public static void setRoleManager(IRoleManager roleManager)
+    {
+        DatabaseDescriptor.roleManager = roleManager;
+    }
+
     public static int getPermissionsValidity()
     {
         return conf.permissions_validity_in_ms;
@@ -1183,16 +1239,6 @@ public class DatabaseDescriptor
         }
     }
 
-    public static boolean isReplacing()
-    {
-        if (System.getProperty("cassandra.replace_address_first_boot", null) != null && SystemKeyspace.bootstrapComplete())
-        {
-            logger.info("Replace address on first boot requested; this node is already bootstrapped");
-            return false;
-        }
-        return getReplaceAddress() != null;
-    }
-
     public static String getClusterName()
     {
         return conf.cluster_name;
@@ -1293,34 +1339,6 @@ public class DatabaseDescriptor
         return conf.cross_node_timeout;
     }
 
-    // not part of the Verb enum so we can change timeouts easily via JMX
-    public static long getTimeout(MessagingService.Verb verb)
-    {
-        switch (verb)
-        {
-            case READ:
-                return getReadRpcTimeout();
-            case RANGE_SLICE:
-            case PAGED_RANGE:
-                return getRangeRpcTimeout();
-            case TRUNCATE:
-                return getTruncateRpcTimeout();
-            case READ_REPAIR:
-            case MUTATION:
-            case PAXOS_COMMIT:
-            case PAXOS_PREPARE:
-            case PAXOS_PROPOSE:
-            case HINT:
-            case BATCH_STORE:
-            case BATCH_REMOVE:
-                return getWriteRpcTimeout();
-            case COUNTER_MUTATION:
-                return getCounterWriteRpcTimeout();
-            default:
-                return getRpcTimeout();
-        }
-    }
-
     public static long getSlowQueryTimeout()
     {
         return conf.slow_query_log_timeout_in_ms;
@@ -1530,6 +1548,11 @@ public class DatabaseDescriptor
         return internodeAuthenticator;
     }
 
+    public static void setInternodeAuthenticator(IInternodeAuthenticator internodeAuthenticator)
+    {
+        DatabaseDescriptor.internodeAuthenticator = internodeAuthenticator;
+    }
+
     public static void setBroadcastAddress(InetAddress broadcastAdd)
     {
         broadcastAddress = broadcastAdd;
@@ -1793,7 +1816,7 @@ public class DatabaseDescriptor
         return new File(conf.hints_directory);
     }
 
-    public static File getSerializedCachePath(CacheService.CacheType cacheType, String version, String extension)
+    public static File getSerializedCachePath(CacheType cacheType, String version, String extension)
     {
         String name = cacheType.toString()
                 + (version == null ? "" : "-" + version + "." + extension);
@@ -1828,12 +1851,12 @@ public class DatabaseDescriptor
         conf.dynamic_snitch_badness_threshold = dynamicBadnessThreshold;
     }
 
-    public static ServerEncryptionOptions getServerEncryptionOptions()
+    public static EncryptionOptions.ServerEncryptionOptions getServerEncryptionOptions()
     {
         return conf.server_encryption_options;
     }
 
-    public static ClientEncryptionOptions getClientEncryptionOptions()
+    public static EncryptionOptions.ClientEncryptionOptions getClientEncryptionOptions()
     {
         return conf.client_encryption_options;
     }
@@ -2060,33 +2083,24 @@ public class DatabaseDescriptor
         return conf.inter_dc_tcp_nodelay;
     }
 
+    public static long getMemtableHeapSpaceInMb()
+    {
+        return conf.memtable_heap_space_in_mb;
+    }
+
+    public static long getMemtableOffheapSpaceInMb()
+    {
+        return conf.memtable_offheap_space_in_mb;
+    }
 
-    public static SSTableFormat.Type getSSTableFormat()
+    public static Config.MemtableAllocationType getMemtableAllocationType()
     {
-        return sstable_format;
+        return conf.memtable_allocation_type;
     }
 
-    public static MemtablePool getMemtableAllocatorPool()
+    public static Float getMemtableCleanupThreshold()
     {
-        long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20;
-        long offHeapLimit = ((long) conf.memtable_offheap_space_in_mb) << 20;
-        switch (conf.memtable_allocation_type)
-        {
-            case unslabbed_heap_buffers:
-                return new HeapPool(heapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
-            case heap_buffers:
-                return new SlabPool(heapLimit, 0, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
-            case offheap_buffers:
-                if (!FileUtils.isCleanerAvailable)
-                {
-                    throw new IllegalStateException("Could not free direct byte buffer: offheap_buffers is not a safe memtable_allocation_type without this ability, please adjust your config. This feature is only guaranteed to work on an Oracle JVM. Refusing to start.");
-                }
-                return new SlabPool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
-            case offheap_objects:
-                return new NativePool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
-            default:
-                throw new AssertionError();
-        }
+        return conf.memtable_cleanup_threshold;
     }
 
     public static int getIndexSummaryResizeIntervalInMinutes()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index dd42779..eed316b 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -17,18 +17,14 @@
  */
 package org.apache.cassandra.config;
 
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.auth.AuthKeyspace;
 import org.apache.cassandra.cql3.functions.*;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -40,10 +36,8 @@ import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.locator.LocalStrategy;
-import org.apache.cassandra.repair.SystemDistributedKeyspace;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.tracing.TraceKeyspace;
 import org.apache.cassandra.utils.ConcurrentBiMap;
 import org.apache.cassandra.utils.Pair;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -54,22 +48,6 @@ public class Schema
 
     public static final Schema instance = new Schema();
 
-    /* system keyspace names (the ones with LocalStrategy replication strategy) */
-    public static final Set<String> SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(SystemKeyspace.NAME, SchemaKeyspace.NAME);
-
-    /* replicate system keyspace names (the ones with a "true" replication strategy) */
-    public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(TraceKeyspace.NAME,
-                                                                                       AuthKeyspace.NAME,
-                                                                                       SystemDistributedKeyspace.NAME);
-
-    /**
-     * longest permissible KS or CF name.  Our main concern is that filename not be more than 255 characters;
-     * the filename will contain both the KS and CF names. Since non-schema-name components only take up
-     * ~64 characters, we could allow longer names than this, but on Windows, the entire path should be not greater than
-     * 255 characters, so a lower limit here helps avoid problems.  See CASSANDRA-4110.
-     */
-    public static final int NAME_LENGTH = 48;
-
     /* metadata map for faster keyspace lookup */
     private final Map<String, KeyspaceMetadata> keyspaces = new NonBlockingHashMap<>();
 
@@ -81,22 +59,6 @@ public class Schema
 
     private volatile UUID version;
 
-    // 59adb24e-f3cd-3e02-97f0-5b395827453f
-    public static final UUID emptyVersion;
-
-
-    static
-    {
-        try
-        {
-            emptyVersion = UUID.nameUUIDFromBytes(MessageDigest.getInstance("MD5").digest());
-        }
-        catch (NoSuchAlgorithmException e)
-        {
-            throw new AssertionError();
-        }
-    }
-
     /**
      * Initialize empty schema object and load the hardcoded system tables
      */
@@ -110,14 +72,6 @@ public class Schema
     }
 
     /**
-     * @return whether or not the keyspace is a really system one (w/ LocalStrategy, unmodifiable, hardcoded)
-     */
-    public static boolean isSystemKeyspace(String keyspaceName)
-    {
-        return SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase());
-    }
-
-    /**
      * load keyspace (keyspace) definitions, but do not initialize the keyspace instances.
      * Schema version may be updated as the result.
      */
@@ -341,7 +295,7 @@ public class Schema
 
     private Set<String> getNonSystemKeyspacesSet()
     {
-        return Sets.difference(keyspaces.keySet(), SYSTEM_KEYSPACE_NAMES);
+        return Sets.difference(keyspaces.keySet(), SchemaConstants.SYSTEM_KEYSPACE_NAMES);
     }
 
     /**
@@ -370,7 +324,7 @@ public class Schema
      */
     public List<String> getUserKeyspaces()
     {
-        return ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), REPLICATED_SYSTEM_KEYSPACE_NAMES));
+        return ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/config/SchemaConstants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/SchemaConstants.java b/src/java/org/apache/cassandra/config/SchemaConstants.java
new file mode 100644
index 0000000..2416d6b
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/SchemaConstants.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableSet;
+
+public final class SchemaConstants
+{
+    public static final String SYSTEM_KEYSPACE_NAME = "system";
+    public static final String SCHEMA_KEYSPACE_NAME = "system_schema";
+
+    public static final String TRACE_KEYSPACE_NAME = "system_traces";
+    public static final String AUTH_KEYSPACE_NAME = "system_auth";
+    public static final String DISTRIBUTED_KEYSPACE_NAME = "system_distributed";
+
+    /* system keyspace names (the ones with LocalStrategy replication strategy) */
+    public static final Set<String> SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(SYSTEM_KEYSPACE_NAME, SCHEMA_KEYSPACE_NAME);
+
+    /* replicated system keyspace names (the ones with a "true" replication strategy) */
+    public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(TRACE_KEYSPACE_NAME,
+                                                                                       AUTH_KEYSPACE_NAME,
+                                                                                       DISTRIBUTED_KEYSPACE_NAME);
+    /**
+     * Longest permissible KS or CF name.  Our main concern is that the filename not exceed 255 characters;
+     * the filename will contain both the KS and CF names. Since non-schema-name components only take up
+     * ~64 characters, we could allow longer names than this, but on Windows the entire path should be no longer than
+     * 255 characters, so a lower limit here helps avoid problems.  See CASSANDRA-4110.
+     */
+    public static final int NAME_LENGTH = 48;
+
+    // 59adb24e-f3cd-3e02-97f0-5b395827453f
+    public static final UUID emptyVersion;
+
+    static
+    {
+        try
+        {
+            emptyVersion = UUID.nameUUIDFromBytes(MessageDigest.getInstance("MD5").digest());
+        }
+        catch (NoSuchAlgorithmException e)
+        {
+            throw new AssertionError();
+        }
+    }
+
+    /**
+     * @return whether or not the keyspace is a true system keyspace (w/ LocalStrategy, unmodifiable, hardcoded)
+     */
+    public static boolean isSystemKeyspace(String keyspaceName)
+    {
+        return SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index bd5638a..ca5e41a 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -91,11 +91,13 @@ public class YamlConfigurationLoader implements ConfigurationLoader
         return url;
     }
 
-    private static final URL storageConfigURL = getStorageConfigURL();
+    private static URL storageConfigURL;
 
     @Override
     public Config loadConfig() throws ConfigurationException
     {
+        if (storageConfigURL == null)
+            storageConfigURL = getStorageConfigURL();
         return loadConfig(storageConfigURL);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 47462e4..4baa38c 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -35,8 +35,10 @@ import org.slf4j.LoggerFactory;
 import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 import org.antlr.runtime.*;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.functions.FunctionName;
 import org.apache.cassandra.cql3.statements.*;
@@ -134,7 +136,7 @@ public class QueryProcessor implements QueryHandler
         InternalStateInstance()
         {
             ClientState state = ClientState.forInternalCalls();
-            state.setKeyspace(SystemKeyspace.NAME);
+            state.setKeyspace(SchemaConstants.SYSTEM_KEYSPACE_NAME);
             this.queryState = new QueryState(state);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
index d732efa..aa980e9 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.cql3.functions;
 
 import com.google.common.base.Objects;
 
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.config.SchemaConstants;
 
 public final class FunctionName
 {
@@ -28,7 +28,7 @@ public final class FunctionName
 
     public static FunctionName nativeFunction(String name)
     {
-        return new FunctionName(SystemKeyspace.NAME, name);
+        return new FunctionName(SchemaConstants.SYSTEM_KEYSPACE_NAME, name);
     }
 
     public FunctionName(String keyspace, String name)
@@ -67,8 +67,8 @@ public final class FunctionName
 
     public final boolean equalsNativeFunction(FunctionName nativeFunction)
     {
-        assert nativeFunction.keyspace.equals(SystemKeyspace.NAME);
-        if (this.hasKeyspace() && !this.keyspace.equals(SystemKeyspace.NAME))
+        assert nativeFunction.keyspace.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        if (this.hasKeyspace() && !this.keyspace.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME))
             return false;
 
         return Objects.equal(this.name, nativeFunction.name);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
index 5642b0d..591e54a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -55,7 +56,7 @@ public class AlterKeyspaceStatement extends SchemaAlteringStatement
         KeyspaceMetadata ksm = Schema.instance.getKSMetaData(name);
         if (ksm == null)
             throw new InvalidRequestException("Unknown keyspace " + name);
-        if (Schema.isSystemKeyspace(ksm.name))
+        if (SchemaConstants.isSystemKeyspace(ksm.name))
             throw new InvalidRequestException("Cannot alter system keyspace");
 
         attrs.validate();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
index f88c04f..33d2ce4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
@@ -21,7 +21,7 @@ import java.util.regex.Pattern;
 
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -79,8 +79,8 @@ public class CreateKeyspaceStatement extends SchemaAlteringStatement
         // keyspace name
         if (!PATTERN_WORD_CHARS.matcher(name).matches())
             throw new InvalidRequestException(String.format("\"%s\" is not a valid keyspace name", name));
-        if (name.length() > Schema.NAME_LENGTH)
-            throw new InvalidRequestException(String.format("Keyspace names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, name));
+        if (name.length() > SchemaConstants.NAME_LENGTH)
+            throw new InvalidRequestException(String.format("Keyspace names shouldn't be more than %s characters long (got \"%s\")", SchemaConstants.NAME_LENGTH, name));
 
         attrs.validate();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 08c3a4c..90f0cdb 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -206,8 +206,8 @@ public class CreateTableStatement extends SchemaAlteringStatement
             // Column family name
             if (!PATTERN_WORD_CHARS.matcher(columnFamily()).matches())
                 throw new InvalidRequestException(String.format("\"%s\" is not a valid table name (must be alphanumeric character or underscore only: [a-zA-Z_0-9]+)", columnFamily()));
-            if (columnFamily().length() > Schema.NAME_LENGTH)
-                throw new InvalidRequestException(String.format("Table names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, columnFamily()));
+            if (columnFamily().length() > SchemaConstants.NAME_LENGTH)
+                throw new InvalidRequestException(String.format("Table names shouldn't be more than %s characters long (got \"%s\")", SchemaConstants.NAME_LENGTH, columnFamily()));
 
             for (Multiset.Entry<ColumnIdentifier> entry : definedNames.entrySet())
                 if (entry.getCount() > 1)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
index 58f8e9c..b8f2f92 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
@@ -21,6 +21,7 @@ import java.util.*;
 
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -31,7 +32,7 @@ import org.apache.cassandra.transport.messages.ResultMessage;
 
 public class ListPermissionsStatement extends AuthorizationStatement
 {
-    private static final String KS = AuthKeyspace.NAME;
+    private static final String KS = SchemaConstants.AUTH_KEYSPACE_NAME;
     private static final String CF = "permissions"; // virtual cf to use for now.
 
     private static final List<ColumnSpecification> metadata;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
index 477aedc..3fee57a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
 
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.marshal.BooleanType;
 import org.apache.cassandra.db.marshal.MapType;
@@ -37,7 +38,7 @@ import org.apache.cassandra.transport.messages.ResultMessage;
 public class ListRolesStatement extends AuthorizationStatement
 {
     // pseudo-virtual cf as the actual datasource is dependent on the IRoleManager impl
-    private static final String KS = AuthKeyspace.NAME;
+    private static final String KS = SchemaConstants.AUTH_KEYSPACE_NAME;
     private static final String CF = AuthKeyspace.ROLES;
 
     private static final MapType optionsType = MapType.getInstance(UTF8Type.instance, UTF8Type.instance, false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
index 7251980..0101363 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.ResultSet;
@@ -33,7 +34,7 @@ import org.apache.cassandra.transport.messages.ResultMessage;
 public class ListUsersStatement extends ListRolesStatement
 {
     // pseudo-virtual cf as the actual datasource is dependent on the IRoleManager impl
-    private static final String KS = AuthKeyspace.NAME;
+    private static final String KS = SchemaConstants.AUTH_KEYSPACE_NAME;
     private static final String CF = "users";
 
     private static final List<ColumnSpecification> metadata =

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java b/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
index b22e400..3ae6bd8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
@@ -21,8 +21,8 @@ import java.util.Set;
 
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.cql3.RoleName;
-import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
@@ -54,7 +54,7 @@ public abstract class PermissionsManagementStatement extends AuthorizationStatem
 
         // altering permissions on builtin functions is not supported
         if (resource instanceof FunctionResource
-            && SystemKeyspace.NAME.equals(((FunctionResource)resource).getKeyspace()))
+            && SchemaConstants.SYSTEM_KEYSPACE_NAME.equals(((FunctionResource)resource).getKeyspace()))
         {
             throw new InvalidRequestException("Altering permissions on builtin functions is not supported");
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 06dff0f..14d6440 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -817,7 +817,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public String getSSTablePath(File directory)
     {
-        return getSSTablePath(directory, DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), DatabaseDescriptor.getSSTableFormat());
+        return getSSTablePath(directory, SSTableFormat.Type.current().info.getLatestVersion(), SSTableFormat.Type.current());
     }
 
     public String getSSTablePath(File directory, SSTableFormat.Type format)
@@ -1775,7 +1775,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 }
 
                 writeSnapshotManifest(filesJSONArr, snapshotName);
-                if (!Schema.SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName) && !Schema.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName))
+                if (!SchemaConstants.SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName) && !SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName))
                     writeSnapshotSchema(snapshotName);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 0d78245..741058f 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -100,7 +100,7 @@ public class Keyspace
 
     public static Keyspace open(String keyspaceName)
     {
-        assert initialized || Schema.isSystemKeyspace(keyspaceName);
+        assert initialized || SchemaConstants.isSystemKeyspace(keyspaceName);
         return open(keyspaceName, Schema.instance, true);
     }
 
@@ -685,7 +685,7 @@ public class Keyspace
 
     public static Iterable<Keyspace> system()
     {
-        return Iterables.transform(Schema.SYSTEM_KEYSPACE_NAMES, keyspaceTransformer);
+        return Iterables.transform(SchemaConstants.SYSTEM_KEYSPACE_NAMES, keyspaceTransformer);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 9e7e9b6..4fdf28c 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.*;
 
+import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.utils.AbstractIterator;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -1025,7 +1026,7 @@ public abstract class LegacyLayout
                 // then simply ignore the cell is fine. But also not that we ignore if it's the
                 // system keyspace because for those table we actually remove columns without registering
                 // them in the dropped columns
-                assert metadata.ksName.equals(SystemKeyspace.NAME) || metadata.getDroppedColumnDefinition(e.columnName) != null : e.getMessage();
+                assert metadata.ksName.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME) || metadata.getDroppedColumnDefinition(e.columnName) != null : e.getMessage();
             }
         }
     }
@@ -1105,7 +1106,7 @@ public abstract class LegacyLayout
                     // then simply ignore the cell is fine. But also not that we ignore if it's the
                     // system keyspace because for those table we actually remove columns without registering
                     // them in the dropped columns
-                    if (metadata.ksName.equals(SystemKeyspace.NAME) || metadata.getDroppedColumnDefinition(e.columnName) != null)
+                    if (metadata.ksName.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME) || metadata.getDroppedColumnDefinition(e.columnName) != null)
                         return computeNext();
                     else
                         throw new IOError(e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e9cca4a..f8b258c 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
@@ -48,20 +49,48 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapPool;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
 import org.apache.cassandra.utils.memory.MemtablePool;
+import org.apache.cassandra.utils.memory.NativePool;
+import org.apache.cassandra.utils.memory.SlabPool;
 
 public class Memtable implements Comparable<Memtable>
 {
     private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
 
-    public static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool();
+    public static final MemtablePool MEMORY_POOL = createMemtableAllocatorPool();
+
+    /** Creates the pool assigned to MEMORY_POOL for the configured memtable allocation type; the MB limits are converted to bytes here. */
+    private static MemtablePool createMemtableAllocatorPool()
+    {
+        // Widen to long before shifting so the MB -> bytes conversion is done in 64-bit arithmetic whatever type the getters return.
+        long heapLimit = ((long) DatabaseDescriptor.getMemtableHeapSpaceInMb()) << 20;
+        long offHeapLimit = ((long) DatabaseDescriptor.getMemtableOffheapSpaceInMb()) << 20;
+        switch (DatabaseDescriptor.getMemtableAllocationType())
+        {
+            case unslabbed_heap_buffers:
+                return new HeapPool(heapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
+            case heap_buffers:
+                return new SlabPool(heapLimit, 0, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
+            case offheap_buffers:
+                if (!FileUtils.isCleanerAvailable)
+                    throw new IllegalStateException("Could not free direct byte buffer: offheap_buffers is not a safe memtable_allocation_type without this ability, please adjust your config. This feature is only guaranteed to work on an Oracle JVM. Refusing to start.");
+                return new SlabPool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
+            case offheap_objects:
+                return new NativePool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
+            default:
+                throw new AssertionError("Unhandled memtable_allocation_type: " + DatabaseDescriptor.getMemtableAllocationType());
+        }
+    }
+
     private static final int ROW_OVERHEAD_HEAP_SIZE = estimateRowOverhead(Integer.parseInt(System.getProperty("cassandra.memtable_row_overhead_computation_step", "100000")));
 
     private final MemtableAllocator allocator;
@@ -416,7 +445,7 @@ public class Memtable implements Comparable<Memtable>
                                     + liveDataSize.get()) // data
                                     * 1.2); // bloom filter and row index overhead
 
-            this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
+            this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SchemaConstants.SYSTEM_KEYSPACE_NAME);
 
             if (flushLocation == null)
                 writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getWriteableLocationAsFile(estimatedSize)), columnsCollector.get(), statsCollector.get());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 5f01733..63ea89d 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -452,7 +452,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
             private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
             private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
 
-            private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName);
+            private final boolean respectTombstoneThresholds = !SchemaConstants.isSystemKeyspace(ReadCommand.this.metadata().ksName);
 
             private int liveRows = 0;
             private int tombstones = 0;