You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC
svn commit: r799331 [3/29] - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/
src/j...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Jul 30 15:30:21 2009
@@ -1,905 +1,905 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.config;
-
-import java.util.*;
-import java.io.*;
-import java.lang.reflect.InvocationTargetException;
-
-import javax.xml.transform.TransformerException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.locator.IEndPointSnitch;
-import org.apache.cassandra.utils.FileUtils;
-import org.apache.cassandra.utils.XMLUtils;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class DatabaseDescriptor
-{
- private static Logger logger_ = Logger.getLogger(DatabaseDescriptor.class);
-
- public static final String random_ = "RANDOM";
- public static final String ophf_ = "OPHF";
- private static int storagePort_ = 7000;
- private static int controlPort_ = 7001;
- private static int thriftPort_ = 9160;
- private static String listenAddress_; // leave null so we can fall through to getLocalHost
- private static String thriftAddress_;
- private static String clusterName_ = "Test";
- private static int replicationFactor_ = 3;
- private static long rpcTimeoutInMillis_ = 2000;
- private static Set<String> seeds_ = new HashSet<String>();
- /* Keeps the list of data file directories */
- private static String[] dataFileDirectories_;
- /* Current index into the above list of directories */
- private static int currentIndex_ = 0;
- private static String logFileDirectory_;
- private static String bootstrapFileDirectory_;
- private static int consistencyThreads_ = 4; // not configurable
- private static int concurrentReaders_ = 8;
- private static int concurrentWriters_ = 32;
- private static List<String> tables_ = new ArrayList<String>();
- private static Set<String> applicationColumnFamilies_ = new HashSet<String>();
-
- // Default descriptive names for use in CQL. The user can override
- // these choices in the config file. These are not case sensitive.
- // Hence, these are stored in UPPER case for easy comparison.
- private static String d_rowKey_ = "ROW_KEY";
- private static String d_superColumnMap_ = "SUPER_COLUMN_MAP";
- private static String d_superColumnKey_ = "SUPER_COLUMN_KEY";
- private static String d_columnMap_ = "COLUMN_MAP";
- private static String d_columnKey_ = "COLUMN_KEY";
- private static String d_columnValue_ = "COLUMN_VALUE";
- private static String d_columnTimestamp_ = "COLUMN_TIMESTAMP";
-
- private static Map<String, Double> tableKeysCachedFractions_;
- /*
- * A map from table names to the set of column families for the table and the
- * corresponding meta data for that column family.
- */
- private static Map<String, Map<String, CFMetaData>> tableToCFMetaDataMap_;
- /* Hashing strategy Random or OPHF */
- private static IPartitioner partitioner_;
-
- private static IEndPointSnitch endPointSnitch_;
-
- private static Class replicaPlacementStrategyClass_;
-
- /* if the size of columns or super-columns are more than this, indexing will kick in */
- private static int columnIndexSizeInKB_;
- /* Number of hours to keep a memtable in memory */
- private static int memtableLifetime_ = 6;
- /* Size of the memtable in memory before it is dumped */
- private static int memtableSize_ = 128;
- /* Number of objects in millions in the memtable before it is dumped */
- private static double memtableObjectCount_ = 1;
- /*
- * This parameter enables or disables consistency checks.
- * If set to false the read repairs are disable for very
- * high throughput on reads but at the cost of consistency.
- */
- private static boolean doConsistencyCheck_ = true;
- /* Callout directories */
- private static String calloutLocation_;
- /* Job Jar Location */
- private static String jobJarFileLocation_;
- /* Address where to run the job tracker */
- private static String jobTrackerHost_;
- /* time to wait before garbage collecting tombstones (deletion markers) */
- private static int gcGraceInSeconds_ = 10 * 24 * 3600; // 10 days
-
- // the path qualified config file (storage-conf.xml) name
- private static String configFileName_;
- /* initial token in the ring */
- private static String initialToken_ = null;
-
- private static boolean commitLogSync_;
-
- private static int commitLogSyncDelay_;
-
- static
- {
- try
- {
- configFileName_ = System.getProperty("storage-config") + File.separator + "storage-conf.xml";
- if (logger_.isDebugEnabled())
- logger_.debug("Loading settings from " + configFileName_);
- XMLUtils xmlUtils = new XMLUtils(configFileName_);
-
- /* Cluster Name */
- clusterName_ = xmlUtils.getNodeValue("/Storage/ClusterName");
-
- String syncRaw = xmlUtils.getNodeValue("/Storage/CommitLogSync");
- if (!"false".equals(syncRaw) && !"true".equals(syncRaw))
- {
- // Bool.valueOf will silently assume false for values it doesn't recognize
- throw new ConfigurationException("Unrecognized value for CommitLogSync. Use 'true' or 'false'.");
- }
- commitLogSync_ = Boolean.valueOf(xmlUtils.getNodeValue("/Storage/CommitLogSync"));
-
- commitLogSyncDelay_ = Integer.valueOf(xmlUtils.getNodeValue("/Storage/CommitLogSyncDelay"));
-
- /* Hashing strategy */
- String partitionerClassName = xmlUtils.getNodeValue("/Storage/Partitioner");
- if (partitionerClassName == null)
- {
- throw new ConfigurationException("Missing partitioner directive /Storage/Partitioner");
- }
- try
- {
- Class cls = Class.forName(partitionerClassName);
- partitioner_ = (IPartitioner) cls.getConstructor().newInstance();
- }
- catch (ClassNotFoundException e)
- {
- throw new ConfigurationException("Invalid partitioner class " + partitionerClassName);
- }
-
- /* end point snitch */
- String endPointSnitchClassName = xmlUtils.getNodeValue("/Storage/EndPointSnitch");
- if (endPointSnitchClassName == null)
- {
- throw new ConfigurationException("Missing endpointsnitch directive /Storage/EndPointSnitch");
- }
- try
- {
- Class cls = Class.forName(endPointSnitchClassName);
- endPointSnitch_ = (IEndPointSnitch) cls.getConstructor().newInstance();
- }
- catch (ClassNotFoundException e)
- {
- throw new ConfigurationException("Invalid endpointsnitch class " + endPointSnitchClassName);
- }
-
- /* Callout location */
- calloutLocation_ = xmlUtils.getNodeValue("/Storage/CalloutLocation");
-
- /* JobTracker address */
- jobTrackerHost_ = xmlUtils.getNodeValue("/Storage/JobTrackerHost");
-
- /* Job Jar file location */
- jobJarFileLocation_ = xmlUtils.getNodeValue("/Storage/JobJarFileLocation");
-
- String gcGrace = xmlUtils.getNodeValue("/Storage/GCGraceSeconds");
- if ( gcGrace != null )
- gcGraceInSeconds_ = Integer.parseInt(gcGrace);
-
- initialToken_ = xmlUtils.getNodeValue("/Storage/InitialToken");
-
- /* Data replication factor */
- String replicationFactor = xmlUtils.getNodeValue("/Storage/ReplicationFactor");
- if ( replicationFactor != null )
- replicationFactor_ = Integer.parseInt(replicationFactor);
-
- /* RPC Timeout */
- String rpcTimeoutInMillis = xmlUtils.getNodeValue("/Storage/RpcTimeoutInMillis");
- if ( rpcTimeoutInMillis != null )
- rpcTimeoutInMillis_ = Integer.parseInt(rpcTimeoutInMillis);
-
- /* Thread per pool */
- String rawReaders = xmlUtils.getNodeValue("/Storage/ConcurrentReads");
- if (rawReaders != null)
- {
- concurrentReaders_ = Integer.parseInt(rawReaders);
- }
- String rawWriters = xmlUtils.getNodeValue("/Storage/ConcurrentWrites");
- if (rawWriters != null)
- {
- concurrentWriters_ = Integer.parseInt(rawWriters);
- }
-
- /* TCP port on which the storage system listens */
- String port = xmlUtils.getNodeValue("/Storage/StoragePort");
- if ( port != null )
- storagePort_ = Integer.parseInt(port);
-
- /* Local IP or hostname to bind services to */
- String listenAddress = xmlUtils.getNodeValue("/Storage/ListenAddress");
- if ( listenAddress != null)
- listenAddress_ = listenAddress;
-
- /* Local IP or hostname to bind thrift server to */
- String thriftAddress = xmlUtils.getNodeValue("/Storage/ThriftAddress");
- if ( thriftAddress != null )
- thriftAddress_ = thriftAddress;
-
- /* UDP port for control messages */
- port = xmlUtils.getNodeValue("/Storage/ControlPort");
- if ( port != null )
- controlPort_ = Integer.parseInt(port);
-
- /* get the thrift port from conf file */
- port = xmlUtils.getNodeValue("/Storage/ThriftPort");
- if (port != null)
- thriftPort_ = Integer.parseInt(port);
-
- /* Number of days to keep the memtable around w/o flushing */
- String lifetime = xmlUtils.getNodeValue("/Storage/MemtableLifetimeInDays");
- if ( lifetime != null )
- memtableLifetime_ = Integer.parseInt(lifetime);
-
- /* Size of the memtable in memory in MB before it is dumped */
- String memtableSize = xmlUtils.getNodeValue("/Storage/MemtableSizeInMB");
- if ( memtableSize != null )
- memtableSize_ = Integer.parseInt(memtableSize);
- /* Number of objects in millions in the memtable before it is dumped */
- String memtableObjectCount = xmlUtils.getNodeValue("/Storage/MemtableObjectCountInMillions");
- if ( memtableObjectCount != null )
- memtableObjectCount_ = Double.parseDouble(memtableObjectCount);
- if (memtableObjectCount_ <= 0)
- {
- throw new ConfigurationException("Memtable object count must be a positive double");
- }
-
- /* This parameter enables or disables consistency checks.
- * If set to false the read repairs are disable for very
- * high throughput on reads but at the cost of consistency.*/
- String doConsistencyCheck = xmlUtils.getNodeValue("/Storage/DoConsistencyChecksBoolean");
- if ( doConsistencyCheck != null )
- doConsistencyCheck_ = Boolean.parseBoolean(doConsistencyCheck);
-
- /* read the size at which we should do column indexes */
- String columnIndexSizeInKB = xmlUtils.getNodeValue("/Storage/ColumnIndexSizeInKB");
- if(columnIndexSizeInKB == null)
- {
- columnIndexSizeInKB_ = 64;
- }
- else
- {
- columnIndexSizeInKB_ = Integer.parseInt(columnIndexSizeInKB);
- }
-
- /* data file directory */
- dataFileDirectories_ = xmlUtils.getNodeValues("/Storage/DataFileDirectories/DataFileDirectory");
- if (dataFileDirectories_.length == 0)
- {
- throw new ConfigurationException("At least one DataFileDirectory must be specified");
- }
- for ( String dataFileDirectory : dataFileDirectories_ )
- FileUtils.createDirectory(dataFileDirectory);
-
- /* bootstrap file directory */
- bootstrapFileDirectory_ = xmlUtils.getNodeValue("/Storage/BootstrapFileDirectory");
- if (bootstrapFileDirectory_ == null)
- {
- throw new ConfigurationException("BootstrapFileDirectory must be specified");
- }
- FileUtils.createDirectory(bootstrapFileDirectory_);
-
- /* commit log directory */
- logFileDirectory_ = xmlUtils.getNodeValue("/Storage/CommitLogDirectory");
- if (logFileDirectory_ == null)
- {
- throw new ConfigurationException("CommitLogDirectory must be specified");
- }
- FileUtils.createDirectory(logFileDirectory_);
-
- /* threshold after which commit log should be rotated. */
- String value = xmlUtils.getNodeValue("/Storage/CommitLogRotationThresholdInMB");
- if ( value != null)
- CommitLog.setSegmentSize(Integer.parseInt(value) * 1024 * 1024);
-
- tableToCFMetaDataMap_ = new HashMap<String, Map<String, CFMetaData>>();
- tableKeysCachedFractions_ = new HashMap<String, Double>();
-
- /* See which replica placement strategy to use */
- String replicaPlacementStrategyClassName = xmlUtils.getNodeValue("/Storage/ReplicaPlacementStrategy");
- if (replicaPlacementStrategyClassName == null)
- {
- throw new ConfigurationException("Missing replicaplacementstrategy directive /Storage/ReplicaPlacementStrategy");
- }
- try
- {
- replicaPlacementStrategyClass_ = Class.forName(replicaPlacementStrategyClassName);
- }
- catch (ClassNotFoundException e)
- {
- throw new ConfigurationException("Invalid replicaplacementstrategy class " + replicaPlacementStrategyClassName);
- }
-
- /* Read the table related stuff from config */
- NodeList tables = xmlUtils.getRequestedNodeList("/Storage/Keyspaces/Keyspace");
- int size = tables.getLength();
- for ( int i = 0; i < size; ++i )
- {
- Node table = tables.item(i);
-
- /* parsing out the table name */
- String tName = XMLUtils.getAttributeValue(table, "Name");
- if (tName == null)
- {
- throw new ConfigurationException("Table name attribute is required");
- }
- if (tName.equalsIgnoreCase(Table.SYSTEM_TABLE))
- {
- throw new ConfigurationException("'system' is a reserved table name for Cassandra internals");
- }
- tables_.add(tName);
- tableToCFMetaDataMap_.put(tName, new HashMap<String, CFMetaData>());
-
- String xqlCacheSize = "/Storage/Keyspaces/Keyspace[@Name='" + tName + "']/KeysCachedFraction";
- value = xmlUtils.getNodeValue(xqlCacheSize);
- if (value == null)
- {
- tableKeysCachedFractions_.put(tName, 0.01);
- }
- else
- {
- tableKeysCachedFractions_.put(tName, Double.valueOf(value));
- }
-
- String xqlTable = "/Storage/Keyspaces/Keyspace[@Name='" + tName + "']/";
- NodeList columnFamilies = xmlUtils.getRequestedNodeList(xqlTable + "ColumnFamily");
-
- // get name of the rowKey for this table
- String n_rowKey = xmlUtils.getNodeValue(xqlTable + "RowKey");
- if (n_rowKey == null)
- n_rowKey = d_rowKey_;
-
- //NodeList columnFamilies = xmlUtils.getRequestedNodeList(table, "ColumnFamily");
- int size2 = columnFamilies.getLength();
-
- for ( int j = 0; j < size2; ++j )
- {
- Node columnFamily = columnFamilies.item(j);
- String cfName = XMLUtils.getAttributeValue(columnFamily, "Name");
- if (cfName == null)
- {
- throw new ConfigurationException("ColumnFamily name attribute is required");
- }
- String xqlCF = xqlTable + "ColumnFamily[@Name='" + cfName + "']/";
-
- /* squirrel away the application column families */
- applicationColumnFamilies_.add(cfName);
-
- // Parse out the column type
- String rawColumnType = XMLUtils.getAttributeValue(columnFamily, "ColumnType");
- String columnType = ColumnFamily.getColumnType(rawColumnType);
- if (columnType == null)
- {
- throw new ConfigurationException("ColumnFamily " + cfName + " has invalid type " + rawColumnType);
- }
-
- if (XMLUtils.getAttributeValue(columnFamily, "ColumnSort") != null)
- {
- throw new ConfigurationException("ColumnSort is no longer an accepted attribute. Use CompareWith instead.");
- }
-
- // Parse out the column comparator
- AbstractType columnComparator = getComparator(columnFamily, "CompareWith");
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.util.*;
+import java.io.*;
+import java.lang.reflect.InvocationTargetException;
+
+import javax.xml.transform.TransformerException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.locator.IEndPointSnitch;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.XMLUtils;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class DatabaseDescriptor
+{
+ private static Logger logger_ = Logger.getLogger(DatabaseDescriptor.class);
+
+ public static final String random_ = "RANDOM";
+ public static final String ophf_ = "OPHF";
+ private static int storagePort_ = 7000;
+ private static int controlPort_ = 7001;
+ private static int thriftPort_ = 9160;
+ private static String listenAddress_; // leave null so we can fall through to getLocalHost
+ private static String thriftAddress_;
+ private static String clusterName_ = "Test";
+ private static int replicationFactor_ = 3;
+ private static long rpcTimeoutInMillis_ = 2000;
+ private static Set<String> seeds_ = new HashSet<String>();
+ /* Keeps the list of data file directories */
+ private static String[] dataFileDirectories_;
+ /* Current index into the above list of directories */
+ private static int currentIndex_ = 0;
+ private static String logFileDirectory_;
+ private static String bootstrapFileDirectory_;
+ private static int consistencyThreads_ = 4; // not configurable
+ private static int concurrentReaders_ = 8;
+ private static int concurrentWriters_ = 32;
+ private static List<String> tables_ = new ArrayList<String>();
+ private static Set<String> applicationColumnFamilies_ = new HashSet<String>();
+
+ // Default descriptive names for use in CQL. The user can override
+ // these choices in the config file. These are not case sensitive.
+ // Hence, these are stored in UPPER case for easy comparison.
+ private static String d_rowKey_ = "ROW_KEY";
+ private static String d_superColumnMap_ = "SUPER_COLUMN_MAP";
+ private static String d_superColumnKey_ = "SUPER_COLUMN_KEY";
+ private static String d_columnMap_ = "COLUMN_MAP";
+ private static String d_columnKey_ = "COLUMN_KEY";
+ private static String d_columnValue_ = "COLUMN_VALUE";
+ private static String d_columnTimestamp_ = "COLUMN_TIMESTAMP";
+
+ private static Map<String, Double> tableKeysCachedFractions_;
+ /*
+ * A map from table names to the set of column families for the table and the
+ * corresponding meta data for that column family.
+ */
+ private static Map<String, Map<String, CFMetaData>> tableToCFMetaDataMap_;
+ /* Hashing strategy Random or OPHF */
+ private static IPartitioner partitioner_;
+
+ private static IEndPointSnitch endPointSnitch_;
+
+ private static Class replicaPlacementStrategyClass_;
+
+ /* if the size of columns or super-columns are more than this, indexing will kick in */
+ private static int columnIndexSizeInKB_;
+ /* Number of hours to keep a memtable in memory */
+ private static int memtableLifetime_ = 6;
+ /* Size of the memtable in memory before it is dumped */
+ private static int memtableSize_ = 128;
+ /* Number of objects in millions in the memtable before it is dumped */
+ private static double memtableObjectCount_ = 1;
+ /*
+ * This parameter enables or disables consistency checks.
+ * If set to false the read repairs are disable for very
+ * high throughput on reads but at the cost of consistency.
+ */
+ private static boolean doConsistencyCheck_ = true;
+ /* Callout directories */
+ private static String calloutLocation_;
+ /* Job Jar Location */
+ private static String jobJarFileLocation_;
+ /* Address where to run the job tracker */
+ private static String jobTrackerHost_;
+ /* time to wait before garbage collecting tombstones (deletion markers) */
+ private static int gcGraceInSeconds_ = 10 * 24 * 3600; // 10 days
+
+ // the path qualified config file (storage-conf.xml) name
+ private static String configFileName_;
+ /* initial token in the ring */
+ private static String initialToken_ = null;
+
+ private static boolean commitLogSync_;
+
+ private static int commitLogSyncDelay_;
+
+ static
+ {
+ try
+ {
+ configFileName_ = System.getProperty("storage-config") + File.separator + "storage-conf.xml";
+ if (logger_.isDebugEnabled())
+ logger_.debug("Loading settings from " + configFileName_);
+ XMLUtils xmlUtils = new XMLUtils(configFileName_);
+
+ /* Cluster Name */
+ clusterName_ = xmlUtils.getNodeValue("/Storage/ClusterName");
+
+ String syncRaw = xmlUtils.getNodeValue("/Storage/CommitLogSync");
+ if (!"false".equals(syncRaw) && !"true".equals(syncRaw))
+ {
+ // Bool.valueOf will silently assume false for values it doesn't recognize
+ throw new ConfigurationException("Unrecognized value for CommitLogSync. Use 'true' or 'false'.");
+ }
+ commitLogSync_ = Boolean.valueOf(xmlUtils.getNodeValue("/Storage/CommitLogSync"));
+
+ commitLogSyncDelay_ = Integer.valueOf(xmlUtils.getNodeValue("/Storage/CommitLogSyncDelay"));
+
+ /* Hashing strategy */
+ String partitionerClassName = xmlUtils.getNodeValue("/Storage/Partitioner");
+ if (partitionerClassName == null)
+ {
+ throw new ConfigurationException("Missing partitioner directive /Storage/Partitioner");
+ }
+ try
+ {
+ Class cls = Class.forName(partitionerClassName);
+ partitioner_ = (IPartitioner) cls.getConstructor().newInstance();
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new ConfigurationException("Invalid partitioner class " + partitionerClassName);
+ }
+
+ /* end point snitch */
+ String endPointSnitchClassName = xmlUtils.getNodeValue("/Storage/EndPointSnitch");
+ if (endPointSnitchClassName == null)
+ {
+ throw new ConfigurationException("Missing endpointsnitch directive /Storage/EndPointSnitch");
+ }
+ try
+ {
+ Class cls = Class.forName(endPointSnitchClassName);
+ endPointSnitch_ = (IEndPointSnitch) cls.getConstructor().newInstance();
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new ConfigurationException("Invalid endpointsnitch class " + endPointSnitchClassName);
+ }
+
+ /* Callout location */
+ calloutLocation_ = xmlUtils.getNodeValue("/Storage/CalloutLocation");
+
+ /* JobTracker address */
+ jobTrackerHost_ = xmlUtils.getNodeValue("/Storage/JobTrackerHost");
+
+ /* Job Jar file location */
+ jobJarFileLocation_ = xmlUtils.getNodeValue("/Storage/JobJarFileLocation");
+
+ String gcGrace = xmlUtils.getNodeValue("/Storage/GCGraceSeconds");
+ if ( gcGrace != null )
+ gcGraceInSeconds_ = Integer.parseInt(gcGrace);
+
+ initialToken_ = xmlUtils.getNodeValue("/Storage/InitialToken");
+
+ /* Data replication factor */
+ String replicationFactor = xmlUtils.getNodeValue("/Storage/ReplicationFactor");
+ if ( replicationFactor != null )
+ replicationFactor_ = Integer.parseInt(replicationFactor);
+
+ /* RPC Timeout */
+ String rpcTimeoutInMillis = xmlUtils.getNodeValue("/Storage/RpcTimeoutInMillis");
+ if ( rpcTimeoutInMillis != null )
+ rpcTimeoutInMillis_ = Integer.parseInt(rpcTimeoutInMillis);
+
+ /* Thread per pool */
+ String rawReaders = xmlUtils.getNodeValue("/Storage/ConcurrentReads");
+ if (rawReaders != null)
+ {
+ concurrentReaders_ = Integer.parseInt(rawReaders);
+ }
+ String rawWriters = xmlUtils.getNodeValue("/Storage/ConcurrentWrites");
+ if (rawWriters != null)
+ {
+ concurrentWriters_ = Integer.parseInt(rawWriters);
+ }
+
+ /* TCP port on which the storage system listens */
+ String port = xmlUtils.getNodeValue("/Storage/StoragePort");
+ if ( port != null )
+ storagePort_ = Integer.parseInt(port);
+
+ /* Local IP or hostname to bind services to */
+ String listenAddress = xmlUtils.getNodeValue("/Storage/ListenAddress");
+ if ( listenAddress != null)
+ listenAddress_ = listenAddress;
+
+ /* Local IP or hostname to bind thrift server to */
+ String thriftAddress = xmlUtils.getNodeValue("/Storage/ThriftAddress");
+ if ( thriftAddress != null )
+ thriftAddress_ = thriftAddress;
+
+ /* UDP port for control messages */
+ port = xmlUtils.getNodeValue("/Storage/ControlPort");
+ if ( port != null )
+ controlPort_ = Integer.parseInt(port);
+
+ /* get the thrift port from conf file */
+ port = xmlUtils.getNodeValue("/Storage/ThriftPort");
+ if (port != null)
+ thriftPort_ = Integer.parseInt(port);
+
+ /* Number of days to keep the memtable around w/o flushing */
+ String lifetime = xmlUtils.getNodeValue("/Storage/MemtableLifetimeInDays");
+ if ( lifetime != null )
+ memtableLifetime_ = Integer.parseInt(lifetime);
+
+ /* Size of the memtable in memory in MB before it is dumped */
+ String memtableSize = xmlUtils.getNodeValue("/Storage/MemtableSizeInMB");
+ if ( memtableSize != null )
+ memtableSize_ = Integer.parseInt(memtableSize);
+ /* Number of objects in millions in the memtable before it is dumped */
+ String memtableObjectCount = xmlUtils.getNodeValue("/Storage/MemtableObjectCountInMillions");
+ if ( memtableObjectCount != null )
+ memtableObjectCount_ = Double.parseDouble(memtableObjectCount);
+ if (memtableObjectCount_ <= 0)
+ {
+ throw new ConfigurationException("Memtable object count must be a positive double");
+ }
+
+ /* This parameter enables or disables consistency checks.
+ * If set to false the read repairs are disable for very
+ * high throughput on reads but at the cost of consistency.*/
+ String doConsistencyCheck = xmlUtils.getNodeValue("/Storage/DoConsistencyChecksBoolean");
+ if ( doConsistencyCheck != null )
+ doConsistencyCheck_ = Boolean.parseBoolean(doConsistencyCheck);
+
+ /* read the size at which we should do column indexes */
+ String columnIndexSizeInKB = xmlUtils.getNodeValue("/Storage/ColumnIndexSizeInKB");
+ if(columnIndexSizeInKB == null)
+ {
+ columnIndexSizeInKB_ = 64;
+ }
+ else
+ {
+ columnIndexSizeInKB_ = Integer.parseInt(columnIndexSizeInKB);
+ }
+
+ /* data file directory */
+ dataFileDirectories_ = xmlUtils.getNodeValues("/Storage/DataFileDirectories/DataFileDirectory");
+ if (dataFileDirectories_.length == 0)
+ {
+ throw new ConfigurationException("At least one DataFileDirectory must be specified");
+ }
+ for ( String dataFileDirectory : dataFileDirectories_ )
+ FileUtils.createDirectory(dataFileDirectory);
+
+ /* bootstrap file directory */
+ bootstrapFileDirectory_ = xmlUtils.getNodeValue("/Storage/BootstrapFileDirectory");
+ if (bootstrapFileDirectory_ == null)
+ {
+ throw new ConfigurationException("BootstrapFileDirectory must be specified");
+ }
+ FileUtils.createDirectory(bootstrapFileDirectory_);
+
+ /* commit log directory */
+ logFileDirectory_ = xmlUtils.getNodeValue("/Storage/CommitLogDirectory");
+ if (logFileDirectory_ == null)
+ {
+ throw new ConfigurationException("CommitLogDirectory must be specified");
+ }
+ FileUtils.createDirectory(logFileDirectory_);
+
+ /* threshold after which commit log should be rotated. */
+ String value = xmlUtils.getNodeValue("/Storage/CommitLogRotationThresholdInMB");
+ if ( value != null)
+ CommitLog.setSegmentSize(Integer.parseInt(value) * 1024 * 1024);
+
+ tableToCFMetaDataMap_ = new HashMap<String, Map<String, CFMetaData>>();
+ tableKeysCachedFractions_ = new HashMap<String, Double>();
+
+ /* See which replica placement strategy to use */
+ String replicaPlacementStrategyClassName = xmlUtils.getNodeValue("/Storage/ReplicaPlacementStrategy");
+ if (replicaPlacementStrategyClassName == null)
+ {
+ throw new ConfigurationException("Missing replicaplacementstrategy directive /Storage/ReplicaPlacementStrategy");
+ }
+ try
+ {
+ replicaPlacementStrategyClass_ = Class.forName(replicaPlacementStrategyClassName);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new ConfigurationException("Invalid replicaplacementstrategy class " + replicaPlacementStrategyClassName);
+ }
+
+ /* Read the table related stuff from config */
+ NodeList tables = xmlUtils.getRequestedNodeList("/Storage/Keyspaces/Keyspace");
+ int size = tables.getLength();
+ for ( int i = 0; i < size; ++i )
+ {
+ Node table = tables.item(i);
+
+ /* parsing out the table name */
+ String tName = XMLUtils.getAttributeValue(table, "Name");
+ if (tName == null)
+ {
+ throw new ConfigurationException("Table name attribute is required");
+ }
+ if (tName.equalsIgnoreCase(Table.SYSTEM_TABLE))
+ {
+ throw new ConfigurationException("'system' is a reserved table name for Cassandra internals");
+ }
+ tables_.add(tName);
+ tableToCFMetaDataMap_.put(tName, new HashMap<String, CFMetaData>());
+
+ String xqlCacheSize = "/Storage/Keyspaces/Keyspace[@Name='" + tName + "']/KeysCachedFraction";
+ value = xmlUtils.getNodeValue(xqlCacheSize);
+ if (value == null)
+ {
+ tableKeysCachedFractions_.put(tName, 0.01);
+ }
+ else
+ {
+ tableKeysCachedFractions_.put(tName, Double.valueOf(value));
+ }
+
+ String xqlTable = "/Storage/Keyspaces/Keyspace[@Name='" + tName + "']/";
+ NodeList columnFamilies = xmlUtils.getRequestedNodeList(xqlTable + "ColumnFamily");
+
+ // get name of the rowKey for this table
+ String n_rowKey = xmlUtils.getNodeValue(xqlTable + "RowKey");
+ if (n_rowKey == null)
+ n_rowKey = d_rowKey_;
+
+ //NodeList columnFamilies = xmlUtils.getRequestedNodeList(table, "ColumnFamily");
+ int size2 = columnFamilies.getLength();
+
+ for ( int j = 0; j < size2; ++j )
+ {
+ Node columnFamily = columnFamilies.item(j);
+ String cfName = XMLUtils.getAttributeValue(columnFamily, "Name");
+ if (cfName == null)
+ {
+ throw new ConfigurationException("ColumnFamily name attribute is required");
+ }
+ String xqlCF = xqlTable + "ColumnFamily[@Name='" + cfName + "']/";
+
+ /* squirrel away the application column families */
+ applicationColumnFamilies_.add(cfName);
+
+ // Parse out the column type
+ String rawColumnType = XMLUtils.getAttributeValue(columnFamily, "ColumnType");
+ String columnType = ColumnFamily.getColumnType(rawColumnType);
+ if (columnType == null)
+ {
+ throw new ConfigurationException("ColumnFamily " + cfName + " has invalid type " + rawColumnType);
+ }
+
+ if (XMLUtils.getAttributeValue(columnFamily, "ColumnSort") != null)
+ {
+ throw new ConfigurationException("ColumnSort is no longer an accepted attribute. Use CompareWith instead.");
+ }
+
+ // Parse out the column comparator
+ AbstractType columnComparator = getComparator(columnFamily, "CompareWith");
AbstractType subcolumnComparator = null;
- if (columnType.equals("Super"))
- {
- subcolumnComparator = getComparator(columnFamily, "CompareSubcolumnsWith");
- }
- else if (XMLUtils.getAttributeValue(columnFamily, "CompareSubcolumnsWith") != null)
- {
- throw new ConfigurationException("CompareSubcolumnsWith is only a valid attribute on super columnfamilies (not regular columnfamily " + cfName + ")");
- }
-
- // see if flush period is set
- String flushPeriodInMinutes = XMLUtils.getAttributeValue(columnFamily, "FlushPeriodInMinutes");
- int flushPeriod=0;
- if ( flushPeriodInMinutes != null )
- flushPeriod = Integer.parseInt(flushPeriodInMinutes);
-
-
- // Parse out user-specified logical names for the various dimensions
- // of a the column family from the config.
- String n_superColumnMap = xmlUtils.getNodeValue(xqlCF + "SuperColumnMap");
- if (n_superColumnMap == null)
- n_superColumnMap = d_superColumnMap_;
-
- String n_superColumnKey = xmlUtils.getNodeValue(xqlCF + "SuperColumnKey");
- if (n_superColumnKey == null)
- n_superColumnKey = d_superColumnKey_;
-
- String n_columnMap = xmlUtils.getNodeValue(xqlCF + "ColumnMap");
- if (n_columnMap == null)
- n_columnMap = d_columnMap_;
-
- String n_columnKey = xmlUtils.getNodeValue(xqlCF + "ColumnKey");
- if (n_columnKey == null)
- n_columnKey = d_columnKey_;
-
- String n_columnValue = xmlUtils.getNodeValue(xqlCF + "ColumnValue");
- if (n_columnValue == null)
- n_columnValue = d_columnValue_;
-
- String n_columnTimestamp = xmlUtils.getNodeValue(xqlCF + "ColumnTimestamp");
- if (n_columnTimestamp == null)
- n_columnTimestamp = d_columnTimestamp_;
-
- // now populate the column family meta data and
- // insert it into the table dictionary.
- CFMetaData cfMetaData = new CFMetaData();
-
- cfMetaData.tableName = tName;
- cfMetaData.cfName = cfName;
-
- cfMetaData.columnType = columnType;
- cfMetaData.comparator = columnComparator;
+ if (columnType.equals("Super"))
+ {
+ subcolumnComparator = getComparator(columnFamily, "CompareSubcolumnsWith");
+ }
+ else if (XMLUtils.getAttributeValue(columnFamily, "CompareSubcolumnsWith") != null)
+ {
+ throw new ConfigurationException("CompareSubcolumnsWith is only a valid attribute on super columnfamilies (not regular columnfamily " + cfName + ")");
+ }
+
+ // see if flush period is set
+ String flushPeriodInMinutes = XMLUtils.getAttributeValue(columnFamily, "FlushPeriodInMinutes");
+ int flushPeriod=0;
+ if ( flushPeriodInMinutes != null )
+ flushPeriod = Integer.parseInt(flushPeriodInMinutes);
+
+
+ // Parse out user-specified logical names for the various dimensions
+ // of a the column family from the config.
+ String n_superColumnMap = xmlUtils.getNodeValue(xqlCF + "SuperColumnMap");
+ if (n_superColumnMap == null)
+ n_superColumnMap = d_superColumnMap_;
+
+ String n_superColumnKey = xmlUtils.getNodeValue(xqlCF + "SuperColumnKey");
+ if (n_superColumnKey == null)
+ n_superColumnKey = d_superColumnKey_;
+
+ String n_columnMap = xmlUtils.getNodeValue(xqlCF + "ColumnMap");
+ if (n_columnMap == null)
+ n_columnMap = d_columnMap_;
+
+ String n_columnKey = xmlUtils.getNodeValue(xqlCF + "ColumnKey");
+ if (n_columnKey == null)
+ n_columnKey = d_columnKey_;
+
+ String n_columnValue = xmlUtils.getNodeValue(xqlCF + "ColumnValue");
+ if (n_columnValue == null)
+ n_columnValue = d_columnValue_;
+
+ String n_columnTimestamp = xmlUtils.getNodeValue(xqlCF + "ColumnTimestamp");
+ if (n_columnTimestamp == null)
+ n_columnTimestamp = d_columnTimestamp_;
+
+ // now populate the column family meta data and
+ // insert it into the table dictionary.
+ CFMetaData cfMetaData = new CFMetaData();
+
+ cfMetaData.tableName = tName;
+ cfMetaData.cfName = cfName;
+
+ cfMetaData.columnType = columnType;
+ cfMetaData.comparator = columnComparator;
cfMetaData.subcolumnComparator = subcolumnComparator;
-
- cfMetaData.n_rowKey = n_rowKey;
- cfMetaData.n_columnMap = n_columnMap;
- cfMetaData.n_columnKey = n_columnKey;
- cfMetaData.n_columnValue = n_columnValue;
- cfMetaData.n_columnTimestamp = n_columnTimestamp;
- if ("Super".equals(columnType))
- {
- cfMetaData.n_superColumnKey = n_superColumnKey;
- cfMetaData.n_superColumnMap = n_superColumnMap;
- }
- cfMetaData.flushPeriodInMinutes = flushPeriod;
-
- tableToCFMetaDataMap_.get(tName).put(cfName, cfMetaData);
- }
- }
-
- // Hardcoded system tables
- Map<String, CFMetaData> systemMetadata = new HashMap<String, CFMetaData>();
-
- CFMetaData data = new CFMetaData();
- data.comparator = new AsciiType();
- systemMetadata.put(SystemTable.LOCATION_CF, data);
-
- data = new CFMetaData();
- data.columnType = "Super";
- data.comparator = new UTF8Type();
- data.subcolumnComparator = new BytesType();
- systemMetadata.put(HintedHandOffManager.HINTS_CF, data);
-
- tableToCFMetaDataMap_.put("system", systemMetadata);
-
- /* make sure we have a directory for each table */
- createTableDirectories();
-
- /* Load the seeds for node contact points */
- String[] seeds = xmlUtils.getNodeValues("/Storage/Seeds/Seed");
- for( int i = 0; i < seeds.length; ++i )
- {
- seeds_.add( seeds[i] );
- }
- }
- catch (ConfigurationException e)
- {
- logger_.error("Fatal error: " + e.getMessage());
- System.err.println("Bad configuration; unable to start server");
- System.exit(1);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- private static AbstractType getComparator(Node columnFamily, String attr)
- throws ConfigurationException, TransformerException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, InstantiationException
- {
- Class<? extends AbstractType> typeClass;
- String compareWith = XMLUtils.getAttributeValue(columnFamily, attr);
- if (compareWith == null)
- {
- typeClass = AsciiType.class;
- }
- else
- {
- String className = compareWith.contains(".") ? compareWith : "org.apache.cassandra.db.marshal." + compareWith;
- try
- {
- typeClass = (Class<? extends AbstractType>)Class.forName(className);
- }
- catch (ClassNotFoundException e)
- {
- throw new ConfigurationException("Unable to load class " + className + " for " + attr + " attribute");
- }
- }
- return typeClass.getConstructor().newInstance();
- }
-
- /**
- * Create the table directory in each data directory
- */
- public static void createTableDirectories() throws IOException
- {
- for (String dataFile : dataFileDirectories_)
- {
- FileUtils.createDirectory(dataFile + File.separator + Table.SYSTEM_TABLE);
- for (String table : tables_)
- {
- FileUtils.createDirectory(dataFile + File.separator + table);
- }
- }
- }
-
- /**
- * Create the metadata tables. This table has information about
- * the table name and the column families that make up the table.
- * Each column family also has an associated ID which is an int.
- */
- // TODO duplicating data b/t tablemetadata and CFMetaData is confusing and error-prone
- public static void storeMetadata() throws IOException
- {
- int cfId = 0;
- Set<String> tables = tableToCFMetaDataMap_.keySet();
-
- for (String table : tables)
- {
- Table.TableMetadata tmetadata = Table.TableMetadata.instance(table);
- if (tmetadata.isEmpty())
- {
- tmetadata = Table.TableMetadata.instance(table);
- /* Column families associated with this table */
- Map<String, CFMetaData> columnFamilies = tableToCFMetaDataMap_.get(table);
-
- for (String columnFamily : columnFamilies.keySet())
- {
- tmetadata.add(columnFamily, cfId++, DatabaseDescriptor.getColumnType(table, columnFamily));
- }
- }
- }
- }
-
- public static int getGcGraceInSeconds()
- {
- return gcGraceInSeconds_;
- }
-
- public static IPartitioner getPartitioner()
- {
- return partitioner_;
- }
-
- public static IEndPointSnitch getEndPointSnitch()
- {
- return endPointSnitch_;
- }
-
- public static Class getReplicaPlacementStrategyClass()
- {
- return replicaPlacementStrategyClass_;
- }
-
- public static String getCalloutLocation()
- {
- return calloutLocation_;
- }
-
- public static String getJobTrackerAddress()
- {
- return jobTrackerHost_;
- }
-
- public static int getColumnIndexSize()
- {
- return columnIndexSizeInKB_ * 1024;
- }
-
- public static int getMemtableLifetime()
- {
- return memtableLifetime_;
- }
-
- public static String getInitialToken()
- {
- return initialToken_;
- }
-
- public static int getMemtableSize()
- {
- return memtableSize_;
- }
-
- public static double getMemtableObjectCount()
- {
- return memtableObjectCount_;
- }
-
- public static boolean getConsistencyCheck()
- {
- return doConsistencyCheck_;
- }
-
- public static String getClusterName()
- {
- return clusterName_;
- }
-
- public static String getConfigFileName() {
- return configFileName_;
- }
-
- public static boolean isApplicationColumnFamily(String columnFamily)
- {
- return applicationColumnFamilies_.contains(columnFamily);
- }
-
- public static String getJobJarLocation()
- {
- return jobJarFileLocation_;
- }
-
- public static Map<String, CFMetaData> getTableMetaData(String tableName)
- {
- assert tableName != null;
- return tableToCFMetaDataMap_.get(tableName);
- }
-
- /*
- * Given a table name & column family name, get the column family
- * meta data. If the table name or column family name is not valid
- * this function returns null.
- */
- public static CFMetaData getCFMetaData(String tableName, String cfName)
- {
- assert tableName != null;
- Map<String, CFMetaData> cfInfo = tableToCFMetaDataMap_.get(tableName);
- if (cfInfo == null)
- return null;
-
- return cfInfo.get(cfName);
- }
-
- public static String getColumnType(String tableName, String cfName)
- {
- assert tableName != null;
- CFMetaData cfMetaData = getCFMetaData(tableName, cfName);
-
- if (cfMetaData == null)
- return null;
- return cfMetaData.columnType;
- }
-
- public static int getFlushPeriod(String tableName, String columnFamilyName)
- {
- assert tableName != null;
- CFMetaData cfMetaData = getCFMetaData(tableName, columnFamilyName);
-
- if (cfMetaData == null)
- return 0;
- return cfMetaData.flushPeriodInMinutes;
- }
-
- public static List<String> getTables()
- {
- return tables_;
- }
-
- public static String getTable(String tableName)
- {
- assert tableName != null;
- int index = getTables().indexOf(tableName);
- return index >= 0 ? getTables().get(index) : null;
- }
-
- public static void setTables(String table)
- {
- tables_.add(table);
- }
-
- public static int getStoragePort()
- {
- return storagePort_;
- }
-
- public static int getControlPort()
- {
- return controlPort_;
- }
-
- public static int getThriftPort()
- {
- return thriftPort_;
- }
-
- public static int getReplicationFactor()
- {
- return replicationFactor_;
- }
-
- public static int getQuorum()
- {
- return (replicationFactor_ / 2) + 1;
- }
-
- public static long getRpcTimeout()
- {
- return rpcTimeoutInMillis_;
- }
-
- public static int getConsistencyThreads()
- {
- return consistencyThreads_;
- }
-
- public static int getConcurrentReaders()
- {
- return concurrentReaders_;
- }
-
- public static int getConcurrentWriters()
- {
- return concurrentWriters_;
- }
-
- public static String[] getAllDataFileLocations()
- {
- return dataFileDirectories_;
- }
-
- /**
- * Get a list of data directories for a given table
- *
- * @param table name of the table.
- *
- * @return an array of path to the data directories.
- */
- public static String[] getAllDataFileLocationsForTable(String table)
- {
- String[] tableLocations = new String[dataFileDirectories_.length];
-
- for (int i = 0; i < dataFileDirectories_.length; i++)
- {
- tableLocations[i] = dataFileDirectories_[i] + File.separator + table;
- }
-
- return tableLocations;
- }
-
- public static String getDataFileLocationForTable(String table)
- {
- String dataFileDirectory = dataFileDirectories_[currentIndex_] + File.separator + table;
- currentIndex_ = (currentIndex_ + 1) % dataFileDirectories_.length;
- return dataFileDirectory;
- }
-
- public static String getBootstrapFileLocation()
- {
- return bootstrapFileDirectory_;
- }
-
- public static void setBootstrapFileLocation(String bfLocation)
- {
- bootstrapFileDirectory_ = bfLocation;
- }
-
- public static String getLogFileLocation()
- {
- return logFileDirectory_;
- }
-
- public static void setLogFileLocation(String logLocation)
- {
- logFileDirectory_ = logLocation;
- }
-
- public static Set<String> getSeeds()
- {
- return seeds_;
- }
-
- public static String getColumnFamilyType(String tableName, String cfName)
- {
- assert tableName != null;
- String cfType = getColumnType(tableName, cfName);
- if ( cfType == null )
- cfType = "Standard";
- return cfType;
- }
-
- /*
- * Loop through all the disks to see which disk has the max free space
- * return the disk with max free space for compactions. If the size of the expected
- * compacted file is greater than the max disk space available return null, we cannot
- * do compaction in this case.
- */
- public static String getDataFileLocationForTable(String table, long expectedCompactedFileSize)
- {
- long maxFreeDisk = 0;
- int maxDiskIndex = 0;
- String dataFileDirectory = null;
- String[] dataDirectoryForTable = getAllDataFileLocationsForTable(table);
-
- for ( int i = 0 ; i < dataDirectoryForTable.length ; i++ )
- {
- File f = new File(dataDirectoryForTable[i]);
- if( maxFreeDisk < f.getUsableSpace())
- {
- maxFreeDisk = f.getUsableSpace();
- maxDiskIndex = i;
- }
- }
- // Load factor of 0.9 we do not want to use the entire disk that is too risky.
- maxFreeDisk = (long)(0.9 * maxFreeDisk);
- if( expectedCompactedFileSize < maxFreeDisk )
- {
- dataFileDirectory = dataDirectoryForTable[maxDiskIndex];
- currentIndex_ = (maxDiskIndex + 1 )%dataDirectoryForTable.length ;
- }
- else
- {
- currentIndex_ = maxDiskIndex;
- }
- return dataFileDirectory;
- }
-
- public static AbstractType getComparator(String tableName, String cfName)
- {
- assert tableName != null;
- return getCFMetaData(tableName, cfName).comparator;
- }
-
- public static AbstractType getSubComparator(String tableName, String cfName)
- {
- assert tableName != null;
- return getCFMetaData(tableName, cfName).comparator;
- }
-
- public static Map<String, Map<String, CFMetaData>> getTableToColumnFamilyMap()
- {
- return tableToCFMetaDataMap_;
- }
-
- public static double getKeysCachedFraction(String tableName)
- {
- return tableKeysCachedFractions_.get(tableName);
- }
-
- private static class ConfigurationException extends Exception
- {
- public ConfigurationException(String message)
- {
- super(message);
- }
- }
-
- public static String getListenAddress()
- {
- return listenAddress_;
- }
-
- public static String getThriftAddress()
- {
- return thriftAddress_;
- }
-
- public static int getCommitLogSyncDelay()
- {
- return commitLogSyncDelay_;
- }
-
- public static boolean isCommitLogSyncEnabled()
- {
- return commitLogSync_;
- }
-}
+
+ cfMetaData.n_rowKey = n_rowKey;
+ cfMetaData.n_columnMap = n_columnMap;
+ cfMetaData.n_columnKey = n_columnKey;
+ cfMetaData.n_columnValue = n_columnValue;
+ cfMetaData.n_columnTimestamp = n_columnTimestamp;
+ if ("Super".equals(columnType))
+ {
+ cfMetaData.n_superColumnKey = n_superColumnKey;
+ cfMetaData.n_superColumnMap = n_superColumnMap;
+ }
+ cfMetaData.flushPeriodInMinutes = flushPeriod;
+
+ tableToCFMetaDataMap_.get(tName).put(cfName, cfMetaData);
+ }
+ }
+
+ // Hardcoded system tables
+ Map<String, CFMetaData> systemMetadata = new HashMap<String, CFMetaData>();
+
+ CFMetaData data = new CFMetaData();
+ data.comparator = new AsciiType();
+ systemMetadata.put(SystemTable.LOCATION_CF, data);
+
+ data = new CFMetaData();
+ data.columnType = "Super";
+ data.comparator = new UTF8Type();
+ data.subcolumnComparator = new BytesType();
+ systemMetadata.put(HintedHandOffManager.HINTS_CF, data);
+
+ tableToCFMetaDataMap_.put("system", systemMetadata);
+
+ /* make sure we have a directory for each table */
+ createTableDirectories();
+
+ /* Load the seeds for node contact points */
+ String[] seeds = xmlUtils.getNodeValues("/Storage/Seeds/Seed");
+ for( int i = 0; i < seeds.length; ++i )
+ {
+ seeds_.add( seeds[i] );
+ }
+ }
+ catch (ConfigurationException e)
+ {
+ logger_.error("Fatal error: " + e.getMessage());
+ System.err.println("Bad configuration; unable to start server");
+ System.exit(1);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static AbstractType getComparator(Node columnFamily, String attr)
+ throws ConfigurationException, TransformerException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, InstantiationException
+ {
+ Class<? extends AbstractType> typeClass;
+ String compareWith = XMLUtils.getAttributeValue(columnFamily, attr);
+ if (compareWith == null)
+ {
+ typeClass = AsciiType.class;
+ }
+ else
+ {
+ String className = compareWith.contains(".") ? compareWith : "org.apache.cassandra.db.marshal." + compareWith;
+ try
+ {
+ typeClass = (Class<? extends AbstractType>)Class.forName(className);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new ConfigurationException("Unable to load class " + className + " for " + attr + " attribute");
+ }
+ }
+ return typeClass.getConstructor().newInstance();
+ }
+
+ /**
+ * Create the table directory in each data directory
+ */
+ public static void createTableDirectories() throws IOException
+ {
+ for (String dataFile : dataFileDirectories_)
+ {
+ FileUtils.createDirectory(dataFile + File.separator + Table.SYSTEM_TABLE);
+ for (String table : tables_)
+ {
+ FileUtils.createDirectory(dataFile + File.separator + table);
+ }
+ }
+ }
+
+ /**
+ * Create the metadata tables. This table has information about
+ * the table name and the column families that make up the table.
+ * Each column family also has an associated ID which is an int.
+ */
+ // TODO duplicating data b/t tablemetadata and CFMetaData is confusing and error-prone
+ public static void storeMetadata() throws IOException
+ {
+ int cfId = 0;
+ Set<String> tables = tableToCFMetaDataMap_.keySet();
+
+ for (String table : tables)
+ {
+ Table.TableMetadata tmetadata = Table.TableMetadata.instance(table);
+ if (tmetadata.isEmpty())
+ {
+ tmetadata = Table.TableMetadata.instance(table);
+ /* Column families associated with this table */
+ Map<String, CFMetaData> columnFamilies = tableToCFMetaDataMap_.get(table);
+
+ for (String columnFamily : columnFamilies.keySet())
+ {
+ tmetadata.add(columnFamily, cfId++, DatabaseDescriptor.getColumnType(table, columnFamily));
+ }
+ }
+ }
+ }
+
+ public static int getGcGraceInSeconds()
+ {
+ return gcGraceInSeconds_;
+ }
+
+ public static IPartitioner getPartitioner()
+ {
+ return partitioner_;
+ }
+
+ public static IEndPointSnitch getEndPointSnitch()
+ {
+ return endPointSnitch_;
+ }
+
+ public static Class getReplicaPlacementStrategyClass()
+ {
+ return replicaPlacementStrategyClass_;
+ }
+
+ public static String getCalloutLocation()
+ {
+ return calloutLocation_;
+ }
+
+ public static String getJobTrackerAddress()
+ {
+ return jobTrackerHost_;
+ }
+
+ public static int getColumnIndexSize()
+ {
+ return columnIndexSizeInKB_ * 1024;
+ }
+
+ public static int getMemtableLifetime()
+ {
+ return memtableLifetime_;
+ }
+
+ public static String getInitialToken()
+ {
+ return initialToken_;
+ }
+
+ public static int getMemtableSize()
+ {
+ return memtableSize_;
+ }
+
+ public static double getMemtableObjectCount()
+ {
+ return memtableObjectCount_;
+ }
+
+ public static boolean getConsistencyCheck()
+ {
+ return doConsistencyCheck_;
+ }
+
+ public static String getClusterName()
+ {
+ return clusterName_;
+ }
+
+ public static String getConfigFileName() {
+ return configFileName_;
+ }
+
+ public static boolean isApplicationColumnFamily(String columnFamily)
+ {
+ return applicationColumnFamilies_.contains(columnFamily);
+ }
+
+ public static String getJobJarLocation()
+ {
+ return jobJarFileLocation_;
+ }
+
+ public static Map<String, CFMetaData> getTableMetaData(String tableName)
+ {
+ assert tableName != null;
+ return tableToCFMetaDataMap_.get(tableName);
+ }
+
+ /*
+ * Given a table name & column family name, get the column family
+ * meta data. If the table name or column family name is not valid
+ * this function returns null.
+ */
+ public static CFMetaData getCFMetaData(String tableName, String cfName)
+ {
+ assert tableName != null;
+ Map<String, CFMetaData> cfInfo = tableToCFMetaDataMap_.get(tableName);
+ if (cfInfo == null)
+ return null;
+
+ return cfInfo.get(cfName);
+ }
+
+ public static String getColumnType(String tableName, String cfName)
+ {
+ assert tableName != null;
+ CFMetaData cfMetaData = getCFMetaData(tableName, cfName);
+
+ if (cfMetaData == null)
+ return null;
+ return cfMetaData.columnType;
+ }
+
+ public static int getFlushPeriod(String tableName, String columnFamilyName)
+ {
+ assert tableName != null;
+ CFMetaData cfMetaData = getCFMetaData(tableName, columnFamilyName);
+
+ if (cfMetaData == null)
+ return 0;
+ return cfMetaData.flushPeriodInMinutes;
+ }
+
+ public static List<String> getTables()
+ {
+ return tables_;
+ }
+
+ public static String getTable(String tableName)
+ {
+ assert tableName != null;
+ int index = getTables().indexOf(tableName);
+ return index >= 0 ? getTables().get(index) : null;
+ }
+
+ public static void setTables(String table)
+ {
+ tables_.add(table);
+ }
+
+ public static int getStoragePort()
+ {
+ return storagePort_;
+ }
+
+ public static int getControlPort()
+ {
+ return controlPort_;
+ }
+
+ public static int getThriftPort()
+ {
+ return thriftPort_;
+ }
+
+ public static int getReplicationFactor()
+ {
+ return replicationFactor_;
+ }
+
+ public static int getQuorum()
+ {
+ return (replicationFactor_ / 2) + 1;
+ }
+
+ public static long getRpcTimeout()
+ {
+ return rpcTimeoutInMillis_;
+ }
+
+ public static int getConsistencyThreads()
+ {
+ return consistencyThreads_;
+ }
+
+ public static int getConcurrentReaders()
+ {
+ return concurrentReaders_;
+ }
+
+ public static int getConcurrentWriters()
+ {
+ return concurrentWriters_;
+ }
+
+ public static String[] getAllDataFileLocations()
+ {
+ return dataFileDirectories_;
+ }
+
+ /**
+ * Get a list of data directories for a given table
+ *
+ * @param table name of the table.
+ *
+ * @return an array of path to the data directories.
+ */
+ public static String[] getAllDataFileLocationsForTable(String table)
+ {
+ String[] tableLocations = new String[dataFileDirectories_.length];
+
+ for (int i = 0; i < dataFileDirectories_.length; i++)
+ {
+ tableLocations[i] = dataFileDirectories_[i] + File.separator + table;
+ }
+
+ return tableLocations;
+ }
+
+ public static String getDataFileLocationForTable(String table)
+ {
+ String dataFileDirectory = dataFileDirectories_[currentIndex_] + File.separator + table;
+ currentIndex_ = (currentIndex_ + 1) % dataFileDirectories_.length;
+ return dataFileDirectory;
+ }
+
+ public static String getBootstrapFileLocation()
+ {
+ return bootstrapFileDirectory_;
+ }
+
+ public static void setBootstrapFileLocation(String bfLocation)
+ {
+ bootstrapFileDirectory_ = bfLocation;
+ }
+
+ public static String getLogFileLocation()
+ {
+ return logFileDirectory_;
+ }
+
+ public static void setLogFileLocation(String logLocation)
+ {
+ logFileDirectory_ = logLocation;
+ }
+
+ public static Set<String> getSeeds()
+ {
+ return seeds_;
+ }
+
+ public static String getColumnFamilyType(String tableName, String cfName)
+ {
+ assert tableName != null;
+ String cfType = getColumnType(tableName, cfName);
+ if ( cfType == null )
+ cfType = "Standard";
+ return cfType;
+ }
+
+ /*
+ * Loop through all the disks to see which disk has the max free space
+ * return the disk with max free space for compactions. If the size of the expected
+ * compacted file is greater than the max disk space available return null, we cannot
+ * do compaction in this case.
+ */
+ public static String getDataFileLocationForTable(String table, long expectedCompactedFileSize)
+ {
+ long maxFreeDisk = 0;
+ int maxDiskIndex = 0;
+ String dataFileDirectory = null;
+ String[] dataDirectoryForTable = getAllDataFileLocationsForTable(table);
+
+ for ( int i = 0 ; i < dataDirectoryForTable.length ; i++ )
+ {
+ File f = new File(dataDirectoryForTable[i]);
+ if( maxFreeDisk < f.getUsableSpace())
+ {
+ maxFreeDisk = f.getUsableSpace();
+ maxDiskIndex = i;
+ }
+ }
+ // Load factor of 0.9 we do not want to use the entire disk that is too risky.
+ maxFreeDisk = (long)(0.9 * maxFreeDisk);
+ if( expectedCompactedFileSize < maxFreeDisk )
+ {
+ dataFileDirectory = dataDirectoryForTable[maxDiskIndex];
+ currentIndex_ = (maxDiskIndex + 1 )%dataDirectoryForTable.length ;
+ }
+ else
+ {
+ currentIndex_ = maxDiskIndex;
+ }
+ return dataFileDirectory;
+ }
+
+ public static AbstractType getComparator(String tableName, String cfName)
+ {
+ assert tableName != null;
+ return getCFMetaData(tableName, cfName).comparator;
+ }
+
+ public static AbstractType getSubComparator(String tableName, String cfName)
+ {
+ assert tableName != null;
+ return getCFMetaData(tableName, cfName).comparator;
+ }
+
+ public static Map<String, Map<String, CFMetaData>> getTableToColumnFamilyMap()
+ {
+ return tableToCFMetaDataMap_;
+ }
+
+ public static double getKeysCachedFraction(String tableName)
+ {
+ return tableKeysCachedFractions_.get(tableName);
+ }
+
+ private static class ConfigurationException extends Exception
+ {
+ public ConfigurationException(String message)
+ {
+ super(message);
+ }
+ }
+
+ public static String getListenAddress()
+ {
+ return listenAddress_;
+ }
+
+ public static String getThriftAddress()
+ {
+ return thriftAddress_;
+ }
+
+ public static int getCommitLogSyncDelay()
+ {
+ return commitLogSyncDelay_;
+ }
+
+ public static boolean isCommitLogSyncEnabled()
+ {
+ return commitLogSync_;
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Thu Jul 30 15:30:21 2009
@@ -1,162 +1,162 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.cassandra.io.SSTableWriter;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.config.DatabaseDescriptor;
-
-import org.apache.log4j.Logger;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class BinaryMemtable
-{
- private static Logger logger_ = Logger.getLogger( Memtable.class );
- private int threshold_ = 512*1024*1024;
- private AtomicInteger currentSize_ = new AtomicInteger(0);
-
- /* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
- private String table_;
- private String cfName_;
- private boolean isFrozen_ = false;
- private Map<String, byte[]> columnFamilies_ = new NonBlockingHashMap<String, byte[]>();
- /* Lock and Condition for notifying new clients about Memtable switches */
- Lock lock_ = new ReentrantLock();
- Condition condition_;
-
- BinaryMemtable(String table, String cfName) throws IOException
- {
- condition_ = lock_.newCondition();
- table_ = table;
- cfName_ = cfName;
- }
-
- public int getMemtableThreshold()
- {
- return currentSize_.get();
- }
-
- void resolveSize(int oldSize, int newSize)
- {
- currentSize_.addAndGet(newSize - oldSize);
- }
-
-
- boolean isThresholdViolated()
- {
- if (currentSize_.get() >= threshold_ || columnFamilies_.size() > 50000)
- {
- if (logger_.isDebugEnabled())
- logger_.debug("CURRENT SIZE:" + currentSize_.get());
- return true;
- }
- return false;
- }
-
- String getColumnFamily()
- {
- return cfName_;
- }
-
- /*
- * This version is used by the external clients to put data into
- * the memtable. This version will respect the threshold and flush
- * the memtable to disk when the size exceeds the threshold.
- */
- void put(String key, byte[] buffer) throws IOException
- {
- if (isThresholdViolated() )
- {
- lock_.lock();
- try
- {
- ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
- if (!isFrozen_)
- {
- isFrozen_ = true;
- BinaryMemtableManager.instance().submit(cfStore.getColumnFamilyName(), this);
- cfStore.switchBinaryMemtable(key, buffer);
- }
- else
- {
- cfStore.applyBinary(key, buffer);
- }
- }
- finally
- {
- lock_.unlock();
- }
- }
- else
- {
- resolve(key, buffer);
- }
- }
-
- private void resolve(String key, byte[] buffer)
- {
- columnFamilies_.put(key, buffer);
- currentSize_.addAndGet(buffer.length + key.length());
- }
-
-
- /*
- *
- */
- void flush() throws IOException
- {
- if ( columnFamilies_.size() == 0 )
- return;
-
- /*
- * Use the SSTable to write the contents of the TreeMap
- * to disk.
- */
- ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
- List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
- SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), keys.size(), StorageService.getPartitioner());
- Collections.sort(keys);
- /* Use this BloomFilter to decide if a key exists in a SSTable */
- for ( String key : keys )
- {
- byte[] bytes = columnFamilies_.get(key);
- if ( bytes.length > 0 )
- {
- /* Now write the key and value to disk */
- writer.append(key, bytes);
- }
- }
- cfStore.storeLocation(writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table_)));
- columnFamilies_.clear();
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.io.SSTableWriter;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import org.apache.log4j.Logger;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class BinaryMemtable
+{
+ private static Logger logger_ = Logger.getLogger( Memtable.class );
+ private int threshold_ = 512*1024*1024;
+ private AtomicInteger currentSize_ = new AtomicInteger(0);
+
+ /* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
+ private String table_;
+ private String cfName_;
+ private boolean isFrozen_ = false;
+ private Map<String, byte[]> columnFamilies_ = new NonBlockingHashMap<String, byte[]>();
+ /* Lock and Condition for notifying new clients about Memtable switches */
+ Lock lock_ = new ReentrantLock();
+ Condition condition_;
+
+ BinaryMemtable(String table, String cfName) throws IOException
+ {
+ condition_ = lock_.newCondition();
+ table_ = table;
+ cfName_ = cfName;
+ }
+
+ public int getMemtableThreshold()
+ {
+ return currentSize_.get();
+ }
+
+ void resolveSize(int oldSize, int newSize)
+ {
+ currentSize_.addAndGet(newSize - oldSize);
+ }
+
+
+ boolean isThresholdViolated()
+ {
+ if (currentSize_.get() >= threshold_ || columnFamilies_.size() > 50000)
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("CURRENT SIZE:" + currentSize_.get());
+ return true;
+ }
+ return false;
+ }
+
+ String getColumnFamily()
+ {
+ return cfName_;
+ }
+
+ /*
+ * This version is used by the external clients to put data into
+ * the memtable. This version will respect the threshold and flush
+ * the memtable to disk when the size exceeds the threshold.
+ */
+ void put(String key, byte[] buffer) throws IOException
+ {
+ if (isThresholdViolated() )
+ {
+ lock_.lock();
+ try
+ {
+ ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+ if (!isFrozen_)
+ {
+ isFrozen_ = true;
+ BinaryMemtableManager.instance().submit(cfStore.getColumnFamilyName(), this);
+ cfStore.switchBinaryMemtable(key, buffer);
+ }
+ else
+ {
+ cfStore.applyBinary(key, buffer);
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ else
+ {
+ resolve(key, buffer);
+ }
+ }
+
+ private void resolve(String key, byte[] buffer)
+ {
+ columnFamilies_.put(key, buffer);
+ currentSize_.addAndGet(buffer.length + key.length());
+ }
+
+
+ /*
+ *
+ */
+ void flush() throws IOException
+ {
+ if ( columnFamilies_.size() == 0 )
+ return;
+
+ /*
+ * Use the SSTable to write the contents of the TreeMap
+ * to disk.
+ */
+ ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+ List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
+ SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), keys.size(), StorageService.getPartitioner());
+ Collections.sort(keys);
+ /* Use this BloomFilter to decide if a key exists in a SSTable */
+ for ( String key : keys )
+ {
+ byte[] bytes = columnFamilies_.get(key);
+ if ( bytes.length > 0 )
+ {
+ /* Now write the key and value to disk */
+ writer.append(key, bytes);
+ }
+ }
+ cfStore.storeLocation(writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table_)));
+ columnFamilies_.clear();
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtableManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtableManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtableManager.java Thu Jul 30 15:30:21 2009
@@ -1,92 +1,92 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class BinaryMemtableManager
-{
- private static BinaryMemtableManager instance_;
- private static Lock lock_ = new ReentrantLock();
- private static Logger logger_ = Logger.getLogger(BinaryMemtableManager.class);
-
- static BinaryMemtableManager instance()
- {
- if ( instance_ == null )
- {
- lock_.lock();
- try
- {
- if ( instance_ == null )
- instance_ = new BinaryMemtableManager();
- }
- finally
- {
- lock_.unlock();
- }
- }
- return instance_;
- }
-
- class BinaryMemtableFlusher implements Runnable
- {
- private BinaryMemtable memtable_;
-
- BinaryMemtableFlusher(BinaryMemtable memtable)
- {
- memtable_ = memtable;
- }
-
- public void run()
- {
- try
- {
- memtable_.flush();
- }
- catch (IOException e)
- {
- if (logger_.isDebugEnabled())
- logger_.debug( LogUtil.throwableToString(e) );
- }
- }
- }
-
- private ExecutorService flusher_ = new DebuggableThreadPoolExecutor("BINARY-MEMTABLE-FLUSHER-POOL");
-
- /* Submit memtables to be flushed to disk */
- void submit(String cfName, BinaryMemtable memtbl)
- {
- flusher_.submit( new BinaryMemtableFlusher(memtbl) );
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class BinaryMemtableManager
+{
+ private static BinaryMemtableManager instance_;
+ private static Lock lock_ = new ReentrantLock();
+ private static Logger logger_ = Logger.getLogger(BinaryMemtableManager.class);
+
+ static BinaryMemtableManager instance()
+ {
+ if ( instance_ == null )
+ {
+ lock_.lock();
+ try
+ {
+ if ( instance_ == null )
+ instance_ = new BinaryMemtableManager();
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return instance_;
+ }
+
+ class BinaryMemtableFlusher implements Runnable
+ {
+ private BinaryMemtable memtable_;
+
+ BinaryMemtableFlusher(BinaryMemtable memtable)
+ {
+ memtable_ = memtable;
+ }
+
+ public void run()
+ {
+ try
+ {
+ memtable_.flush();
+ }
+ catch (IOException e)
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug( LogUtil.throwableToString(e) );
+ }
+ }
+ }
+
+ private ExecutorService flusher_ = new DebuggableThreadPoolExecutor("BINARY-MEMTABLE-FLUSHER-POOL");
+
+ /* Submit memtables to be flushed to disk */
+ void submit(String cfName, BinaryMemtable memtbl)
+ {
+ flusher_.submit( new BinaryMemtableFlusher(memtbl) );
+ }
+}