You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 07:45:23 UTC
svn commit: r759033 [1/2] - in
/incubator/cassandra/trunk/src/org/apache/cassandra: concurrent/ config/
cql/common/ db/ dht/ gms/ io/ locator/ net/ test/ tools/ utils/
Author: alakshman
Date: Fri Mar 27 06:45:19 2009
New Revision: 759033
URL: http://svn.apache.org/viewvc?rev=759033&view=rev
Log:
Final changes made to fix a checkin anomaly.
Added:
incubator/cassandra/trunk/src/org/apache/cassandra/tools/ClusterTool.java
- copied unchanged from r758964, incubator/cassandra/trunk/src/org/apache/cassandra/tools/ClusterTool.java
incubator/cassandra/trunk/src/org/apache/cassandra/tools/FileSizeTokenGenerator.java
- copied unchanged from r758964, incubator/cassandra/trunk/src/org/apache/cassandra/tools/FileSizeTokenGenerator.java
Removed:
incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponse.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/BigIntegerToken.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/StringToken.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/Token.java
incubator/cassandra/trunk/src/org/apache/cassandra/utils/DestructivePQIterator.java
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetector.java
incubator/cassandra/trunk/src/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java
incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java
incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java
incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java
incubator/cassandra/trunk/src/org/apache/cassandra/test/DBTest.java
incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java
incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleaner.java
incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java
incubator/cassandra/trunk/src/org/apache/cassandra/utils/BasicUtilities.java
incubator/cassandra/trunk/src/org/apache/cassandra/utils/FBUtilities.java
incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Fri Mar 27 06:45:19 2009
@@ -57,23 +57,6 @@
public void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r,t);
-
- if (r instanceof FutureTask) {
- assert t == null;
- try
- {
- ((FutureTask)r).get();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e)
- {
- t = e;
- }
- }
-
if ( t != null )
{
Context ctx = ThreadLocalContext.get();
@@ -83,10 +66,20 @@
if ( object != null )
{
- logger_.error("In afterExecute() " + t.getClass().getName() + " occured while working with " + object);
+ logger_.info("**** In afterExecute() " + t.getClass().getName() + " occured while working with " + object + " ****");
+ }
+ else
+ {
+ logger_.info("**** In afterExecute() " + t.getClass().getName() + " occured ****");
}
}
- logger_.error("Error in ThreadPoolExecutor", t);
+
+ Throwable cause = t.getCause();
+ if ( cause != null )
+ {
+ logger_.info( LogUtil.throwableToString(cause) );
+ }
+ logger_.info( LogUtil.throwableToString(t) );
}
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/config/DatabaseDescriptor.java Fri Mar 27 06:45:19 2009
@@ -18,24 +18,19 @@
package org.apache.cassandra.config;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.io.*;
import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.TypeInfo;
+import org.apache.cassandra.db.Table.TableMetadata;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.XMLUtils;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
+import org.apache.cassandra.io.*;
/**
@@ -89,7 +84,7 @@
*/
private static Map<String, Map<String, CFMetaData>> tableToCFMetaDataMap_;
/* Hashing strategy Random or OPHF */
- private static String partitionerClass_;
+ private static String hashingStrategy_ = DatabaseDescriptor.random_;
/* if the size of columns or super-columns are more than this, indexing will kick in */
private static int columnIndexSizeInKB_;
/* Size of touch key cache */
@@ -120,358 +115,306 @@
// the path qualified config file (storage-conf.xml) name
private static String configFileName_;
- static
+ public static Map<String, Map<String, CFMetaData>> init(String filePath) throws Throwable
{
- try
- {
- String file = System.getProperty("storage-config") + System.getProperty("file.separator") + "storage-conf.xml";
- String os = System.getProperty("os.name");
- XMLUtils xmlUtils = new XMLUtils(file);
-
- /* Cluster Name */
- clusterName_ = xmlUtils.getNodeValue("/Storage/ClusterName");
-
- /* Ganglia servers contact list */
- gangliaServers_ = xmlUtils.getNodeValues("/Storage/GangliaServers/GangliaServer");
-
- /* ZooKeeper's address */
- zkAddress_ = xmlUtils.getNodeValue("/Storage/ZookeeperAddress");
-
- /* Hashing strategy */
- partitionerClass_ = xmlUtils.getNodeValue("/Storage/Partitioner");
- /* Callout location */
- calloutLocation_ = xmlUtils.getNodeValue("/Storage/CalloutLocation");
-
- /* JobTracker address */
- jobTrackerHost_ = xmlUtils.getNodeValue("/Storage/JobTrackerHost");
-
- /* Job Jar file location */
- jobJarFileLocation_ = xmlUtils.getNodeValue("/Storage/JobJarFileLocation");
-
- /* Zookeeper's session timeout */
- String zkSessionTimeout = xmlUtils.getNodeValue("/Storage/ZookeeperSessionTimeout");
- if ( zkSessionTimeout != null )
- zkSessionTimeout_ = Integer.parseInt(zkSessionTimeout);
-
- /* Data replication factor */
- String replicationFactor = xmlUtils.getNodeValue("/Storage/ReplicationFactor");
- if ( replicationFactor != null )
- replicationFactor_ = Integer.parseInt(replicationFactor);
-
- /* RPC Timeout */
- String rpcTimeoutInMillis = xmlUtils.getNodeValue("/Storage/RpcTimeoutInMillis");
- if ( rpcTimeoutInMillis != null )
- rpcTimeoutInMillis_ = Integer.parseInt(rpcTimeoutInMillis);
-
- /* Thread per pool */
- String threadsPerPool = xmlUtils.getNodeValue("/Storage/ThreadsPerPool");
- if ( threadsPerPool != null )
- threadsPerPool_ = Integer.parseInt(threadsPerPool);
-
- /* TCP port on which the storage system listens */
- String port = xmlUtils.getNodeValue("/Storage/StoragePort");
- if ( port != null )
- storagePort_ = Integer.parseInt(port);
-
- /* UDP port for control messages */
- port = xmlUtils.getNodeValue("/Storage/ControlPort");
- if ( port != null )
- controlPort_ = Integer.parseInt(port);
-
- /* HTTP port for HTTP messages */
- port = xmlUtils.getNodeValue("/Storage/HttpPort");
- if ( port != null )
- httpPort_ = Integer.parseInt(port);
-
- /* Touch Key Cache Size */
- String touchKeyCacheSize = xmlUtils.getNodeValue("/Storage/TouchKeyCacheSize");
- if ( touchKeyCacheSize != null )
- touchKeyCacheSize_ = Integer.parseInt(touchKeyCacheSize);
-
- /* Number of days to keep the memtable around w/o flushing */
- String lifetime = xmlUtils.getNodeValue("/Storage/MemtableLifetimeInDays");
- if ( lifetime != null )
- memtableLifetime_ = Integer.parseInt(lifetime);
-
- /* Size of the memtable in memory in MB before it is dumped */
- String memtableSize = xmlUtils.getNodeValue("/Storage/MemtableSizeInMB");
- if ( memtableSize != null )
- memtableSize_ = Integer.parseInt(memtableSize);
- /* Number of objects in millions in the memtable before it is dumped */
- String memtableObjectCount = xmlUtils.getNodeValue("/Storage/MemtableObjectCountInMillions");
- if ( memtableObjectCount != null )
- memtableObjectCount_ = Integer.parseInt(memtableObjectCount);
-
- /* This parameter enables or disables consistency checks.
- * If set to false the read repairs are disable for very
- * high throughput on reads but at the cost of consistency.*/
- String doConsistencyCheck = xmlUtils.getNodeValue("/Storage/DoConsistencyChecksBoolean");
- if ( doConsistencyCheck != null )
- doConsistencyCheck_ = Boolean.parseBoolean(doConsistencyCheck);
-
-
- /* read the size at which we should do column indexes */
- String columnIndexSizeInKB = xmlUtils.getNodeValue("/Storage/ColumnIndexSizeInKB");
- if(columnIndexSizeInKB == null)
- {
- columnIndexSizeInKB_ = 64;
- }
- else
- {
- columnIndexSizeInKB_ = Integer.parseInt(columnIndexSizeInKB);
- }
-
- /* metadata directory */
- metadataDirectory_ = xmlUtils.getNodeValue("/Storage/MetadataDirectory");
- if ( metadataDirectory_ != null )
- FileUtils.createDirectory(metadataDirectory_);
- else
- {
- if ( os.equals("Linux") )
- {
- metadataDirectory_ = "/var/storage/system";
- }
- }
-
- /* snapshot directory */
- snapshotDirectory_ = xmlUtils.getNodeValue("/Storage/SnapshotDirectory");
- if ( snapshotDirectory_ != null )
- FileUtils.createDirectory(snapshotDirectory_);
- else
- {
- snapshotDirectory_ = metadataDirectory_ + System.getProperty("file.separator") + "snapshot";
- }
-
- /* map output directory */
- mapOutputDirectories_ = xmlUtils.getNodeValues("/Storage/MapOutputDirectories/MapOutputDirectory");
- if ( mapOutputDirectories_.length > 0 )
- {
- for ( String mapOutputDirectory : mapOutputDirectories_ )
- FileUtils.createDirectory(mapOutputDirectory);
- }
+ /* Read the configuration file to retrieve DB related properties. */
+ String file = filePath + System.getProperty("file.separator") + "storage-conf.xml";
+ return initInternal(file);
+ }
- /* data file directory */
- dataFileDirectories_ = xmlUtils.getNodeValues("/Storage/DataFileDirectories/DataFileDirectory");
- if ( dataFileDirectories_.length > 0 )
- {
- for ( String dataFileDirectory : dataFileDirectories_ )
- FileUtils.createDirectory(dataFileDirectory);
- }
- else
- {
- if ( os.equals("Linux") )
- {
- dataFileDirectories_ = new String[]{"/var/storage/data"};
- }
- }
+ public static Map<String, Map<String, CFMetaData>> init() throws Throwable
+ {
+ /* Read the configuration file to retrieve DB related properties. */
+ configFileName_ = System.getProperty("storage-config") + System.getProperty("file.separator") + "storage-conf.xml";
+ return initInternal(configFileName_);
+ }
+
+ public static Map<String, Map<String, CFMetaData>> initInternal(String file) throws Throwable
+ {
+ String os = System.getProperty("os.name");
+ XMLUtils xmlUtils = new XMLUtils(file);
- /* bootstrap file directory */
- bootstrapFileDirectory_ = xmlUtils.getNodeValue("/Storage/BootstrapFileDirectory");
- if ( bootstrapFileDirectory_ != null )
- FileUtils.createDirectory(bootstrapFileDirectory_);
- else
- {
- if ( os.equals("Linux") )
- {
- bootstrapFileDirectory_ = "/var/storage/bootstrap";
- }
- }
+ /* Cluster Name */
+ clusterName_ = xmlUtils.getNodeValue("/Storage/ClusterName");
- /* commit log directory */
- logFileDirectory_ = xmlUtils.getNodeValue("/Storage/CommitLogDirectory");
- if ( logFileDirectory_ != null )
- FileUtils.createDirectory(logFileDirectory_);
- else
- {
- if ( os.equals("Linux") )
- {
- logFileDirectory_ = "/var/storage/commitlog";
- }
- }
+ /* Ganglia servers contact list */
+ gangliaServers_ = xmlUtils.getNodeValues("/Storage/GangliaServers/GangliaServer");
+
+ /* ZooKeeper's address */
+ zkAddress_ = xmlUtils.getNodeValue("/Storage/ZookeeperAddress");
+
+ /* Hashing strategy */
+ hashingStrategy_ = xmlUtils.getNodeValue("/Storage/HashingStrategy");
+ /* Callout location */
+ calloutLocation_ = xmlUtils.getNodeValue("/Storage/CalloutLocation");
+
+ /* JobTracker address */
+ jobTrackerHost_ = xmlUtils.getNodeValue("/Storage/JobTrackerHost");
+
+ /* Job Jar file location */
+ jobJarFileLocation_ = xmlUtils.getNodeValue("/Storage/JobJarFileLocation");
+
+ /* Zookeeper's session timeout */
+ String zkSessionTimeout = xmlUtils.getNodeValue("/Storage/ZookeeperSessionTimeout");
+ if ( zkSessionTimeout != null )
+ zkSessionTimeout_ = Integer.parseInt(zkSessionTimeout);
+
+ /* Data replication factor */
+ String replicationFactor = xmlUtils.getNodeValue("/Storage/ReplicationFactor");
+ if ( replicationFactor != null )
+ replicationFactor_ = Integer.parseInt(replicationFactor);
+
+ /* RPC Timeout */
+ String rpcTimeoutInMillis = xmlUtils.getNodeValue("/Storage/RpcTimeoutInMillis");
+ if ( rpcTimeoutInMillis != null )
+ rpcTimeoutInMillis_ = Integer.parseInt(rpcTimeoutInMillis);
+
+ /* Thread per pool */
+ String threadsPerPool = xmlUtils.getNodeValue("/Storage/ThreadsPerPool");
+ if ( threadsPerPool != null )
+ threadsPerPool_ = Integer.parseInt(threadsPerPool);
+
+ /* TCP port on which the storage system listens */
+ String port = xmlUtils.getNodeValue("/Storage/StoragePort");
+ if ( port != null )
+ storagePort_ = Integer.parseInt(port);
+
+ /* UDP port for control messages */
+ port = xmlUtils.getNodeValue("/Storage/ControlPort");
+ if ( port != null )
+ controlPort_ = Integer.parseInt(port);
+
+ /* HTTP port for HTTP messages */
+ port = xmlUtils.getNodeValue("/Storage/HttpPort");
+ if ( port != null )
+ httpPort_ = Integer.parseInt(port);
+
+ /* Touch Key Cache Size */
+ String touchKeyCacheSize = xmlUtils.getNodeValue("/Storage/TouchKeyCacheSize");
+ if ( touchKeyCacheSize != null )
+ touchKeyCacheSize_ = Integer.parseInt(touchKeyCacheSize);
+
+ /* Number of days to keep the memtable around w/o flushing */
+ String lifetime = xmlUtils.getNodeValue("/Storage/MemtableLifetimeInDays");
+ if ( lifetime != null )
+ memtableLifetime_ = Integer.parseInt(lifetime);
+
+ /* Size of the memtable in memory in MB before it is dumped */
+ String memtableSize = xmlUtils.getNodeValue("/Storage/MemtableSizeInMB");
+ if ( memtableSize != null )
+ memtableSize_ = Integer.parseInt(memtableSize);
+ /* Number of objects in millions in the memtable before it is dumped */
+ String memtableObjectCount = xmlUtils.getNodeValue("/Storage/MemtableObjectCountInMillions");
+ if ( memtableObjectCount != null )
+ memtableObjectCount_ = Integer.parseInt(memtableObjectCount);
+
+ /* This parameter enables or disables consistency checks.
+ * If set to false the read repairs are disable for very
+ * high throughput on reads but at the cost of consistency.*/
+ String doConsistencyCheck = xmlUtils.getNodeValue("/Storage/DoConsistencyChecksBoolean");
+ if ( doConsistencyCheck != null )
+ doConsistencyCheck_ = Boolean.parseBoolean(doConsistencyCheck);
+
+
+ /* read the size at which we should do column indexes */
+ String columnIndexSizeInKB = xmlUtils.getNodeValue("/Storage/ColumnIndexSizeInKB");
+ if(columnIndexSizeInKB == null)
+ {
+ columnIndexSizeInKB_ = 64;
+ }
+ else
+ {
+ columnIndexSizeInKB_ = Integer.parseInt(columnIndexSizeInKB);
+ }
- /* threshold after which commit log should be rotated. */
- String value = xmlUtils.getNodeValue("/Storage/CommitLogRotationThresholdInMB");
- if ( value != null)
- logRotationThreshold_ = Integer.parseInt(value) * 1024 * 1024;
-
- /* fast sync option */
- value = xmlUtils.getNodeValue("/Storage/CommitLogFastSync");
- if ( value != null )
- fastSync_ = Boolean.parseBoolean(value);
-
- tableToCFMetaDataMap_ = new HashMap<String, Map<String, CFMetaData>>();
-
- /* Rack Aware option */
- value = xmlUtils.getNodeValue("/Storage/RackAware");
- if ( value != null )
- rackAware_ = Boolean.parseBoolean(value);
-
- /* Read the table related stuff from config */
- NodeList tables = xmlUtils.getRequestedNodeList("/Storage/Tables/Table");
- int size = tables.getLength();
- if (size == 0) {
- throw new UnsupportedOperationException("A Table must be configured");
- }
- for ( int i = 0; i < size; ++i )
+ /* metadata directory */
+ metadataDirectory_ = xmlUtils.getNodeValue("/Storage/MetadataDirectory");
+ if ( metadataDirectory_ != null )
+ FileUtils.createDirectory(metadataDirectory_);
+ else
+ {
+ if ( os.equals("Linux") )
{
- Node table = tables.item(i);
-
- /* parsing out the table name */
- String tName = XMLUtils.getAttributeValue(table, "Name");
- tables_.add(tName);
- tableToCFMetaDataMap_.put(tName, new HashMap<String, CFMetaData>());
-
- String xqlTable = "/Storage/Tables/Table[@Name='" + tName + "']/";
- NodeList columnFamilies = xmlUtils.getRequestedNodeList(xqlTable + "ColumnFamily");
-
- // get name of the rowKey for this table
- String n_rowKey = xmlUtils.getNodeValue(xqlTable + "RowKey");
- if (n_rowKey == null)
- n_rowKey = d_rowKey_;
-
- //NodeList columnFamilies = xmlUtils.getRequestedNodeList(table, "ColumnFamily");
- int size2 = columnFamilies.getLength();
-
- for ( int j = 0; j < size2; ++j )
- {
- Node columnFamily = columnFamilies.item(j);
- String cName = XMLUtils.getAttributeValue(columnFamily, "Name");
- if (cName == null)
- {
- throw new IllegalArgumentException("ColumnFamily element missing Name attribute: " + columnFamily);
- }
- String xqlCF = xqlTable + "ColumnFamily[@Name='" + cName + "']/";
-
- /* squirrel away the application column families */
- applicationColumnFamilies_.add(cName);
-
- // Parse out the column type
- String columnType = xmlUtils.getAttributeValue(columnFamily, "ColumnType");
- columnType = ColumnFamily.getColumnType(columnType);
-
- // Parse out the column family sorting property for columns
- String columnIndexProperty = XMLUtils.getAttributeValue(columnFamily, "ColumnSort");
- String columnIndexType = ColumnFamily.getColumnSortProperty(columnIndexProperty);
-
- // Parse out user-specified logical names for the various dimensions
- // of a the column family from the config.
- String n_superColumnMap = xmlUtils.getNodeValue(xqlCF + "SuperColumnMap");
- if (n_superColumnMap == null)
- n_superColumnMap = d_superColumnMap_;
-
- String n_superColumnKey = xmlUtils.getNodeValue(xqlCF + "SuperColumnKey");
- if (n_superColumnKey == null)
- n_superColumnKey = d_superColumnKey_;
-
- String n_columnMap = xmlUtils.getNodeValue(xqlCF + "ColumnMap");
- if (n_columnMap == null)
- n_columnMap = d_columnMap_;
-
- String n_columnKey = xmlUtils.getNodeValue(xqlCF + "ColumnKey");
- if (n_columnKey == null)
- n_columnKey = d_columnKey_;
-
- String n_columnValue = xmlUtils.getNodeValue(xqlCF + "ColumnValue");
- if (n_columnValue == null)
- n_columnValue = d_columnValue_;
-
- String n_columnTimestamp = xmlUtils.getNodeValue(xqlCF + "ColumnTimestamp");
- if (n_columnTimestamp == null)
- n_columnTimestamp = d_columnTimestamp_;
-
- // now populate the column family meta data and
- // insert it into the table dictionary.
- CFMetaData cfMetaData = new CFMetaData();
-
- cfMetaData.tableName = tName;
- cfMetaData.cfName = cName;
-
- cfMetaData.columnType = columnType;
- cfMetaData.indexProperty_ = columnIndexType;
-
- cfMetaData.n_rowKey = n_rowKey;
- cfMetaData.n_columnMap = n_columnMap;
- cfMetaData.n_columnKey = n_columnKey;
- cfMetaData.n_columnValue = n_columnValue;
- cfMetaData.n_columnTimestamp = n_columnTimestamp;
- if ("Super".equals(columnType))
- {
- cfMetaData.n_superColumnKey = n_superColumnKey;
- cfMetaData.n_superColumnMap = n_superColumnMap;
- }
-
- tableToCFMetaDataMap_.get(tName).put(cName, cfMetaData);
- }
+ metadataDirectory_ = "/var/storage/system";
}
+ }
- /* Load the seeds for node contact points */
- String[] seeds = xmlUtils.getNodeValues("/Storage/Seeds/Seed");
- for( int i = 0; i < seeds.length; ++i )
- {
- seeds_.add( seeds[i] );
- }
+ /* snapshot directory */
+ snapshotDirectory_ = xmlUtils.getNodeValue("/Storage/SnapshotDirectory");
+ if ( snapshotDirectory_ != null )
+ FileUtils.createDirectory(snapshotDirectory_);
+ else
+ {
+ snapshotDirectory_ = metadataDirectory_ + System.getProperty("file.separator") + "snapshot";
}
- catch (Exception e)
+
+ /* map output directory */
+ mapOutputDirectories_ = xmlUtils.getNodeValues("/Storage/MapOutputDirectories/MapOutputDirectory");
+ if ( mapOutputDirectories_.length > 0 )
{
- throw new RuntimeException(e);
+ for ( String mapOutputDirectory : mapOutputDirectories_ )
+ FileUtils.createDirectory(mapOutputDirectory);
}
-
- try
+
+ /* data file directory */
+ dataFileDirectories_ = xmlUtils.getNodeValues("/Storage/DataFileDirectories/DataFileDirectory");
+ if ( dataFileDirectories_.length > 0 )
{
- storeMetadata();
+ for ( String dataFileDirectory : dataFileDirectories_ )
+ FileUtils.createDirectory(dataFileDirectory);
}
- catch (IOException e)
+ else
{
- throw new RuntimeException(e);
+ if ( os.equals("Linux") )
+ {
+ dataFileDirectories_ = new String[]{"/var/storage/data"};
+ }
}
- }
- /*
- * Create the metadata tables. This table has information about
- * the table name and the column families that make up the table.
- * Each column family also has an associated ID which is an int.
- */
- private static void storeMetadata() throws IOException
- {
- AtomicInteger idGenerator = new AtomicInteger(0);
- Set<String> tables = tableToCFMetaDataMap_.keySet();
+ /* bootstrap file directory */
+ bootstrapFileDirectory_ = xmlUtils.getNodeValue("/Storage/BootstrapFileDirectory");
+ if ( bootstrapFileDirectory_ != null )
+ FileUtils.createDirectory(bootstrapFileDirectory_);
+ else
+ {
+ if ( os.equals("Linux") )
+ {
+ bootstrapFileDirectory_ = "/var/storage/bootstrap";
+ }
+ }
- for ( String table : tables )
+ /* commit log directory */
+ logFileDirectory_ = xmlUtils.getNodeValue("/Storage/CommitLogDirectory");
+ if ( logFileDirectory_ != null )
+ FileUtils.createDirectory(logFileDirectory_);
+ else
{
- Table.TableMetadata tmetadata = Table.TableMetadata.instance();
- if (tmetadata.isEmpty())
+ if ( os.equals("Linux") )
{
- tmetadata = Table.TableMetadata.instance();
- /* Column families associated with this table */
- Map<String, CFMetaData> columnFamilies = tableToCFMetaDataMap_.get(table);
+ logFileDirectory_ = "/var/storage/commitlog";
+ }
+ }
+
+ /* threshold after which commit log should be rotated. */
+ String value = xmlUtils.getNodeValue("/Storage/CommitLogRotationThresholdInMB");
+ if ( value != null)
+ logRotationThreshold_ = Integer.parseInt(value) * 1024 * 1024;
+
+ /* fast sync option */
+ value = xmlUtils.getNodeValue("/Storage/CommitLogFastSync");
+ if ( value != null )
+ fastSync_ = Boolean.parseBoolean(value);
+
+ tableToCFMetaDataMap_ = new HashMap<String, Map<String, CFMetaData>>();
+
+ /* Rack Aware option */
+ value = xmlUtils.getNodeValue("/Storage/RackAware");
+ if ( value != null )
+ rackAware_ = Boolean.parseBoolean(value);
+
+ /* Read the table related stuff from config */
+ NodeList tables = xmlUtils.getRequestedNodeList("/Storage/Tables/Table");
+ int size = tables.getLength();
+ for ( int i = 0; i < size; ++i )
+ {
+ Node table = tables.item(i);
- for (String columnFamily : columnFamilies.keySet())
+ /* parsing out the table name */
+ String tName = XMLUtils.getAttributeValue(table, "Name");
+ tables_.add(tName);
+ tableToCFMetaDataMap_.put(tName, new HashMap<String, CFMetaData>());
+
+ String xqlTable = "/Storage/Tables/Table[@Name='" + tName + "']/";
+ NodeList columnFamilies = xmlUtils.getRequestedNodeList(xqlTable + "ColumnFamily");
+
+ // get name of the rowKey for this table
+ String n_rowKey = xmlUtils.getNodeValue(xqlTable + "RowKey");
+ if (n_rowKey == null)
+ n_rowKey = d_rowKey_;
+
+ //NodeList columnFamilies = xmlUtils.getRequestedNodeList(table, "ColumnFamily");
+ int size2 = columnFamilies.getLength();
+
+ for ( int j = 0; j < size2; ++j )
+ {
+ Node columnFamily = columnFamilies.item(j);
+ String cName = XMLUtils.getAttributeValue(columnFamily, "Name");
+ String xqlCF = xqlTable + "ColumnFamily[@Name='" + cName + "']/";
+
+ /* squirrel away the application column families */
+ applicationColumnFamilies_.add(cName);
+
+ // Parse out the column type
+ String columnType = xmlUtils.getAttributeValue(columnFamily, "ColumnType");
+ columnType = ColumnFamily.getColumnType(columnType);
+
+ // Parse out the column family sorting property for columns
+ String columnIndexProperty = XMLUtils.getAttributeValue(columnFamily, "ColumnSort");
+ String columnIndexType = ColumnFamily.getColumnSortProperty(columnIndexProperty);
+
+ // Parse out user-specified logical names for the various dimensions
+ // of a the column family from the config.
+ String n_superColumnMap = xmlUtils.getNodeValue(xqlCF + "SuperColumnMap");
+ if (n_superColumnMap == null)
+ n_superColumnMap = d_superColumnMap_;
+
+ String n_superColumnKey = xmlUtils.getNodeValue(xqlCF + "SuperColumnKey");
+ if (n_superColumnKey == null)
+ n_superColumnKey = d_superColumnKey_;
+
+ String n_columnMap = xmlUtils.getNodeValue(xqlCF + "ColumnMap");
+ if (n_columnMap == null)
+ n_columnMap = d_columnMap_;
+
+ String n_columnKey = xmlUtils.getNodeValue(xqlCF + "ColumnKey");
+ if (n_columnKey == null)
+ n_columnKey = d_columnKey_;
+
+ String n_columnValue = xmlUtils.getNodeValue(xqlCF + "ColumnValue");
+ if (n_columnValue == null)
+ n_columnValue = d_columnValue_;
+
+ String n_columnTimestamp = xmlUtils.getNodeValue(xqlCF + "ColumnTimestamp");
+ if (n_columnTimestamp == null)
+ n_columnTimestamp = d_columnTimestamp_;
+
+ // now populate the column family meta data and
+ // insert it into the table dictionary.
+ CFMetaData cfMetaData = new CFMetaData();
+
+ cfMetaData.tableName = tName;
+ cfMetaData.cfName = cName;
+
+ cfMetaData.columnType = columnType;
+ cfMetaData.indexProperty_ = columnIndexType;
+
+ cfMetaData.n_rowKey = n_rowKey;
+ cfMetaData.n_columnMap = n_columnMap;
+ cfMetaData.n_columnKey = n_columnKey;
+ cfMetaData.n_columnValue = n_columnValue;
+ cfMetaData.n_columnTimestamp = n_columnTimestamp;
+ if ("Super".equals(columnType))
{
- tmetadata.add(columnFamily, idGenerator.getAndIncrement(), DatabaseDescriptor.getColumnType(columnFamily));
+ cfMetaData.n_superColumnKey = n_superColumnKey;
+ cfMetaData.n_superColumnMap = n_superColumnMap;
}
- /*
- * Here we add all the system related column families.
- */
- /* Add the TableMetadata column family to this map. */
- tmetadata.add(Table.TableMetadata.cfName_, idGenerator.getAndIncrement());
- /* Add the LocationInfo column family to this map. */
- tmetadata.add(SystemTable.cfName_, idGenerator.getAndIncrement());
- /* Add the recycle column family to this map. */
- tmetadata.add(Table.recycleBin_, idGenerator.getAndIncrement());
- /* Add the Hints column family to this map. */
- tmetadata.add(Table.hints_, idGenerator.getAndIncrement(), ColumnFamily.getColumnType("Super"));
- tmetadata.apply();
- idGenerator.set(0);
+ tableToCFMetaDataMap_.get(tName).put(cName, cfMetaData);
}
}
- }
-
+ /* Load the seeds for node contact points */
+ String[] seeds = xmlUtils.getNodeValues("/Storage/Seeds/Seed");
+ for( int i = 0; i < seeds.length; ++i )
+ {
+ seeds_.add( seeds[i] );
+ }
+ return tableToCFMetaDataMap_;
+ }
- public static String getPartitionerClass()
+ public static String getHashingStrategy()
{
- return partitionerClass_;
+ return hashingStrategy_;
}
public static String getZkAddress()
@@ -792,14 +735,9 @@
return TypeInfo.LONG;
}
}
-
- public static Map<String, Map<String, CFMetaData>> getTableToColumnFamilyMap()
- {
- return tableToCFMetaDataMap_;
- }
-
- public static String getTableName()
+
+ public static void main(String[] args) throws Throwable
{
- return tables_.get(0);
+ DatabaseDescriptor.initInternal("C:\\Engagements\\Cassandra-Golden\\storage-conf.xml");
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java Fri Mar 27 06:45:19 2009
@@ -33,6 +33,7 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
+import org.apache.cassandra.db.*;
/**
* A Row Source Definition (RSD) for doing a range query on a column map
@@ -111,7 +112,7 @@
List<Map<String, String>> rows = new LinkedList<Map<String, String>>();
if (row != null)
{
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
if (cfMap != null && cfMap.size() > 0)
{
ColumnFamily cfamily = cfMap.get(cfMetaData_.cfName);
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java Fri Mar 27 06:45:19 2009
@@ -18,6 +18,7 @@
package org.apache.cassandra.cql.common;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
@@ -31,8 +32,11 @@
import org.apache.cassandra.db.Row;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.column_t;
+import org.apache.cassandra.service.superColumn_t;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
+import org.apache.cassandra.db.*;
/**
* A Row Source Definition (RSD) for doing a super column range query on a Super Column Family.
@@ -80,7 +84,7 @@
List<Map<String, String>> rows = new LinkedList<Map<String, String>>();
if (row != null)
{
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
if (cfMap != null && cfMap.size() > 0)
{
ColumnFamily cfamily = cfMap.get(cfMetaData_.cfName);
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java Fri Mar 27 06:45:19 2009
@@ -33,6 +33,7 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
+import org.apache.cassandra.db.*;
/**
* A Row Source Definition (RSD) for looking up a unique column within a column family.
@@ -95,7 +96,7 @@
if (row != null)
{
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
if (cfMap != null && cfMap.size() > 0)
{
ColumnFamily cfamily = cfMap.get(cfMetaData_.cfName);
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetector.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetector.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetector.java Fri Mar 27 06:45:19 2009
@@ -177,7 +177,7 @@
/* We need this so that we do not suspect a convict. */
boolean isConvicted = false;
double phi = hbWnd.phi(now);
- logger_.trace("PHI for " + ep + " : " + phi);
+ logger_.info("PHI for " + ep + " : " + phi);
/*
if ( phi > phiConvictThreshold_ )
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/Gossiper.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/Gossiper.java Fri Mar 27 06:45:19 2009
@@ -76,7 +76,7 @@
if ( !bVal )
doGossipToSeed(message);
- logger_.trace("Performing status check ...");
+ logger_.debug("Performing status check ...");
doStatusCheck();
}
}
@@ -344,7 +344,7 @@
sb.append(gDigest);
sb.append(" ");
}
- logger_.trace("Gossip Digests are : " + sb.toString());
+ logger_.debug("Gossip Digests are : " + sb.toString());
}
public int getCurrentGenerationNumber(EndPoint endpoint)
@@ -367,7 +367,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
- logger_.trace("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
+ logger_.debug("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, new Object[]{bos.toByteArray()});
return message;
}
@@ -392,7 +392,7 @@
}
EndPoint to = eps.get(++rrIndex_);
- logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
+ logger_.info("Sending a GossipDigestSynMessage to " + to + " ...");
MessagingService.getMessagingInstance().sendUdpOneWay(message, to);
return seeds_.contains(to);
}
@@ -411,7 +411,7 @@
List<EndPoint> liveEndPoints = new ArrayList<EndPoint>(epSet);
int index = (size == 1) ? 0 : random_.nextInt(size);
EndPoint to = liveEndPoints.get(index);
- logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
+ logger_.info("Sending a GossipDigestSynMessage to " + to + " ...");
MessagingService.getMessagingInstance().sendUdpOneWay(message, to);
return seeds_.contains(to);
}
@@ -977,7 +977,7 @@
public void doVerb(Message message)
{
EndPoint from = message.getFrom();
- logger_.trace("Received a GossipDigestSynMessage from " + from);
+ logger_.info("Received a GossipDigestSynMessage from " + from);
byte[] bytes = (byte[])message.getMessageBody()[0];
DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
@@ -1001,7 +1001,7 @@
GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
Message gDigestAckMessage = Gossiper.instance().makeGossipDigestAckMessage(gDigestAck);
- logger_.trace("Sending a GossipDigestAckMessage to " + from);
+ logger_.info("Sending a GossipDigestAckMessage to " + from);
MessagingService.getMessagingInstance().sendUdpOneWay(gDigestAckMessage, from);
}
catch (IOException e)
@@ -1061,7 +1061,7 @@
public void doVerb(Message message)
{
EndPoint from = message.getFrom();
- logger_.trace("Received a GossipDigestAckMessage from " + from);
+ logger_.info("Received a GossipDigestAckMessage from " + from);
byte[] bytes = (byte[])message.getMessageBody()[0];
DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
@@ -1091,7 +1091,7 @@
GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
Message gDigestAck2Message = Gossiper.instance().makeGossipDigestAck2Message(gDigestAck2);
- logger_.trace("Sending a GossipDigestAck2Message to " + from);
+ logger_.info("Sending a GossipDigestAck2Message to " + from);
MessagingService.getMessagingInstance().sendUdpOneWay(gDigestAck2Message, from);
}
catch ( IOException e )
@@ -1108,7 +1108,7 @@
public void doVerb(Message message)
{
EndPoint from = message.getFrom();
- logger_.trace("Received a GossipDigestAck2Message from " + from);
+ logger_.info("Received a GossipDigestAck2Message from " + from);
byte[] bytes = (byte[])message.getMessageBody()[0];
DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java Fri Mar 27 06:45:19 2009
@@ -21,10 +21,10 @@
* Section of a file that needs to be scanned
* is represented by this class.
*/
-public class Coordinate
+class Coordinate
{
- public final long start_;
- public final long end_;
+ long start_;
+ long end_;
Coordinate(long start, long end)
{
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java Fri Mar 27 06:45:19 2009
@@ -92,6 +92,31 @@
/**
* This method dumps the next key/value into the DataOutputBuffer
+ * passed in.
+ *
+ * @param key key we are interested in.
+ * @param bufOut DataOutputBuffer that needs to be filled.
+ * @param section region of the file that needs to be read
+ * @throws IOException
+ * @return the number of bytes read.
+ */
+ public long next(String key, DataOutputBuffer bufOut, Coordinate section) throws IOException;
+
+ /**
+ * This method dumps the next key/value into the DataOutputBuffer
+ * passed in.
+ *
+ * @param key key we are interested in.
+ * @param bufOut DataOutputBuffer that needs to be filled.
+ * @param column name of the column in our format.
+ * @param section region of the file that needs to be read
+ * @throws IOException
+ * @return number of bytes that were read.
+ */
+ public long next(String key, DataOutputBuffer bufOut, String column, Coordinate section) throws IOException;
+
+ /**
+ * This method dumps the next key/value into the DataOutputBuffer
* passed in. Always use this method to query for application
* specific data as it will have indexes.
*
@@ -105,9 +130,23 @@
* @return number of bytes read.
*
*/
- public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> columnNames, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException;
+ public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> columnNames, Coordinate section) throws IOException;
/**
+ * This method dumps the next key/value into the DataOutputBuffer
+ * passed in.
+ *
+ * @param key key we are interested in.
+ * @param bufOut DataOutputBuffer that needs to be filled.
+ * @param column name of the column in our format.
+ * @param timeRange time range we are interested in.
+ * @param section region of the file that needs to be read
+ * @throws IOException
+ * @return number of bytes that were read.
+ */
+ public long next(String key, DataOutputBuffer bufOut, String column, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException;
+
+ /**
* Close the file after reading.
* @throws IOException
*/
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java Fri Mar 27 06:45:19 2009
@@ -18,8 +18,11 @@
package org.apache.cassandra.io;
+import java.io.File;
import java.io.IOException;
+import org.apache.cassandra.db.PrimaryKey;
+
/**
* An interface for writing into the SequenceFile abstraction.
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java Fri Mar 27 06:45:19 2009
@@ -18,30 +18,24 @@
package org.apache.cassandra.io;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Hashtable;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
+import java.io.*;
+import java.math.BigInteger;
+import java.nio.channels.FileChannel;
+import java.util.*;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.service.PartitionerType;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BasicUtilities;
import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.dht.IPartitioner;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
/**
* This class is built on top of the SequenceFile. It stores
@@ -144,26 +138,62 @@
}
/**
+ * This compares two strings and does it in reverse
+ * order.
+ *
+ * @author alakshman
+ *
+ */
+ private static class OrderPreservingPartitionerComparator implements Comparator<String>
+ {
+ public int compare(String c1, String c2)
+ {
+ return c2.compareTo(c1);
+ }
+ }
+
+ /**
+ * This class compares two BigIntegers passed in
+ * as strings and does so in reverse order.
+ * @author alakshman
+ *
+ */
+ private static class RandomPartitionerComparator implements Comparator<String>
+ {
+ public int compare(String c1, String c2)
+ {
+ BigInteger b1 = new BigInteger(c1);
+ BigInteger b2 = new BigInteger(c2);
+ return b2.compareTo(b1);
+ }
+ }
+
+ /**
* This is a simple container for the index Key and its corresponding position
* in the data file. Binary search is performed on a list of these objects
* to lookup keys within the SSTable data file.
*/
public static class KeyPositionInfo implements Comparable<KeyPositionInfo>
{
- private final String decoratedKey;
+ private String key_;
private long position_;
- public KeyPositionInfo(String decoratedKey)
+ public KeyPositionInfo(String key)
{
- this.decoratedKey = decoratedKey;
+ key_ = key;
}
- public KeyPositionInfo(String decoratedKey, long position)
+ public KeyPositionInfo(String key, long position)
{
- this(decoratedKey);
+ this(key);
position_ = position;
}
+ public String key()
+ {
+ return key_;
+ }
+
public long position()
{
return position_;
@@ -171,13 +201,25 @@
public int compareTo(KeyPositionInfo kPosInfo)
{
- IPartitioner p = StorageService.getPartitioner();
- return p.getDecoratedKeyComparator().compare(decoratedKey, kPosInfo.decoratedKey);
+ int value = 0;
+ PartitionerType pType = StorageService.getPartitionerType();
+ switch( pType )
+ {
+ case OPHF:
+ value = key_.compareTo(kPosInfo.key_);
+ break;
+
+ default:
+ BigInteger b = new BigInteger(key_);
+ value = b.compareTo( new BigInteger(kPosInfo.key_) );
+ break;
+ }
+ return value;
}
public String toString()
{
- return decoratedKey + ":" + position_;
+ return key_ + ":" + position_;
}
}
@@ -262,7 +304,7 @@
List<String> indexedKeys = new ArrayList<String>();
for ( KeyPositionInfo keyPositionInfo : keyPositionInfos )
{
- indexedKeys.add(keyPositionInfo.decoratedKey);
+ indexedKeys.add(keyPositionInfo.key_);
}
Collections.sort(indexedKeys);
@@ -319,13 +361,13 @@
* Determines if the given key is in the specified file. If the
* key is not present then we skip processing this file.
*/
- public static boolean isKeyInFile(String clientKey, String filename)
+ public static boolean isKeyInFile(String key, String filename)
{
boolean bVal = false;
BloomFilter bf = bfs_.get(filename);
if ( bf != null )
{
- bVal = bf.isPresent(clientKey);
+ bVal = bf.isPresent(key);
}
return bVal;
}
@@ -358,13 +400,48 @@
public SSTable(String directory, String filename) throws IOException
{
dataFile_ = directory + System.getProperty("file.separator") + filename + "-Data.db";
- blockIndex_ = new TreeMap<String, BlockMetadata>(StorageService.getPartitioner().getReverseDecoratedKeyComparator());
- blockIndexes_ = new ArrayList<SortedMap<String, BlockMetadata>>();
- dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);
- // dataWriter_ = SequenceFile.chksumWriter(dataFile_, 4*1024*1024);
- SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition();
+ blockIndex_ = new TreeMap<String, BlockMetadata>(Collections.reverseOrder());
+ blockIndexes_ = new ArrayList<SortedMap<String, BlockMetadata>>();
+ // dataWriter_ = SequenceFile.writer(dataFile_);
+ dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);
+ SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition();
}
-
+
+ private void initBlockIndex()
+ {
+ initBlockIndex(StorageService.getPartitionerType());
+ }
+
+ private void initBlockIndex(PartitionerType pType)
+ {
+ switch ( pType )
+ {
+ case OPHF:
+ blockIndex_ = new TreeMap<String, BlockMetadata>( new SSTable.OrderPreservingPartitionerComparator() );
+ break;
+
+ default:
+ blockIndex_ = new TreeMap<String, BlockMetadata>( new SSTable.RandomPartitionerComparator() );
+ break;
+ }
+ }
+
+ /**
+ * This ctor is used for DB writes into the SSTable. Use this
+ * version to write to the SSTable.
+ */
+ public SSTable(String directory, String filename, PartitionerType pType) throws IOException
+ {
+ dataFile_ = directory + System.getProperty("file.separator") + filename + "-Data.db";
+ // dataWriter_ = SequenceFile.writer(dataFile_);
+ dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);
+ // dataWriter_ = SequenceFile.chksumWriter(dataFile_, 4*1024*1024);
+ SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition();
+ /* set up the block index based on partition type */
+ initBlockIndex(pType);
+ blockIndexes_ = new ArrayList<SortedMap<String, BlockMetadata>>();
+ }
+
private void loadBloomFilter(IFileReader indexReader, long size) throws IOException
{
/* read the position of the bloom filter */
@@ -381,8 +458,8 @@
indexReader.next(bufOut);
bufOut.close();
bufIn.reset(bufOut.getData(), bufOut.getLength());
- String clientKey = bufIn.readUTF();
- if ( clientKey.equals(SequenceFile.marker_) )
+ String key = bufIn.readUTF();
+ if ( key.equals(SequenceFile.marker_) )
{
/*
* We are now reading the serialized Bloom Filter. We read
@@ -515,26 +592,31 @@
return getFile(dataFile_);
}
+ public long lastModified()
+ {
+ return dataWriter_.lastModified();
+ }
+
/*
* Seeks to the specified key on disk.
*/
- public void touch(final String clientKey, boolean fData) throws IOException
+ public void touch(String key, boolean fData) throws IOException
{
- if (touchCache_.containsKey(dataFile_ + ":" + clientKey))
+ if ( touchCache_.containsKey(key) )
return;
IFileReader dataReader = SequenceFile.reader(dataFile_);
try
{
/* Morph the key */
- String decoratedKey = StorageService.getPartitioner().decorateKey(clientKey);
- Coordinate fileCoordinate = getCoordinates(decoratedKey, dataReader);
+ key = morphKey(key);
+ Coordinate fileCoordinate = getCoordinates(key, dataReader);
/* Get offset of key from block Index */
dataReader.seek(fileCoordinate.end_);
- BlockMetadata blockMetadata = dataReader.getBlockMetadata(decoratedKey);
+ BlockMetadata blockMetadata = dataReader.getBlockMetadata(key);
if ( blockMetadata.position_ != -1L )
{
- touchCache_.put(dataFile_ + ":" + clientKey, blockMetadata.position_);
+ touchCache_.put(dataFile_ + ":" + key, blockMetadata.position_);
}
if ( fData )
@@ -557,34 +639,67 @@
}
}
- private long beforeAppend(String decoratedKey) throws IOException
+ private long beforeAppend(String key) throws IOException
{
- if (decoratedKey == null )
+ if(key == null )
throw new IOException("Keys must not be null.");
- Comparator<String> c = StorageService.getPartitioner().getReverseDecoratedKeyComparator();
- if ( lastWrittenKey_ != null && c.compare(lastWrittenKey_, decoratedKey) <= 0 )
+ if ( lastWrittenKey_ != null && key.compareTo(lastWrittenKey_) <= 0 )
{
logger_.info("Last written key : " + lastWrittenKey_);
- logger_.info("Current key : " + decoratedKey);
+ logger_.info("Current key : " + key);
logger_.info("Writing into file " + dataFile_);
throw new IOException("Keys must be written in ascending order.");
}
- return (lastWrittenKey_ == null) ? SSTable.positionAfterFirstBlockIndex_ : dataWriter_.getCurrentPosition();
+ long currentPosition = (lastWrittenKey_ == null) ? SSTable.positionAfterFirstBlockIndex_ : dataWriter_.getCurrentPosition();
+ return currentPosition;
+ }
+
+ private long beforeAppend(BigInteger hash) throws IOException
+ {
+ if(hash == null )
+ throw new IOException("Keys must not be null.");
+ if ( lastWrittenKey_ != null )
+ {
+ BigInteger previousKey = new BigInteger(lastWrittenKey_);
+ if ( hash.compareTo(previousKey) <= 0 )
+ {
+ logger_.info("Last written key : " + previousKey);
+ logger_.info("Current key : " + hash);
+ logger_.info("Writing into file " + dataFile_);
+ throw new IOException("Keys must be written in ascending order.");
+ }
+ }
+ long currentPosition = (lastWrittenKey_ == null) ? SSTable.positionAfterFirstBlockIndex_ : dataWriter_.getCurrentPosition();
+ return currentPosition;
}
- private void afterAppend(String decoratedKey, long position, long size) throws IOException
+ private void afterAppend(String key, long position, long size) throws IOException
{
++indexKeysWritten_;
- lastWrittenKey_ = decoratedKey;
- blockIndex_.put(decoratedKey, new BlockMetadata(position, size));
+ lastWrittenKey_ = key;
+ blockIndex_.put(key, new BlockMetadata(position, size));
if ( indexKeysWritten_ == indexInterval_ )
{
blockIndexes_.add(blockIndex_);
- blockIndex_ = new TreeMap<String, BlockMetadata>(StorageService.getPartitioner().getReverseDecoratedKeyComparator());
+ blockIndex_ = new TreeMap<String, BlockMetadata>(Collections.reverseOrder());
indexKeysWritten_ = 0;
}
}
-
+
+ private void afterAppend(BigInteger hash, long position, long size) throws IOException
+ {
+ ++indexKeysWritten_;
+ String key = hash.toString();
+ lastWrittenKey_ = key;
+ blockIndex_.put(key, new BlockMetadata(position, size));
+ if ( indexKeysWritten_ == indexInterval_ )
+ {
+ blockIndexes_.add(blockIndex_);
+ initBlockIndex();
+ indexKeysWritten_ = 0;
+ }
+ }
+
/**
* Dumps all the block indices for this SSTable
* at the end of the file.
@@ -615,10 +730,10 @@
Set<String> keys = blockIndex.keySet();
/* Number of keys in this block */
bufOut.writeInt(keys.size());
- for ( String decoratedKey : keys )
+ for ( String key : keys )
{
- bufOut.writeUTF(decoratedKey);
- BlockMetadata blockMetadata = blockIndex.get(decoratedKey);
+ bufOut.writeUTF(key);
+ BlockMetadata blockMetadata = blockIndex.get(key);
/* position of the key as a relative offset */
bufOut.writeLong(position - blockMetadata.position_);
bufOut.writeLong(blockMetadata.size_);
@@ -637,31 +752,45 @@
blockIndex.clear();
}
- public void append(String clientKey, DataOutputBuffer buffer) throws IOException
+ public void append(String key, DataOutputBuffer buffer) throws IOException
{
- String decoratedKey = StorageService.getPartitioner().decorateKey(clientKey);
- long currentPosition = beforeAppend(decoratedKey);
- dataWriter_.append(decoratedKey, buffer);
- afterAppend(decoratedKey, currentPosition, buffer.getLength());
+ long currentPosition = beforeAppend(key);
+ dataWriter_.append(key, buffer);
+ afterAppend(key, currentPosition, buffer.getLength());
+ }
+
+ public void append(String key, BigInteger hash, DataOutputBuffer buffer) throws IOException
+ {
+ long currentPosition = beforeAppend(hash);
+ /* Use as key - hash + ":" + key */
+ dataWriter_.append(hash + ":" + key, buffer);
+ afterAppend(hash, currentPosition, buffer.getLength());
}
- public void append(String clientKey, byte[] value) throws IOException
+ public void append(String key, byte[] value) throws IOException
+ {
+ long currentPosition = beforeAppend(key);
+ dataWriter_.append(key, value);
+ afterAppend(key, currentPosition, value.length );
+ }
+
+ public void append(String key, BigInteger hash, byte[] value) throws IOException
{
- String decoratedKey = StorageService.getPartitioner().decorateKey(clientKey);
- long currentPosition = beforeAppend(decoratedKey);
- dataWriter_.append(decoratedKey, value);
- afterAppend(decoratedKey, currentPosition, value.length );
+ long currentPosition = beforeAppend(hash);
+ /* Use as key - hash + ":" + key */
+ dataWriter_.append(hash + ":" + key, value);
+ afterAppend(hash, currentPosition, value.length);
}
- public static Coordinate getCoordinates(String decoratedKey, IFileReader dataReader) throws IOException
+ private Coordinate getCoordinates(String key, IFileReader dataReader) throws IOException
{
- List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataReader.getFileName());
+ List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataFile_);
int size = (indexInfo == null) ? 0 : indexInfo.size();
long start = 0L;
long end = dataReader.getEOF();
if ( size > 0 )
{
- int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(decoratedKey));
+ int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(key));
if ( index < 0 )
{
/*
@@ -711,41 +840,97 @@
return new Coordinate(start, end);
}
- public DataInputBuffer next(final String clientKey, String cfName, List<String> columnNames) throws IOException
+ /**
+ * Convert the application key into the appropriate internal
+ * key based on the partition type.
+ *
+ * @param key the application key
+ * @return the appropriate key based on partition mechanism
+ */
+ private String morphKey(String key)
{
- return next(clientKey, cfName, columnNames, null);
+ String internalKey = key;
+ PartitionerType pType = StorageService.getPartitionerType();
+ switch ( pType )
+ {
+ case OPHF:
+ break;
+
+ default:
+ internalKey = FBUtilities.hash(key).toString();
+ break;
+ }
+ return internalKey;
}
-
- public DataInputBuffer next(final String clientKey, String cfName, List<String> columnNames, IndexHelper.TimeRange timeRange) throws IOException
+
+ public DataInputBuffer next(String key, String cf, List<String> cNames) throws IOException
{
+ DataInputBuffer bufIn = null;
+ IFileReader dataReader = null;
+ try
+ {
+ dataReader = SequenceFile.reader(dataFile_);
+ /* Morph key into actual key based on the partition type. */
+ key = morphKey(key);
+ Coordinate fileCoordinate = getCoordinates(key, dataReader);
+ /*
+ * we have the position we have to read from in order to get the
+ * column family, get the column family and column(s) needed.
+ */
+ bufIn = getData(dataReader, key, cf, cNames, fileCoordinate);
+ }
+ finally
+ {
+ if ( dataReader != null )
+ {
+ dataReader.close();
+ }
+ }
+ return bufIn;
+ }
+
+ public DataInputBuffer next(String key, String columnName) throws IOException
+ {
+ DataInputBuffer bufIn = null;
IFileReader dataReader = null;
try
{
dataReader = SequenceFile.reader(dataFile_);
// dataReader = SequenceFile.chksumReader(dataFile_, 4*1024*1024);
-
- /* Morph key into actual key based on the partition type. */
- String decoratedKey = StorageService.getPartitioner().decorateKey(clientKey);
- Coordinate fileCoordinate = getCoordinates(decoratedKey, dataReader);
+ /* Morph key into actual key based on the partition type. */
+ key = morphKey(key);
+ Coordinate fileCoordinate = getCoordinates(key, dataReader);
/*
* we have the position we have to read from in order to get the
* column family, get the column family and column(s) needed.
- */
- DataOutputBuffer bufOut = new DataOutputBuffer();
- DataInputBuffer bufIn = new DataInputBuffer();
-
- long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columnNames, timeRange, fileCoordinate);
- if ( bytesRead != -1L )
+ */
+ bufIn = getData(dataReader, key, columnName, fileCoordinate);
+ }
+ finally
+ {
+ if ( dataReader != null )
{
- if ( bufOut.getLength() > 0 )
- {
- bufIn.reset(bufOut.getData(), bufOut.getLength());
- /* read the key even though we do not use it */
- bufIn.readUTF();
- bufIn.readInt();
- }
+ dataReader.close();
}
- return bufIn;
+ }
+ return bufIn;
+ }
+
+ public DataInputBuffer next(String key, String columnName, IndexHelper.TimeRange timeRange) throws IOException
+ {
+ DataInputBuffer bufIn = null;
+ IFileReader dataReader = null;
+ try
+ {
+ dataReader = SequenceFile.reader(dataFile_);
+ /* Morph key into actual key based on the partition type. */
+ key = morphKey(key);
+ Coordinate fileCoordinate = getCoordinates(key, dataReader);
+ /*
+ * we have the position we have to read from in order to get the
+ * column family, get the column family and column(s) needed.
+ */
+ bufIn = getData(dataReader, key, columnName, timeRange, fileCoordinate);
}
finally
{
@@ -754,14 +939,125 @@
dataReader.close();
}
}
+ return bufIn;
}
-
- public DataInputBuffer next(String clientKey, String columnFamilyColumn) throws IOException
+
+ long getSeekPosition(String key, long start)
+ {
+ Long seekStart = touchCache_.get(dataFile_ + ":" + key);
+ if( seekStart != null)
+ {
+ return seekStart;
+ }
+ return start;
+ }
+
+ /*
+ * Get the data for the key from the position passed in.
+ */
+ private DataInputBuffer getData(IFileReader dataReader, String key, String column, Coordinate section) throws IOException
+ {
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ DataInputBuffer bufIn = new DataInputBuffer();
+
+ long bytesRead = dataReader.next(key, bufOut, column, section);
+ if ( bytesRead != -1L )
+ {
+ if ( bufOut.getLength() > 0 )
+ {
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+ /* read the key even though we do not use it */
+ bufIn.readUTF();
+ bufIn.readInt();
+ }
+ }
+
+ return bufIn;
+ }
+
+ private DataInputBuffer getData(IFileReader dataReader, String key, String cf, List<String> columns, Coordinate section) throws IOException
{
- String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
- String columnFamilyName = values[0];
- List<String> cnNames = (values.length == 1) ? null : Arrays.asList(values[1]);
- return next(clientKey, columnFamilyName, cnNames);
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ DataInputBuffer bufIn = new DataInputBuffer();
+
+ long bytesRead = dataReader.next(key, bufOut, cf, columns, section);
+ if ( bytesRead != -1L )
+ {
+ if ( bufOut.getLength() > 0 )
+ {
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+ /* read the key even though we do not use it */
+ bufIn.readUTF();
+ bufIn.readInt();
+ }
+ }
+ return bufIn;
+ }
+
+ /*
+ * Get the data for the key from the position passed in.
+ */
+ private DataInputBuffer getData(IFileReader dataReader, String key, String column, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException
+ {
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ DataInputBuffer bufIn = new DataInputBuffer();
+
+ try
+ {
+ dataReader.next(key, bufOut, column, timeRange, section);
+ if ( bufOut.getLength() > 0 )
+ {
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+ /* read the key even though we do not use it */
+ bufIn.readUTF();
+ bufIn.readInt();
+ }
+ }
+ catch ( IOException ex )
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ }
+ return bufIn;
+ }
+
+ /*
+ * Given a key we are interested in this method gets the
+ * closest index before the key on disk.
+ *
+ * param @ key - key we are interested in.
+ * return position of the closest index before the key
+ * on disk or -1 if this key is not on disk.
+ */
+ private long getClosestIndexPositionToKeyOnDisk(String key)
+ {
+ long position = -1L;
+ List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataFile_);
+ int size = indexInfo.size();
+ int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(key));
+ if ( index < 0 )
+ {
+ /*
+ * We are here which means that the requested
+ * key is not an index.
+ */
+ index = (++index)*(-1);
+ /* this means key is not present at all */
+ if ( index >= size )
+ return position;
+ /* a scan is in order. */
+ position = (index == 0) ? 0 : indexInfo.get(index - 1).position();
+ }
+ else
+ {
+ /*
+ * If we are here that means the key is in the index file
+ * and we can retrieve it w/o a scan. In reality we would
+ * like to have a retrieve(key, fromPosition) but for now
+ * we use scan(start, start + 1) - a hack.
+ */
+ position = indexInfo.get(index).position();
+ }
+ return position;
}
public void close() throws IOException